diff --git a/backend/mpc-system/k8s/server-party-api-deployment.yaml b/backend/mpc-system/k8s/server-party-api-deployment.yaml new file mode 100644 index 00000000..4c37d13a --- /dev/null +++ b/backend/mpc-system/k8s/server-party-api-deployment.yaml @@ -0,0 +1,105 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: mpc-server-party-api + namespace: mpc-system +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: mpc-server-party-api + namespace: mpc-system + labels: + app: mpc-server-party-api + component: api +spec: + replicas: 2 # Delegate parties for generating user shares + selector: + matchLabels: + app: mpc-server-party-api + template: + metadata: + labels: + app: mpc-server-party-api + component: api + party-role: delegate # Delegate party generates and returns shares to caller + spec: + serviceAccountName: mpc-server-party-api + containers: + - name: server-party-api + image: mpc-system/server-party-api:latest + imagePullPolicy: IfNotPresent + ports: + - name: grpc + containerPort: 50051 + protocol: TCP + - name: http + containerPort: 8080 + protocol: TCP + env: + - name: MPC_SERVER_GRPC_PORT + value: "50051" + - name: MPC_SERVER_HTTP_PORT + value: "8080" + - name: MPC_SERVER_ENVIRONMENT + valueFrom: + configMapKeyRef: + name: mpc-config + key: environment + - name: SESSION_COORDINATOR_ADDR + value: "mpc-session-coordinator:50051" + - name: MESSAGE_ROUTER_ADDR + value: "mpc-message-router:50051" + - name: MPC_CRYPTO_MASTER_KEY + valueFrom: + secretKeyRef: + name: mpc-secrets + key: crypto_master_key + - name: PARTY_ID + valueFrom: + fieldRef: + fieldPath: metadata.name # Use pod name as unique party ID + resources: + requests: + memory: "256Mi" + cpu: "250m" + limits: + memory: "512Mi" + cpu: "500m" + livenessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 3 + readinessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 2 +--- +apiVersion: v1 +kind: Service +metadata: + name: mpc-server-party-api + namespace: mpc-system + labels: + app: mpc-server-party-api +spec: + selector: + app: mpc-server-party-api + clusterIP: None # Headless service for service discovery + ports: + - name: grpc + port: 50051 + targetPort: 50051 + protocol: TCP + - name: http + port: 8080 + targetPort: 8080 + protocol: TCP diff --git a/backend/mpc-system/k8s/server-party-deployment.yaml b/backend/mpc-system/k8s/server-party-deployment.yaml index 29e1ae18..b0333398 100644 --- a/backend/mpc-system/k8s/server-party-deployment.yaml +++ b/backend/mpc-system/k8s/server-party-deployment.yaml @@ -22,6 +22,7 @@ spec: labels: app: mpc-server-party component: compute + party-role: persistent # Party role: persistent, delegate, or temporary spec: serviceAccountName: mpc-server-party containers: diff --git a/backend/mpc-system/services/session-coordinator/application/ports/input/session_management_port.go b/backend/mpc-system/services/session-coordinator/application/ports/input/session_management_port.go index b986936a..cbd06e9e 100644 --- a/backend/mpc-system/services/session-coordinator/application/ports/input/session_management_port.go +++ b/backend/mpc-system/services/session-coordinator/application/ports/input/session_management_port.go @@ -5,6 +5,7 @@ import ( "time" "github.com/google/uuid" + "github.com/rwadurian/mpc-system/services/session-coordinator/application/ports/output" "github.com/rwadurian/mpc-system/services/session-coordinator/domain/entities" ) @@ -27,15 +28,24 @@ type SessionManagementPort interface { CloseSession(ctx context.Context, sessionID uuid.UUID) error } +// PartyComposition defines the composition of parties required for a session +type PartyComposition struct { + PersistentCount int // Number of persistent parties (store shares in database) + DelegateCount int // Number of delegate parties (generate and return shares) + TemporaryCount int // Number of temporary parties + CustomFilters []output.PartySelectionFilter // Custom party selection filters +} + // CreateSessionInput contains input for creating a session type CreateSessionInput struct { - InitiatorID string - SessionType string // "keygen" or "sign" - ThresholdN int - ThresholdT int - Participants []ParticipantInfo - MessageHash []byte // For sign sessions - ExpiresIn time.Duration + InitiatorID string + SessionType string // "keygen" or "sign" + ThresholdN int + ThresholdT int + Participants []ParticipantInfo + PartyComposition *PartyComposition // Optional: specify party composition by role + MessageHash []byte // For sign sessions + ExpiresIn time.Duration } // ParticipantInfo contains information about a participant diff --git a/backend/mpc-system/services/session-coordinator/application/ports/output/party_pool_port.go b/backend/mpc-system/services/session-coordinator/application/ports/output/party_pool_port.go index 742790c0..956ced14 100644 --- a/backend/mpc-system/services/session-coordinator/application/ports/output/party_pool_port.go +++ b/backend/mpc-system/services/session-coordinator/application/ports/output/party_pool_port.go @@ -1,10 +1,31 @@ package output +// PartyRole represents the role type of a party in the MPC system +type PartyRole string + +const ( + // PartyRolePersistent represents a persistent party that stores key shares in database + PartyRolePersistent PartyRole = "persistent" + + // PartyRoleDelegate represents a temporary party that generates user shares and returns them + PartyRoleDelegate PartyRole = "delegate" + + // PartyRoleTemporary represents a temporary party for ad-hoc operations + PartyRoleTemporary PartyRole = "temporary" +) + // PartyEndpoint represents a party endpoint from the pool type PartyEndpoint struct { Address string PartyID string Ready bool + Role PartyRole // Role of the party (persistent, delegate, temporary) +} + +// PartySelectionFilter defines filtering criteria for party selection +type PartySelectionFilter struct { + Count int // Number of parties to select + Role PartyRole // Required party role (empty string means any role) } // PartyPoolPort defines the interface for party pool management @@ -12,6 +33,12 @@ type PartyPoolPort interface { // GetAvailableParties returns all available party endpoints GetAvailableParties() []PartyEndpoint + // GetAvailablePartiesByRole returns available parties filtered by role + GetAvailablePartiesByRole(role PartyRole) []PartyEndpoint + // SelectParties selects n parties from the available pool SelectParties(n int) ([]PartyEndpoint, error) + + // SelectPartiesWithFilter selects parties based on filter criteria + SelectPartiesWithFilter(filter PartySelectionFilter) ([]PartyEndpoint, error) } diff --git a/backend/mpc-system/services/session-coordinator/application/use_cases/create_session.go b/backend/mpc-system/services/session-coordinator/application/use_cases/create_session.go index 97425e2f..626eb887 100644 --- a/backend/mpc-system/services/session-coordinator/application/use_cases/create_session.go +++ b/backend/mpc-system/services/session-coordinator/application/use_cases/create_session.go @@ -2,6 +2,7 @@ package use_cases import ( "context" + "fmt" "github.com/rwadurian/mpc-system/pkg/jwt" "github.com/rwadurian/mpc-system/pkg/logger" @@ -84,8 +85,35 @@ func (uc *CreateSessionUseCase) Execute( if len(req.Participants) == 0 { // No participants provided - use party pool for automatic selection if uc.partyPool != nil { - // Select parties from K8s pool based on threshold - selectedParties, err := uc.partyPool.SelectParties(threshold.N()) + var selectedParties []output.PartyEndpoint + var err error + + // Check if party composition is specified + if req.PartyComposition != nil { + // Select parties based on composition requirements + selectedParties, err = uc.selectPartiesByComposition(req.PartyComposition) + if err != nil { + logger.Warn("failed to select parties by composition, falling back to simple selection", + zap.Error(err)) + // Try simple selection as fallback + selectedParties, err = uc.partyPool.SelectParties(threshold.N()) + } + } else { + // Default behavior: MUST use persistent parties only + // No fallback - fail if insufficient persistent parties + selectedParties, err = uc.partyPool.SelectPartiesWithFilter(output.PartySelectionFilter{ + Count: threshold.N(), + Role: output.PartyRolePersistent, + }) + if err != nil { + // Return error immediately - insufficient persistent parties + return nil, fmt.Errorf("insufficient persistent parties: need %d persistent parties but not enough available. Use PartyComposition to specify custom party requirements: %w", threshold.N(), err) + } + logger.Info("selected persistent parties by default", + zap.String("session_id", session.ID.String()), + zap.Int("party_count", len(selectedParties))) + } + if err != nil { logger.Warn("failed to select parties from pool, falling back to dynamic join", zap.Error(err), @@ -193,6 +221,68 @@ func (uc *CreateSessionUseCase) Execute( }, nil } +// selectPartiesByComposition selects parties based on composition requirements +func (uc *CreateSessionUseCase) selectPartiesByComposition(composition *input.PartyComposition) ([]output.PartyEndpoint, error) { + if uc.partyPool == nil { + return nil, fmt.Errorf("party pool not configured") + } + + var allSelected []output.PartyEndpoint + + // Select persistent parties + if composition.PersistentCount > 0 { + persistent, err := uc.partyPool.SelectPartiesWithFilter(output.PartySelectionFilter{ + Count: composition.PersistentCount, + Role: output.PartyRolePersistent, + }) + if err != nil { + return nil, fmt.Errorf("failed to select persistent parties: %w", err) + } + allSelected = append(allSelected, persistent...) + } + + // Select delegate parties + if composition.DelegateCount > 0 { + delegate, err := uc.partyPool.SelectPartiesWithFilter(output.PartySelectionFilter{ + Count: composition.DelegateCount, + Role: output.PartyRoleDelegate, + }) + if err != nil { + return nil, fmt.Errorf("failed to select delegate parties: %w", err) + } + allSelected = append(allSelected, delegate...) + } + + // Select temporary parties + if composition.TemporaryCount > 0 { + temporary, err := uc.partyPool.SelectPartiesWithFilter(output.PartySelectionFilter{ + Count: composition.TemporaryCount, + Role: output.PartyRoleTemporary, + }) + if err != nil { + return nil, fmt.Errorf("failed to select temporary parties: %w", err) + } + allSelected = append(allSelected, temporary...) + } + + // Apply custom filters if provided + for _, filter := range composition.CustomFilters { + customParties, err := uc.partyPool.SelectPartiesWithFilter(filter) + if err != nil { + return nil, fmt.Errorf("failed to select parties with custom filter: %w", err) + } + allSelected = append(allSelected, customParties...) + } + + // If no parties were selected (all counts are 0 and no custom filters), return error + // This prevents falling back to unfiltered selection + if len(allSelected) == 0 { + return nil, fmt.Errorf("PartyComposition specified but no parties selected: all counts are zero and no custom filters provided") + } + + return allSelected, nil +} + // ExtractPartyIDs extracts party IDs from participant info func extractPartyIDs(participants []input.ParticipantInfo) []string { ids := make([]string, len(participants)) diff --git a/backend/mpc-system/services/session-coordinator/infrastructure/k8s/party_discovery.go b/backend/mpc-system/services/session-coordinator/infrastructure/k8s/party_discovery.go index 53a2fa39..de1021a8 100644 --- a/backend/mpc-system/services/session-coordinator/infrastructure/k8s/party_discovery.go +++ b/backend/mpc-system/services/session-coordinator/infrastructure/k8s/party_discovery.go @@ -20,6 +20,7 @@ type PartyEndpoint struct { Address string PodName string Ready bool + Role output.PartyRole // Party role extracted from pod labels } // PartyDiscovery handles Kubernetes-based party service discovery @@ -115,6 +116,28 @@ func (pd *PartyDiscovery) GetAvailableParties() []output.PartyEndpoint { Address: ep.Address, PartyID: ep.PodName, // Use pod name as party ID Ready: ep.Ready, + Role: ep.Role, + }) + } + } + return available +} + +// GetAvailablePartiesByRole returns available parties filtered by role +// Implements output.PartyPoolPort interface +func (pd *PartyDiscovery) GetAvailablePartiesByRole(role output.PartyRole) []output.PartyEndpoint { + pd.mu.RLock() + defer pd.mu.RUnlock() + + // Return only ready endpoints matching the specified role + available := make([]output.PartyEndpoint, 0) + for _, ep := range pd.endpoints { + if ep.Ready && ep.Role == role { + available = append(available, output.PartyEndpoint{ + Address: ep.Address, + PartyID: ep.PodName, + Ready: ep.Ready, + Role: ep.Role, }) } } @@ -138,6 +161,30 @@ func (pd *PartyDiscovery) SelectParties(n int) ([]output.PartyEndpoint, error) { return selected, nil } +// SelectPartiesWithFilter selects parties based on filter criteria +// Implements output.PartyPoolPort interface +func (pd *PartyDiscovery) SelectPartiesWithFilter(filter output.PartySelectionFilter) ([]output.PartyEndpoint, error) { + var available []output.PartyEndpoint + + // If role is specified, filter by role; otherwise get all available parties + if filter.Role != "" { + available = pd.GetAvailablePartiesByRole(filter.Role) + } else { + available = pd.GetAvailableParties() + } + + if len(available) < filter.Count { + return nil, fmt.Errorf("insufficient parties with role %s: need %d, have %d", filter.Role, filter.Count, len(available)) + } + + // For now, return first n parties + // TODO: Implement random selection or load balancing strategy + selected := make([]output.PartyEndpoint, filter.Count) + copy(selected, available[:filter.Count]) + + return selected, nil +} + // refresh updates the list of party endpoints from Kubernetes func (pd *PartyDiscovery) refresh() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -162,6 +209,13 @@ func (pd *PartyDiscovery) refresh() error { } } + // Extract party role from pod labels + // Default to persistent if label not found + role := output.PartyRolePersistent + if roleLabel, ok := pod.Labels["party-role"]; ok { + role = output.PartyRole(roleLabel) + } + // Get pod IP if pod.Status.PodIP != "" { // Assuming gRPC port is 50051 (should be configurable) @@ -174,6 +228,7 @@ func (pd *PartyDiscovery) refresh() error { Address: fmt.Sprintf("%s:%s", pod.Status.PodIP, grpcPort), PodName: pod.Name, Ready: ready, + Role: role, }) } }