feat(mpc-system): add event sourcing for session tracking

- Add SessionEventRepository interface for append-only event storage
- Implement PostgreSQL session_event_repo with immutable event log
- Add database migration for session_events table with indexes
- Record events for keygen and sign session creation
- Record events for signing-config APIs (set, update, clear)
- Wire up sessionEventRepo in main.go and account handler
- Update API documentation with event sourcing design

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-05 23:31:04 -08:00
parent aa74e2b2e2
commit 54061b4c16
7 changed files with 1093 additions and 63 deletions

View File

@ -0,0 +1,464 @@
# MPC System API 流程文档
本文档描述 MPC 系统中的 Keygen、Sign 以及签名方配置 API 的调用流程。
**重要设计原则:**
- `username` 是整个 MPC 系统中所有逻辑关系的唯一标识
- 事件型数据库设计:只插入不修改,保证数据安全性和可追溯性
## 目录
1. [Keygen API](#1-keygen-api)
2. [Sign API](#2-sign-api)
3. [签名方配置 API](#3-签名方配置-api)
4. [API 参数汇总](#4-api-参数汇总)
5. [签名方选择逻辑](#5-签名方选择逻辑)
6. [事件型数据库设计](#6-事件型数据库设计)
---
## 1. Keygen API
### 端点
```
POST /mpc/keygen
```
### 流程图
```
┌─────────┐ ┌─────────────┐ ┌──────────────────┐ ┌─────────────┐
│ Client │────▶│ Account API │────▶│ Session Coordinator │────▶│ Server Party │
└─────────┘ └─────────────┘ └──────────────────┘ └─────────────┘
│ │ │ │
│ POST /keygen │ │ │
│ {username, │ │ │
│ threshold_n, │ 检查 username │ │
│ threshold_t} │ 是否已存在 │ │
│───────────────▶│─────┐ │ │
│ │ │ │ │
│ │◀────┘ │ │
│ │ │ │
│ │ CreateSession │ │
│ │ (keygen, n, t) │ │
│ │─────────────────────▶│ │
│ │ │ │
│ │ │ NotifyParties │
│ │ │─────────────────────▶│
│ │ │ │
│ │ session_id │ │
│ │◀─────────────────────│ │
│ │ │ │
│ {session_id, │ │ │
│ username, │ │ │
│ status} │ │ │
│◀───────────────│ │ │
│ │ │ │
│ [MPC Keygen Protocol Execution] │
│ │ │ │
│ WebSocket │ │ │
│ 完成通知 │ │ │
│◀═══════════════│ │ │
│ {public_key} │ │ │
```
### 请求参数
```json
{
"username": "string", // 用户名(必填,唯一标识)
"threshold_n": 3, // 总参与方数量(必填)
"threshold_t": 2, // 签名阈值(必填)
"require_delegate": true // 是否需要委托方(可选)
}
```
### 响应
```json
{
"session_id": "uuid",
"session_type": "keygen",
"username": "string",
"threshold_n": 3,
"threshold_t": 2,
"selected_parties": ["party1", "party2", "party3"],
"delegate_party": "party3",
"status": "created"
}
```
### 错误响应
- `409 Conflict`: username 已存在(应使用 Sign API
- `400 Bad Request`: threshold_t > threshold_n
---
## 2. Sign API
### 端点
```
POST /mpc/sign
```
### 流程图
```
┌─────────┐ ┌─────────────┐ ┌──────────────────┐ ┌─────────────┐
│ Client │────▶│ Account API │────▶│ Session Coordinator │────▶│ Server Party │
└─────────┘ └─────────────┘ └──────────────────┘ └─────────────┘
│ │ │ │
│ POST /sign │ │ │
│ {username, │ │ │
│ message_hash,│ 通过 username │ │
│ user_share} │ 查询 Account │ │
│───────────────▶│ 检查 SigningParties │ │
│ │─────┐ │ │
│ │ │ │ │
│ │◀────┘ │ │
│ │ │ │
│ │ CreateSession │ │
│ │ (sign, parties) │ │
│ │─────────────────────▶│ │
│ │ │ │
│ │ │ NotifyParties │
│ │ │ (configured or all) │
│ │ │─────────────────────▶│
│ │ │ │
│ │ session_id │ │
│ │◀─────────────────────│ │
│ │ │ │
│ {session_id, │ │ │
│ username, │ │ │
│ status} │ │ │
│◀───────────────│ │ │
│ │ │ │
│ [MPC Sign Protocol Execution] │
│ │ │ │
│ WebSocket │ │ │
│ 完成通知 │ │ │
│◀═══════════════│ │ │
│ {signature} │ │ │
```
### 请求参数
```json
{
"username": "string", // 用户名(必填,唯一标识)
"message_hash": "hex", // 待签名消息哈希必填32字节SHA-256
"user_share": "hex" // 用户Share可选账户有delegate时必填
}
```
### 响应
```json
{
"session_id": "uuid",
"session_type": "sign",
"username": "string",
"message_hash": "hex",
"threshold_t": 2,
"selected_parties": ["party1", "party2"],
"has_delegate": true,
"delegate_party_id": "party2",
"status": "created"
}
```
### 错误响应
- `404 Not Found`: username 不存在
- `400 Bad Request`: 账户有 delegate 但未提供 user_share
### 签名方选择逻辑
1. 如果 Account 配置了 `signing_parties`(非空)→ 使用配置的参与方
2. 如果 Account 未配置 `signing_parties`空或NULL→ 使用所有活跃的参与方
---
## 3. 签名方配置 API
**所有签名方配置 API 使用 username 作为路径参数**
### 3.1 设置签名方配置(首次)
#### 端点
```
POST /accounts/by-username/:username/signing-config
```
#### 流程图
```
┌─────────┐ ┌─────────────┐ ┌──────────────┐ ┌────────────┐
│ Client │────▶│ Account API │────▶│ Account Repo │────▶│ PostgreSQL │
└─────────┘ └─────────────┘ └──────────────┘ └────────────┘
│ │ │ │
│ POST │ │ │
│ /signing-config │ │
│ {party_ids} │ │ │
│───────────────▶│ │ │
│ │ │ │
│ │ GetByUsername │ │
│ │────────────────────▶│ │
│ │ │ SELECT │
│ │ │──────────────────▶│
│ │ │◀──────────────────│
│ │◀────────────────────│ │
│ │ │ │
│ │ 检查: │ │
│ │ - 是否已配置? │ │
│ │ - party数量=t? │ │
│ │─────┐ │ │
│ │ │ │ │
│ │◀────┘ │ │
│ │ │ │
│ │ Update │ │
│ │ (signing_parties) │ │
│ │────────────────────▶│ │
│ │ │ UPDATE │
│ │ │──────────────────▶│
│ │ │◀──────────────────│
│ │◀────────────────────│ │
│ │ │ │
│ {success, │ │ │
│ username, │ │ │
│ party_ids} │ │ │
│◀───────────────│ │ │
```
#### 请求参数
```json
{
"party_ids": ["party1", "party2"] // 签名方ID列表必填数量必须等于threshold_t
}
```
#### 响应
```json
{
"message": "signing parties configured successfully",
"username": "string",
"signing_parties": ["party1", "party2"],
"threshold_t": 2
}
```
#### 错误响应
- `400 Bad Request`: 参与方数量不等于 threshold_t
- `409 Conflict`: 已存在配置(应使用 PUT 更新)
- `404 Not Found`: username 不存在
---
### 3.2 更新签名方配置
#### 端点
```
PUT /accounts/by-username/:username/signing-config
```
#### 请求参数
```json
{
"party_ids": ["party1", "party3"] // 新的签名方ID列表
}
```
#### 响应
```json
{
"message": "signing parties updated successfully",
"username": "string",
"signing_parties": ["party1", "party3"],
"threshold_t": 2
}
```
#### 错误响应
- `400 Bad Request`: 参与方数量不等于 threshold_t
- `404 Not Found`: username 不存在或未配置签名方
---
### 3.3 清除签名方配置
#### 端点
```
DELETE /accounts/by-username/:username/signing-config
```
#### 响应
```json
{
"message": "signing parties cleared - all active parties will be used for signing",
"username": "string"
}
```
---
### 3.4 查询签名方配置
#### 端点
```
GET /accounts/by-username/:username/signing-config
```
#### 响应(已配置)
```json
{
"configured": true,
"username": "string",
"signing_parties": ["party1", "party2"],
"threshold_t": 2
}
```
#### 响应(未配置)
```json
{
"configured": false,
"username": "string",
"message": "no signing parties configured - all active parties will be used",
"active_parties": ["party1", "party2", "party3"],
"threshold_t": 2
}
```
---
## 4. API 参数汇总
| API | Method | 路径 | 主要参数 | 返回值 |
|-----|--------|------|----------|--------|
| Keygen | POST | `/mpc/keygen` | username, threshold_n, threshold_t | session_id, username, status |
| Sign | POST | `/mpc/sign` | username, message_hash, user_share | session_id, username, parties |
| 设置签名方 | POST | `/accounts/by-username/:username/signing-config` | party_ids[] | username, signing_parties |
| 更新签名方 | PUT | `/accounts/by-username/:username/signing-config` | party_ids[] | username, signing_parties |
| 清除签名方 | DELETE | `/accounts/by-username/:username/signing-config` | - | username, message |
| 查询签名方 | GET | `/accounts/by-username/:username/signing-config` | - | username, signing_parties, configured |
---
## 5. 签名方选择逻辑
```
┌─────────────────┐
│ Sign Request │
│ (username) │
└────────┬────────┘
┌─────────────────┐
│ GetByUsername │
│ 查询 Account │
│ SigningParties │
└────────┬────────┘
┌─────────────────┐
│ HasSigningParties│
│ Config? │
└────────┬────────┘
┌──────────────┴──────────────┐
│ YES │ NO
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ 使用配置的 │ │ 查询所有活跃的 │
│ SigningParties │ │ Server Parties │
│ (固定 t 个) │ │ (可能 > t 个) │
└────────┬────────┘ └────────┬────────┘
│ │
└──────────────┬─────────────┘
┌─────────────────┐
│ 创建签名会话 │
│ (指定参与方) │
└─────────────────┘
```
### 使用场景
1. **默认模式(未配置)**
- 所有活跃的 Server Party 都会参与签名
- 适用于需要最大灵活性的场景
2. **配置模式(已配置)**
- 只有指定的 Party 参与签名
- 适用于固定签名方组合的场景
- 例如:指定 "server-party-1" 和 "delegate-party" 作为签名方
### 验证规则
- `party_ids` 数量必须等于账户的 `threshold_t`
- `party_ids` 中不能有重复项
- `party_ids` 中不能有空字符串
- `party_ids` 必须是账户的活跃 share
---
## 6. 事件型数据库设计
### 设计原则
为保证数据安全性和可追溯性采用事件溯源Event Sourcing模式
- **只插入,不修改** - 所有状态变更都作为新事件插入
- **不可变日志** - 事件记录一旦写入不可更改
- **状态可重建** - 当前状态可通过重放事件得到
### session_events 表结构
```sql
CREATE TABLE session_events (
id UUID PRIMARY KEY,
session_id UUID NOT NULL,
username VARCHAR(255) NOT NULL, -- 用户唯一标识
event_type VARCHAR(50) NOT NULL, -- 事件类型
session_type VARCHAR(20) NOT NULL, -- keygen 或 sign
-- 事件快照数据
threshold_n INTEGER,
threshold_t INTEGER,
party_id VARCHAR(255),
party_index INTEGER,
message_hash BYTEA,
public_key BYTEA,
signature BYTEA,
error_message TEXT,
metadata JSONB,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
```
### 事件类型
| 事件类型 | 描述 |
|---------|------|
| `session_created` | 会话创建 |
| `party_joined` | 参与方加入 |
| `party_ready` | 参与方就绪 |
| `round_started` | MPC 轮次开始 |
| `round_completed` | MPC 轮次完成 |
| `session_completed` | 会话成功完成 |
| `session_failed` | 会话失败 |
| `session_expired` | 会话过期 |
| `delegate_share_sent` | 委托 share 已发送给用户 |
| `signing_config_set` | 签名方配置已设置 |
| `signing_config_cleared` | 签名方配置已清除 |
### 查询当前状态
通过视图 `session_current_state` 获取最新状态:
```sql
SELECT * FROM session_current_state WHERE username = 'alice';
```
---
## 更新日志
| 日期 | 版本 | 描述 |
|------|------|------|
| 2024-XX-XX | 1.0 | 初始版本 |
| 2024-XX-XX | 2.0 | 全面采用 username 作为唯一标识,添加事件型数据库设计 |

View File

@ -0,0 +1,13 @@
-- Rollback: Remove session events table and view
DROP VIEW IF EXISTS session_current_state;
DROP INDEX IF EXISTS idx_session_events_user_sessions;
DROP INDEX IF EXISTS idx_session_events_session_timeline;
DROP INDEX IF EXISTS idx_session_events_session_type;
DROP INDEX IF EXISTS idx_session_events_event_type;
DROP INDEX IF EXISTS idx_session_events_created_at;
DROP INDEX IF EXISTS idx_session_events_username;
DROP INDEX IF EXISTS idx_session_events_session_id;
DROP TABLE IF EXISTS session_events;

View File

@ -0,0 +1,77 @@
-- Session Events table (Event Sourcing pattern - insert only, never update)
-- This provides an immutable audit trail of all session state changes
-- The current session state can be derived by replaying events
CREATE TABLE session_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL,
username VARCHAR(255) NOT NULL, -- The user this session belongs to
event_type VARCHAR(50) NOT NULL, -- Event type: created, joined, ready, completed, failed, expired
session_type VARCHAR(20) NOT NULL, -- 'keygen' or 'sign'
-- Event payload (immutable snapshot at event time)
threshold_n INTEGER,
threshold_t INTEGER,
party_id VARCHAR(255), -- For party-specific events (joined, ready, completed)
party_index INTEGER,
message_hash BYTEA, -- For sign sessions
public_key BYTEA, -- Result of keygen
signature BYTEA, -- Result of sign
error_message TEXT, -- For failed events
-- Metadata
metadata JSONB, -- Additional event-specific data
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
-- Constraints
CONSTRAINT chk_session_event_type CHECK (event_type IN (
'session_created', -- Session initiated
'party_joined', -- A party joined the session
'party_ready', -- A party is ready to start MPC
'round_started', -- MPC round started
'round_completed', -- MPC round completed
'session_completed', -- Session completed successfully
'session_failed', -- Session failed
'session_expired', -- Session expired
'delegate_share_sent', -- Delegate share sent to user
'signing_config_set', -- Signing parties configured
'signing_config_cleared' -- Signing parties cleared
)),
CONSTRAINT chk_session_event_session_type CHECK (session_type IN ('keygen', 'sign'))
);
-- Indexes for efficient querying
CREATE INDEX idx_session_events_session_id ON session_events(session_id);
CREATE INDEX idx_session_events_username ON session_events(username);
CREATE INDEX idx_session_events_created_at ON session_events(created_at);
CREATE INDEX idx_session_events_event_type ON session_events(event_type);
CREATE INDEX idx_session_events_session_type ON session_events(session_type);
-- Composite index for common queries
CREATE INDEX idx_session_events_session_timeline ON session_events(session_id, created_at);
CREATE INDEX idx_session_events_user_sessions ON session_events(username, session_type, created_at DESC);
-- Comments
COMMENT ON TABLE session_events IS 'Immutable event log for MPC sessions - append only, never update or delete';
COMMENT ON COLUMN session_events.username IS 'The unique user identifier that links all MPC operations';
COMMENT ON COLUMN session_events.event_type IS 'Type of event that occurred';
COMMENT ON COLUMN session_events.metadata IS 'Additional JSON data specific to the event type';
-- View to get current session state by replaying events
CREATE OR REPLACE VIEW session_current_state AS
SELECT DISTINCT ON (session_id)
session_id,
username,
session_type,
threshold_n,
threshold_t,
event_type as current_status,
public_key,
signature,
error_message,
created_at as last_event_at,
(SELECT MIN(created_at) FROM session_events e2 WHERE e2.session_id = session_events.session_id) as started_at
FROM session_events
ORDER BY session_id, created_at DESC;
COMMENT ON VIEW session_current_state IS 'Derived view showing current state of each session from event log';

View File

@ -13,12 +13,15 @@ import (
"github.com/rwadurian/mpc-system/services/account/application/ports"
"github.com/rwadurian/mpc-system/services/account/application/use_cases"
"github.com/rwadurian/mpc-system/services/account/domain/entities"
"github.com/rwadurian/mpc-system/services/account/domain/repositories"
"github.com/rwadurian/mpc-system/services/account/domain/value_objects"
"go.uber.org/zap"
)
// AccountHTTPHandler handles HTTP requests for accounts
type AccountHTTPHandler struct {
accountRepo repositories.AccountRepository
sessionEventRepo repositories.SessionEventRepository
createAccountUC *use_cases.CreateAccountUseCase
getAccountUC *use_cases.GetAccountUseCase
updateAccountUC *use_cases.UpdateAccountUseCase
@ -37,6 +40,8 @@ type AccountHTTPHandler struct {
// NewAccountHTTPHandler creates a new AccountHTTPHandler
func NewAccountHTTPHandler(
accountRepo repositories.AccountRepository,
sessionEventRepo repositories.SessionEventRepository,
createAccountUC *use_cases.CreateAccountUseCase,
getAccountUC *use_cases.GetAccountUseCase,
updateAccountUC *use_cases.UpdateAccountUseCase,
@ -53,6 +58,8 @@ func NewAccountHTTPHandler(
sessionCoordinatorClient *grpcclient.SessionCoordinatorClient,
) *AccountHTTPHandler {
return &AccountHTTPHandler{
accountRepo: accountRepo,
sessionEventRepo: sessionEventRepo,
createAccountUC: createAccountUC,
getAccountUC: getAccountUC,
updateAccountUC: updateAccountUC,
@ -81,11 +88,11 @@ func (h *AccountHTTPHandler) RegisterRoutes(router *gin.RouterGroup) {
accounts.PUT("/:id", h.UpdateAccount)
accounts.GET("/:id/shares", h.GetAccountShares)
accounts.DELETE("/:id/shares/:shareId", h.DeactivateShare)
// Signing parties configuration
accounts.POST("/:id/signing-config", h.SetSigningParties)
accounts.PUT("/:id/signing-config", h.UpdateSigningParties)
accounts.DELETE("/:id/signing-config", h.ClearSigningParties)
accounts.GET("/:id/signing-config", h.GetSigningParties)
// Signing parties configuration (use username as path parameter)
accounts.POST("/by-username/:username/signing-config", h.SetSigningParties)
accounts.PUT("/by-username/:username/signing-config", h.UpdateSigningParties)
accounts.DELETE("/by-username/:username/signing-config", h.ClearSigningParties)
accounts.GET("/by-username/:username/signing-config", h.GetSigningParties)
}
auth := router.Group("/auth")
@ -544,6 +551,7 @@ func (h *AccountHTTPHandler) CancelRecovery(c *gin.Context) {
// CreateKeygenSessionRequest represents the request for creating a keygen session
// Coordinator will automatically select parties from registered pool
type CreateKeygenSessionRequest struct {
Username string `json:"username" binding:"required"` // Username - the unique identifier for all relationships
ThresholdN int `json:"threshold_n" binding:"required,min=2"` // Total number of parties (e.g., 3)
ThresholdT int `json:"threshold_t" binding:"required,min=1"` // Threshold for signing (e.g., 2)
RequireDelegate bool `json:"require_delegate"` // If true, one party will be delegate (returns share to user)
@ -564,11 +572,23 @@ func (h *AccountHTTPHandler) CreateKeygenSession(c *gin.Context) {
return
}
// Check if username already exists (keygen should be for new users)
exists, err := h.accountRepo.ExistsByUsername(c.Request.Context(), req.Username)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to check username"})
return
}
if exists {
c.JSON(http.StatusConflict, gin.H{"error": "username already exists, use sign API instead"})
return
}
// Call session coordinator via gRPC (no participants - coordinator selects automatically)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
logger.Info("Calling CreateKeygenSession via gRPC (auto party selection)",
zap.String("username", req.Username),
zap.Int("threshold_n", req.ThresholdN),
zap.Int("threshold_t", req.ThresholdT),
zap.Bool("require_delegate", req.RequireDelegate))
@ -604,10 +624,30 @@ func (h *AccountHTTPHandler) CreateKeygenSession(c *gin.Context) {
zap.String("session_id", resp.SessionID),
zap.Int("num_parties", len(resp.SelectedParties)))
// Record session_created event
sessionID, _ := uuid.Parse(resp.SessionID)
event := repositories.NewSessionEvent(
sessionID,
req.Username,
repositories.EventSessionCreated,
repositories.SessionTypeKeygen,
).WithThreshold(req.ThresholdN, req.ThresholdT).WithMetadata(map[string]interface{}{
"selected_parties": resp.SelectedParties,
"delegate_party": resp.DelegateParty,
"require_delegate": req.RequireDelegate,
"persistent_count": persistentCount,
"delegate_count": delegateCount,
})
if err := h.sessionEventRepo.Create(c.Request.Context(), event); err != nil {
logger.Error("Failed to record session event", zap.Error(err))
// Don't fail the request, just log the error
}
// Return response with selected parties info
c.JSON(http.StatusCreated, gin.H{
"session_id": resp.SessionID,
"session_type": "keygen",
"username": req.Username, // Include username in response for reference
"threshold_n": req.ThresholdN,
"threshold_t": req.ThresholdT,
"selected_parties": resp.SelectedParties,
@ -619,7 +659,7 @@ func (h *AccountHTTPHandler) CreateKeygenSession(c *gin.Context) {
// CreateSigningSessionRequest represents the request for creating a signing session
// Coordinator will automatically select parties based on account's registered shares
type CreateSigningSessionRequest struct {
AccountID string `json:"account_id" binding:"required"` // Account to sign for
Username string `json:"username" binding:"required"` // Username - the unique identifier for all relationships
MessageHash string `json:"message_hash" binding:"required"` // SHA-256 hash to sign (hex encoded)
UserShare string `json:"user_share"` // Required if account has delegate share: user's encrypted share (hex)
}
@ -633,13 +673,6 @@ func (h *AccountHTTPHandler) CreateSigningSession(c *gin.Context) {
return
}
// Validate account ID
accountID, err := value_objects.AccountIDFromString(req.AccountID)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid account ID"})
return
}
// Decode message hash
messageHash, err := hex.DecodeString(req.MessageHash)
if err != nil {
@ -652,12 +685,12 @@ func (h *AccountHTTPHandler) CreateSigningSession(c *gin.Context) {
return
}
// Get account to verify it exists and get share info
// Get account by username to verify it exists and get share info
accountOutput, err := h.getAccountUC.Execute(c.Request.Context(), ports.GetAccountInput{
AccountID: &accountID,
Username: &req.Username,
})
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "account not found"})
c.JSON(http.StatusNotFound, gin.H{"error": "account not found for username: " + req.Username})
return
}
@ -696,14 +729,14 @@ func (h *AccountHTTPHandler) CreateSigningSession(c *gin.Context) {
partyIDs = configuredParties
logger.Info("Using configured signing parties",
zap.String("account_id", req.AccountID),
zap.String("username", req.Username),
zap.Strings("configured_parties", partyIDs))
} else {
// Use all active parties (original behavior)
partyIDs = allActivePartyIDs
logger.Info("Using all active parties for signing",
zap.String("account_id", req.AccountID),
zap.String("username", req.Username),
zap.Strings("active_parties", partyIDs))
}
@ -757,13 +790,13 @@ func (h *AccountHTTPHandler) CreateSigningSession(c *gin.Context) {
PartyIndex: int32(delegateShare.PartyIndex),
}
logger.Info("Calling CreateSigningSession with delegate user share",
zap.String("account_id", req.AccountID),
zap.String("username", req.Username),
zap.String("delegate_party_id", delegateShare.PartyID),
zap.Int("threshold_t", accountOutput.Account.ThresholdT),
zap.Int("available_parties", len(partyIDs)))
} else {
logger.Info("Calling CreateSigningSession via gRPC (auto party selection)",
zap.String("account_id", req.AccountID),
zap.String("username", req.Username),
zap.Int("threshold_t", accountOutput.Account.ThresholdT),
zap.Int("available_parties", len(partyIDs)))
}
@ -787,10 +820,32 @@ func (h *AccountHTTPHandler) CreateSigningSession(c *gin.Context) {
zap.String("session_id", resp.SessionID),
zap.Int("num_parties", len(resp.SelectedParties)))
// Record session_created event for sign
signSessionID, _ := uuid.Parse(resp.SessionID)
signEvent := repositories.NewSessionEvent(
signSessionID,
req.Username,
repositories.EventSessionCreated,
repositories.SessionTypeSign,
).WithThreshold(accountOutput.Account.ThresholdN, accountOutput.Account.ThresholdT).
WithMessageHash(messageHash).
WithMetadata(map[string]interface{}{
"selected_parties": resp.SelectedParties,
"has_delegate": delegateShare != nil,
"signing_parties_config": accountOutput.Account.HasSigningPartiesConfig(),
})
if delegateShare != nil {
signEvent = signEvent.WithParty(delegateShare.PartyID, delegateShare.PartyIndex)
}
if err := h.sessionEventRepo.Create(c.Request.Context(), signEvent); err != nil {
logger.Error("Failed to record session event", zap.Error(err))
// Don't fail the request, just log the error
}
response := gin.H{
"session_id": resp.SessionID,
"session_type": "sign",
"account_id": req.AccountID,
"username": req.Username,
"message_hash": req.MessageHash,
"threshold_t": accountOutput.Account.ThresholdT,
"selected_parties": resp.SelectedParties,
@ -986,12 +1041,11 @@ type SigningPartiesRequest struct {
}
// SetSigningParties handles setting signing parties for the first time
// POST /accounts/:id/signing-config
// POST /accounts/by-username/:username/signing-config
func (h *AccountHTTPHandler) SetSigningParties(c *gin.Context) {
idStr := c.Param("id")
accountID, err := value_objects.AccountIDFromString(idStr)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid account ID"})
username := c.Param("username")
if username == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "username is required"})
return
}
@ -1001,12 +1055,12 @@ func (h *AccountHTTPHandler) SetSigningParties(c *gin.Context) {
return
}
// Get account
// Get account by username
accountOutput, err := h.getAccountUC.Execute(c.Request.Context(), ports.GetAccountInput{
AccountID: &accountID,
Username: &username,
})
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "account not found"})
c.JSON(http.StatusNotFound, gin.H{"error": "account not found for username: " + username})
return
}
@ -1045,7 +1099,7 @@ func (h *AccountHTTPHandler) SetSigningParties(c *gin.Context) {
// Update account in database
_, err = h.updateAccountUC.Execute(c.Request.Context(), ports.UpdateAccountInput{
AccountID: accountID,
AccountID: accountOutput.Account.ID,
SigningParties: req.PartyIDs,
})
if err != nil {
@ -1054,23 +1108,38 @@ func (h *AccountHTTPHandler) SetSigningParties(c *gin.Context) {
}
logger.Info("Signing parties configured",
zap.String("account_id", idStr),
zap.String("username", username),
zap.Strings("signing_parties", req.PartyIDs))
// Record signing_config_set event
event := repositories.NewSessionEvent(
uuid.New(), // Generate new event ID (no session for config changes)
username,
repositories.EventSigningConfigSet,
repositories.SessionTypeSign, // Configuration is for signing
).WithThreshold(accountOutput.Account.ThresholdN, accountOutput.Account.ThresholdT).
WithMetadata(map[string]interface{}{
"operation": "set",
"signing_parties": req.PartyIDs,
})
if err := h.sessionEventRepo.Create(c.Request.Context(), event); err != nil {
logger.Error("Failed to record signing config event", zap.Error(err))
}
c.JSON(http.StatusCreated, gin.H{
"message": "signing parties configured successfully",
"username": username,
"signing_parties": req.PartyIDs,
"threshold_t": accountOutput.Account.ThresholdT,
})
}
// UpdateSigningParties handles updating existing signing parties configuration
// PUT /accounts/:id/signing-config
// PUT /accounts/by-username/:username/signing-config
func (h *AccountHTTPHandler) UpdateSigningParties(c *gin.Context) {
idStr := c.Param("id")
accountID, err := value_objects.AccountIDFromString(idStr)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid account ID"})
username := c.Param("username")
if username == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "username is required"})
return
}
@ -1080,12 +1149,12 @@ func (h *AccountHTTPHandler) UpdateSigningParties(c *gin.Context) {
return
}
// Get account
// Get account by username
accountOutput, err := h.getAccountUC.Execute(c.Request.Context(), ports.GetAccountInput{
AccountID: &accountID,
Username: &username,
})
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "account not found"})
c.JSON(http.StatusNotFound, gin.H{"error": "account not found for username: " + username})
return
}
@ -1123,7 +1192,7 @@ func (h *AccountHTTPHandler) UpdateSigningParties(c *gin.Context) {
// Update account in database
_, err = h.updateAccountUC.Execute(c.Request.Context(), ports.UpdateAccountInput{
AccountID: accountID,
AccountID: accountOutput.Account.ID,
SigningParties: req.PartyIDs,
})
if err != nil {
@ -1132,32 +1201,47 @@ func (h *AccountHTTPHandler) UpdateSigningParties(c *gin.Context) {
}
logger.Info("Signing parties updated",
zap.String("account_id", idStr),
zap.String("username", username),
zap.Strings("signing_parties", req.PartyIDs))
// Record signing_config_set event for update
updateEvent := repositories.NewSessionEvent(
uuid.New(),
username,
repositories.EventSigningConfigSet,
repositories.SessionTypeSign,
).WithThreshold(accountOutput.Account.ThresholdN, accountOutput.Account.ThresholdT).
WithMetadata(map[string]interface{}{
"operation": "update",
"signing_parties": req.PartyIDs,
})
if err := h.sessionEventRepo.Create(c.Request.Context(), updateEvent); err != nil {
logger.Error("Failed to record signing config event", zap.Error(err))
}
c.JSON(http.StatusOK, gin.H{
"message": "signing parties updated successfully",
"username": username,
"signing_parties": req.PartyIDs,
"threshold_t": accountOutput.Account.ThresholdT,
})
}
// ClearSigningParties handles clearing signing parties configuration
// DELETE /accounts/:id/signing-config
// DELETE /accounts/by-username/:username/signing-config
func (h *AccountHTTPHandler) ClearSigningParties(c *gin.Context) {
idStr := c.Param("id")
accountID, err := value_objects.AccountIDFromString(idStr)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid account ID"})
username := c.Param("username")
if username == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "username is required"})
return
}
// Get account
// Get account by username
accountOutput, err := h.getAccountUC.Execute(c.Request.Context(), ports.GetAccountInput{
AccountID: &accountID,
Username: &username,
})
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "account not found"})
c.JSON(http.StatusNotFound, gin.H{"error": "account not found for username: " + username})
return
}
@ -1171,7 +1255,7 @@ func (h *AccountHTTPHandler) ClearSigningParties(c *gin.Context) {
// Clear signing parties - pass empty slice
_, err = h.updateAccountUC.Execute(c.Request.Context(), ports.UpdateAccountInput{
AccountID: accountID,
AccountID: accountOutput.Account.ID,
ClearSigningParties: true,
})
if err != nil {
@ -1180,35 +1264,50 @@ func (h *AccountHTTPHandler) ClearSigningParties(c *gin.Context) {
}
logger.Info("Signing parties cleared",
zap.String("account_id", idStr))
zap.String("username", username))
// Record signing_config_cleared event
clearEvent := repositories.NewSessionEvent(
uuid.New(),
username,
repositories.EventSigningConfigCleared,
repositories.SessionTypeSign,
).WithThreshold(accountOutput.Account.ThresholdN, accountOutput.Account.ThresholdT).
WithMetadata(map[string]interface{}{
"operation": "clear",
})
if err := h.sessionEventRepo.Create(c.Request.Context(), clearEvent); err != nil {
logger.Error("Failed to record signing config event", zap.Error(err))
}
c.JSON(http.StatusOK, gin.H{
"message": "signing parties cleared - all active parties will be used for signing",
"username": username,
})
}
// GetSigningParties handles getting current signing parties configuration
// GET /accounts/:id/signing-config
// GET /accounts/by-username/:username/signing-config
func (h *AccountHTTPHandler) GetSigningParties(c *gin.Context) {
idStr := c.Param("id")
accountID, err := value_objects.AccountIDFromString(idStr)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid account ID"})
username := c.Param("username")
if username == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "username is required"})
return
}
// Get account
// Get account by username
accountOutput, err := h.getAccountUC.Execute(c.Request.Context(), ports.GetAccountInput{
AccountID: &accountID,
Username: &username,
})
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "account not found"})
c.JSON(http.StatusNotFound, gin.H{"error": "account not found for username: " + username})
return
}
if accountOutput.Account.HasSigningPartiesConfig() {
c.JSON(http.StatusOK, gin.H{
"configured": true,
"username": username,
"signing_parties": accountOutput.Account.GetSigningParties(),
"threshold_t": accountOutput.Account.ThresholdT,
})
@ -1222,6 +1321,7 @@ func (h *AccountHTTPHandler) GetSigningParties(c *gin.Context) {
}
c.JSON(http.StatusOK, gin.H{
"configured": false,
"username": username,
"message": "no signing parties configured - all active parties will be used",
"active_parties": activeParties,
"threshold_t": accountOutput.Account.ThresholdT,

View File

@ -0,0 +1,231 @@
package postgres
import (
"context"
"database/sql"
"encoding/json"
"github.com/google/uuid"
"github.com/rwadurian/mpc-system/services/account/domain/repositories"
)
// SessionEventPostgresRepo implements SessionEventRepository using PostgreSQL
// This is an append-only repository - events are never updated or deleted
type SessionEventPostgresRepo struct {
db *sql.DB
}
// NewSessionEventPostgresRepo creates a new SessionEventPostgresRepo
func NewSessionEventPostgresRepo(db *sql.DB) repositories.SessionEventRepository {
return &SessionEventPostgresRepo{db: db}
}
// Create inserts a new event (append-only, never update)
func (r *SessionEventPostgresRepo) Create(ctx context.Context, event *repositories.SessionEvent) error {
query := `
INSERT INTO session_events (
id, session_id, username, event_type, session_type,
threshold_n, threshold_t, party_id, party_index,
message_hash, public_key, signature, error_message,
metadata, created_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
`
var metadataJSON []byte
var err error
if event.Metadata != nil {
metadataJSON, err = json.Marshal(event.Metadata)
if err != nil {
return err
}
}
_, err = r.db.ExecContext(ctx, query,
event.ID,
event.SessionID,
event.Username,
string(event.EventType),
string(event.SessionType),
event.ThresholdN,
event.ThresholdT,
event.PartyID,
event.PartyIndex,
event.MessageHash,
event.PublicKey,
event.Signature,
event.ErrorMessage,
metadataJSON,
event.CreatedAt,
)
return err
}
// GetBySessionID retrieves all events for a session, ordered by creation time
func (r *SessionEventPostgresRepo) GetBySessionID(ctx context.Context, sessionID uuid.UUID) ([]*repositories.SessionEvent, error) {
query := `
SELECT id, session_id, username, event_type, session_type,
threshold_n, threshold_t, party_id, party_index,
message_hash, public_key, signature, error_message,
metadata, created_at
FROM session_events
WHERE session_id = $1
ORDER BY created_at ASC
`
rows, err := r.db.QueryContext(ctx, query, sessionID)
if err != nil {
return nil, err
}
defer rows.Close()
return r.scanEvents(rows)
}
// GetByUsername retrieves all events for a user, ordered by creation time desc
func (r *SessionEventPostgresRepo) GetByUsername(ctx context.Context, username string, limit int) ([]*repositories.SessionEvent, error) {
query := `
SELECT id, session_id, username, event_type, session_type,
threshold_n, threshold_t, party_id, party_index,
message_hash, public_key, signature, error_message,
metadata, created_at
FROM session_events
WHERE username = $1
ORDER BY created_at DESC
LIMIT $2
`
rows, err := r.db.QueryContext(ctx, query, username, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return r.scanEvents(rows)
}
// GetLatestBySessionID retrieves the most recent event for a session
func (r *SessionEventPostgresRepo) GetLatestBySessionID(ctx context.Context, sessionID uuid.UUID) (*repositories.SessionEvent, error) {
query := `
SELECT id, session_id, username, event_type, session_type,
threshold_n, threshold_t, party_id, party_index,
message_hash, public_key, signature, error_message,
metadata, created_at
FROM session_events
WHERE session_id = $1
ORDER BY created_at DESC
LIMIT 1
`
row := r.db.QueryRowContext(ctx, query, sessionID)
return r.scanEvent(row)
}
// GetByUsernameAndType retrieves events for a user filtered by session type
func (r *SessionEventPostgresRepo) GetByUsernameAndType(ctx context.Context, username string, sessionType repositories.SessionType, limit int) ([]*repositories.SessionEvent, error) {
query := `
SELECT id, session_id, username, event_type, session_type,
threshold_n, threshold_t, party_id, party_index,
message_hash, public_key, signature, error_message,
metadata, created_at
FROM session_events
WHERE username = $1 AND session_type = $2
ORDER BY created_at DESC
LIMIT $3
`
rows, err := r.db.QueryContext(ctx, query, username, string(sessionType), limit)
if err != nil {
return nil, err
}
defer rows.Close()
return r.scanEvents(rows)
}
// scanEvent scans a single event from a row
func (r *SessionEventPostgresRepo) scanEvent(row *sql.Row) (*repositories.SessionEvent, error) {
var event repositories.SessionEvent
var eventType, sessionType string
var metadataJSON []byte
err := row.Scan(
&event.ID,
&event.SessionID,
&event.Username,
&eventType,
&sessionType,
&event.ThresholdN,
&event.ThresholdT,
&event.PartyID,
&event.PartyIndex,
&event.MessageHash,
&event.PublicKey,
&event.Signature,
&event.ErrorMessage,
&metadataJSON,
&event.CreatedAt,
)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
event.EventType = repositories.SessionEventType(eventType)
event.SessionType = repositories.SessionType(sessionType)
if len(metadataJSON) > 0 {
if err := json.Unmarshal(metadataJSON, &event.Metadata); err != nil {
return nil, err
}
}
return &event, nil
}
// scanEvents scans multiple events from rows
func (r *SessionEventPostgresRepo) scanEvents(rows *sql.Rows) ([]*repositories.SessionEvent, error) {
var events []*repositories.SessionEvent
for rows.Next() {
var event repositories.SessionEvent
var eventType, sessionType string
var metadataJSON []byte
err := rows.Scan(
&event.ID,
&event.SessionID,
&event.Username,
&eventType,
&sessionType,
&event.ThresholdN,
&event.ThresholdT,
&event.PartyID,
&event.PartyIndex,
&event.MessageHash,
&event.PublicKey,
&event.Signature,
&event.ErrorMessage,
&metadataJSON,
&event.CreatedAt,
)
if err != nil {
return nil, err
}
event.EventType = repositories.SessionEventType(eventType)
event.SessionType = repositories.SessionType(sessionType)
if len(metadataJSON) > 0 {
if err := json.Unmarshal(metadataJSON, &event.Metadata); err != nil {
return nil, err
}
}
events = append(events, &event)
}
return events, rows.Err()
}

View File

@ -25,6 +25,7 @@ import (
"github.com/rwadurian/mpc-system/services/account/adapters/output/memory"
"github.com/rwadurian/mpc-system/services/account/adapters/output/postgres"
"github.com/rwadurian/mpc-system/services/account/application/use_cases"
"github.com/rwadurian/mpc-system/services/account/domain/repositories"
"github.com/rwadurian/mpc-system/services/account/domain/services"
"go.uber.org/zap"
)
@ -77,6 +78,7 @@ func main() {
accountRepo := postgres.NewAccountPostgresRepo(db)
shareRepo := postgres.NewAccountSharePostgresRepo(db)
recoveryRepo := postgres.NewRecoverySessionPostgresRepo(db)
sessionEventRepo := postgres.NewSessionEventPostgresRepo(db)
// Initialize adapters (using in-memory implementations)
eventPublisher := memory.NewEventPublisherAdapter()
@ -119,6 +121,8 @@ func main() {
if err := startHTTPServer(
cfg,
jwtService,
accountRepo,
sessionEventRepo,
createAccountUC,
getAccountUC,
updateAccountUC,
@ -219,6 +223,8 @@ func initDatabase(cfg config.DatabaseConfig) (*sql.DB, error) {
func startHTTPServer(
cfg *config.Config,
jwtService *jwt.JWTService,
accountRepo repositories.AccountRepository,
sessionEventRepo repositories.SessionEventRepository,
createAccountUC *use_cases.CreateAccountUseCase,
getAccountUC *use_cases.GetAccountUseCase,
updateAccountUC *use_cases.UpdateAccountUseCase,
@ -268,6 +274,8 @@ func startHTTPServer(
// Create HTTP handler with session coordinator client
httpHandler := httphandler.NewAccountHTTPHandler(
accountRepo,
sessionEventRepo,
createAccountUC,
getAccountUC,
updateAccountUC,

View File

@ -0,0 +1,137 @@
package repositories
import (
"context"
"time"
"github.com/google/uuid"
)
// SessionEventType represents the type of session event
type SessionEventType string
const (
// Session lifecycle events
EventSessionCreated SessionEventType = "session_created"
EventPartyJoined SessionEventType = "party_joined"
EventPartyReady SessionEventType = "party_ready"
EventRoundStarted SessionEventType = "round_started"
EventRoundCompleted SessionEventType = "round_completed"
EventSessionCompleted SessionEventType = "session_completed"
EventSessionFailed SessionEventType = "session_failed"
EventSessionExpired SessionEventType = "session_expired"
// Delegate events
EventDelegateShareSent SessionEventType = "delegate_share_sent"
// Signing config events
EventSigningConfigSet SessionEventType = "signing_config_set"
EventSigningConfigCleared SessionEventType = "signing_config_cleared"
)
// SessionType represents the type of MPC session
type SessionType string
const (
SessionTypeKeygen SessionType = "keygen"
SessionTypeSign SessionType = "sign"
)
// SessionEvent represents an immutable event in the session lifecycle
type SessionEvent struct {
ID uuid.UUID
SessionID uuid.UUID
Username string
EventType SessionEventType
SessionType SessionType
ThresholdN *int
ThresholdT *int
PartyID *string
PartyIndex *int
MessageHash []byte
PublicKey []byte
Signature []byte
ErrorMessage *string
Metadata map[string]interface{}
CreatedAt time.Time
}
// NewSessionEvent creates a new session event with required fields
func NewSessionEvent(
sessionID uuid.UUID,
username string,
eventType SessionEventType,
sessionType SessionType,
) *SessionEvent {
return &SessionEvent{
ID: uuid.New(),
SessionID: sessionID,
Username: username,
EventType: eventType,
SessionType: sessionType,
CreatedAt: time.Now().UTC(),
}
}
// WithThreshold adds threshold info to the event
func (e *SessionEvent) WithThreshold(n, t int) *SessionEvent {
e.ThresholdN = &n
e.ThresholdT = &t
return e
}
// WithParty adds party info to the event
func (e *SessionEvent) WithParty(partyID string, partyIndex int) *SessionEvent {
e.PartyID = &partyID
e.PartyIndex = &partyIndex
return e
}
// WithMessageHash adds message hash to the event
func (e *SessionEvent) WithMessageHash(hash []byte) *SessionEvent {
e.MessageHash = hash
return e
}
// WithPublicKey adds public key to the event
func (e *SessionEvent) WithPublicKey(key []byte) *SessionEvent {
e.PublicKey = key
return e
}
// WithSignature adds signature to the event
func (e *SessionEvent) WithSignature(sig []byte) *SessionEvent {
e.Signature = sig
return e
}
// WithError adds error message to the event
func (e *SessionEvent) WithError(msg string) *SessionEvent {
e.ErrorMessage = &msg
return e
}
// WithMetadata adds metadata to the event
func (e *SessionEvent) WithMetadata(metadata map[string]interface{}) *SessionEvent {
e.Metadata = metadata
return e
}
// SessionEventRepository defines the interface for session event persistence
// This is an append-only repository - events are never updated or deleted
type SessionEventRepository interface {
// Create inserts a new event (append-only, never update)
Create(ctx context.Context, event *SessionEvent) error
// GetBySessionID retrieves all events for a session, ordered by creation time
GetBySessionID(ctx context.Context, sessionID uuid.UUID) ([]*SessionEvent, error)
// GetByUsername retrieves all events for a user, ordered by creation time desc
GetByUsername(ctx context.Context, username string, limit int) ([]*SessionEvent, error)
// GetLatestBySessionID retrieves the most recent event for a session
GetLatestBySessionID(ctx context.Context, sessionID uuid.UUID) (*SessionEvent, error)
// GetByUsernameAndType retrieves events for a user filtered by session type
GetByUsernameAndType(ctx context.Context, username string, sessionType SessionType, limit int) ([]*SessionEvent, error)
}