diff --git a/backend/.claude/settings.local.json b/backend/.claude/settings.local.json
index 0d703a0f..64ba9678 100644
--- a/backend/.claude/settings.local.json
+++ b/backend/.claude/settings.local.json
@@ -30,7 +30,8 @@
       "Bash(wsl.exe -- bash -c 'cd ~/rwadurian/backend/mpc-system && docker compose logs server-party-1 | grep -E \"\"Starting|gRPC|port\"\" | tail -10')",
       "Bash(wsl.exe -- bash -c 'find ~/rwadurian/backend/mpc-system/services/server-party -name \"\"main.go\"\" -path \"\"*/cmd/server/*\"\"')",
       "Bash(wsl.exe -- bash -c 'cat ~/rwadurian/backend/mpc-system/services/server-party/cmd/server/main.go | grep -E \"\"grpc|GRPC|gRPC|50051\"\" | head -20')",
-      "Bash(wsl.exe -- bash:*)"
+      "Bash(wsl.exe -- bash:*)",
+      "Bash(dir:*)"
     ],
     "deny": [],
     "ask": []
diff --git a/backend/mpc-system/README.md b/backend/mpc-system/README.md
index 29acada0..f983f868 100644
--- a/backend/mpc-system/README.md
+++ b/backend/mpc-system/README.md
@@ -17,18 +17,20 @@ Multi-Party Computation (MPC) system for secure threshold signature scheme (TSS)
 ## Overview
 
-The MPC system implements a 2-of-3 threshold signature scheme where:
+The MPC system implements a configurable threshold signature scheme (2-of-3 by default) where:
 
-- 3 server parties hold key shares
-- At least 2 parties are required to generate signatures
+- Server parties from a dynamically scalable pool hold key shares
+- At least 2 parties are required to generate signatures (configurable threshold)
 - User shares are generated dynamically and returned to the calling service
 - All shares are encrypted using AES-256-GCM
 
 ### Key Features
 
-- **Threshold Cryptography**: 2-of-3 TSS for enhanced security
+- **Threshold Cryptography**: Configurable N-of-M TSS for enhanced security
+- **Dynamic Party Pool**: Kubernetes-based service discovery for automatic party scaling
 - **Distributed Architecture**: Services communicate via gRPC and WebSocket
 - **Secure Storage**: AES-256-GCM encryption for all stored shares
 - **API Authentication**: API key and IP-based access control
 - **Session Management**: Coordinated multi-party computation sessions
+- **Optional Device Metadata**: `DeviceInfo` is optional at the protocol layer, so server parties without device metadata can join sessions
 
 ## Architecture
 
@@ -51,11 +53,14 @@ The MPC system implements a 2-of-3 threshold signature scheme where:
 │             │                    │               │
 │             ▼                    ▼               │
 │  ┌────────────────────────────────────────────┐  │
-│  │        Server Parties (3 instances)        │  │
+│  │  Server Party Pool (Dynamically Scalable)  │  │
 │  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  │  │
-│  │  │ Party 1  │  │ Party 2  │  │ Party 3  │  │  │
-│  │  │  (TSS)   │  │  (TSS)   │  │  (TSS)   │  │  │
-│  │  └──────────┘  └──────────┘  └──────────┘  │  │
+│  │  │ Party 1  │  │ Party 2  │  │ Party 3  │  │ K8s Discovery │
+│  │  │  (TSS)   │  │  (TSS)   │  │  (TSS)   │  │ Auto-selected │
+│  │  └──────────┘  └──────────┘  └──────────┘  │ from pool     │
+│  │  ┌──────────┐  ...        can scale up/down │              │
+│  │  │ Party N  │                              │               │
+│  │  └──────────┘                              │               │
 │  └────────────────────────────────────────────┘  │
 │                       │                           │
 │  ┌────────────────────────────────────────────┐  │
@@ -72,7 +77,24 @@ The MPC system implements a 2-of-3 threshold signature scheme where:
 └──────────────────────────┘
 ```
 
-## Quick Start
+## Deployment Options
+
+This system supports two deployment modes:
+
+### Option 1: Docker Compose (Development/Simple Deployment)
+
+- Quick setup for development or simple production environments
+- Fixed set of 3 server parties (hardcoded IDs)
+- See instructions below in "Quick Start"
+
+### Option 2: Kubernetes (Production/Scalable Deployment)
+
+- Dynamic party pool with service discovery
+- Horizontally scalable server parties
+- Recommended for production environments
+- See `k8s/README.md` for detailed instructions
+
+## Quick Start (Docker Compose)
 
 ### Prerequisites
 
@@ -248,13 +270,14 @@
 
 | Service | Purpose |
 |---------|---------|
-| server-party-1 | TSS party 1 (stores server shares) |
-| server-party-2 | TSS party 2 (stores server shares) |
-| server-party-3 | TSS party 3 (stores server shares) |
+| server-party-1/2/3 | TSS parties (Docker Compose mode - fixed IDs) |
+| server-party-pool | TSS party pool (Kubernetes mode - dynamic scaling) |
 | postgres | Database for session/account data |
 | redis | Cache and temporary data |
 | rabbitmq | Message broker for inter-service communication |
 
+**Note**: In Kubernetes mode, server parties are discovered dynamically using K8s service discovery. Parties can be scaled up/down without service interruption.
+
 ### Service Dependencies
 
 ```
diff --git a/backend/mpc-system/go.mod b/backend/mpc-system/go.mod
index 4770ab3e..e925d634 100644
--- a/backend/mpc-system/go.mod
+++ b/backend/mpc-system/go.mod
@@ -16,6 +16,9 @@ require (
 	go.uber.org/zap v1.26.0
 	golang.org/x/crypto v0.16.0
 	google.golang.org/grpc v1.60.0
+	k8s.io/api v0.29.0
+	k8s.io/apimachinery v0.29.0
+	k8s.io/client-go v0.29.0
 )
 
 replace github.com/agl/ed25519 => github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412
diff --git a/backend/mpc-system/k8s/README.md b/backend/mpc-system/k8s/README.md
new file mode 100644
index 00000000..c812adc8
--- /dev/null
+++ b/backend/mpc-system/k8s/README.md
@@ -0,0 +1,198 @@
+# Kubernetes Deployment for MPC System
+
+This directory contains Kubernetes manifests for deploying the MPC system with dynamic party pool service discovery.
+
+## Architecture Overview
+
+The Kubernetes deployment implements a **Party Pool** architecture where:
+
+- **Server parties are dynamically discovered** via Kubernetes service discovery
+- **Session coordinator** automatically selects available parties from the pool
+- **Parties can be scaled** up/down without code changes (just scale the deployment)
+- **No hardcoded party IDs** - each pod gets a unique name as its party ID
+
+## Prerequisites
+
+- Kubernetes cluster (v1.24+)
+- kubectl configured to access your cluster
+- Docker images built for all services
+- PostgreSQL, Redis, and RabbitMQ deployed (see infrastructure/)
+
+## Quick Start
+
+### 1. Create namespace
+
+```bash
+kubectl apply -f namespace.yaml
+```
+
+### 2. Create secrets
+
+Copy the example secrets file and fill in your actual values:
+
+```bash
+cp secrets-example.yaml secrets.yaml
+# Edit secrets.yaml with your base64-encoded secrets
+# Generate base64: echo -n "your-secret" | base64
+kubectl apply -f secrets.yaml
+```
+
+### 3. Create ConfigMap
+
+```bash
+kubectl apply -f configmap.yaml
+```
+
+### 4. Deploy Session Coordinator
+
+```bash
+kubectl apply -f session-coordinator-deployment.yaml
+```
+
+The session coordinator requires RBAC permissions to discover party pods.
+
+### 5. Deploy Server Party Pool
+
+```bash
+kubectl apply -f server-party-deployment.yaml
+```
+
+This creates a Deployment with 3 replicas by default. Each pod gets a unique generated name (e.g., `mpc-server-party-6d5f7c9b8d-x2k4q`), which serves as its party ID.
+
+### 6. Deploy other services
+
+```bash
+kubectl apply -f message-router-deployment.yaml
+kubectl apply -f account-service-deployment.yaml
+kubectl apply -f server-party-api-deployment.yaml
+```
+
+## Scaling Server Parties
+
+To scale the party pool, simply adjust the replica count:
+
+```bash
+# Scale up to 5 parties
+kubectl scale deployment mpc-server-party -n mpc-system --replicas=5
+
+# Scale down to 2 parties
+kubectl scale deployment mpc-server-party -n mpc-system --replicas=2
+```
+
+The session coordinator will automatically discover new parties within 30 seconds (configurable via `MPC_PARTY_DISCOVERY_INTERVAL`).
+
+## Service Discovery Configuration
+
+The session coordinator uses environment variables to configure party discovery:
+
+- `K8S_NAMESPACE`: Namespace to search for parties (auto-detected from pod metadata)
+- `MPC_PARTY_SERVICE_NAME`: Service name to discover (default: `mpc-server-party`)
+- `MPC_PARTY_LABEL_SELECTOR`: Label selector (default: `app=mpc-server-party`)
+- `MPC_PARTY_GRPC_PORT`: gRPC port for parties (default: `50051`)
+- `MPC_PARTY_DISCOVERY_INTERVAL`: Refresh interval (default: `30s`)
+
+## RBAC Permissions
+
+The session coordinator requires the following Kubernetes permissions:
+
+- `pods`: get, list, watch (to discover party pods)
+- `services`: get, list, watch (to discover services)
+
+These permissions are granted via the `mpc-session-coordinator-role` Role and RoleBinding.
+
+## Health Checks
+
+All services expose a `/health` endpoint on their HTTP port (8080) for:
+
+- Liveness probes: detect whether the service is alive
+- Readiness probes: detect whether the service is ready to accept traffic
+
+## Monitoring Party Pool
+
+Check available parties:
+
+```bash
+# View all party pods
+kubectl get pods -n mpc-system -l app=mpc-server-party
+
+# Check party pod logs
+kubectl logs -n mpc-system -l app=mpc-server-party --tail=50
+
+# Check session coordinator logs for party discovery
+kubectl logs -n mpc-system -l app=mpc-session-coordinator | grep "party"
+```
+
+## Troubleshooting
+
+### Session coordinator can't discover parties
+
+1. Check RBAC permissions:
+```bash
+kubectl get role,rolebinding -n mpc-system
+```
+
+2. Check if the service account is correctly assigned:
+```bash
+kubectl get pod -n mpc-system -l app=mpc-session-coordinator -o yaml | grep serviceAccount
+```
+
+3. Check coordinator logs:
+```bash
+kubectl logs -n mpc-system -l app=mpc-session-coordinator
+```
+
+### Parties not showing as ready
+
+1. Check party pod status:
+```bash
+kubectl get pods -n mpc-system -l app=mpc-server-party
+```
+
+2. Check the readiness probe:
+```bash
+kubectl describe pod <party-pod-name> -n mpc-system
+```
+
+3. Check party logs:
+```bash
+kubectl logs <party-pod-name> -n mpc-system
+```
+
+## Migration from Docker Compose
+
+Key differences from the docker-compose deployment:
+
+1. **No hardcoded party IDs**: In docker-compose, parties had static IDs (`server-party-1`, `server-party-2`, `server-party-3`). In K8s, pod names are used as party IDs.
+
+2. **Dynamic scaling**: Can scale parties up/down without restarting other services.
+
+3. **Service discovery**: Automatic discovery via the Kubernetes API instead of DNS.
+
+4. **DeviceInfo optional**: `DeviceInfo` is now optional in the protocol layer, so pool-selected server parties can be registered as session participants without device metadata.
+
+## Advanced Configuration
+
+### Custom party selection strategy
+
+The default selection strategy is "first N available parties". To implement custom strategies (e.g., load-based, geo-aware), modify the `SelectParties()` method in `services/session-coordinator/infrastructure/k8s/party_discovery.go` (a sketch of a randomized variant appears after `party_discovery.go` at the end of this diff).
+
+### Party affinity
+
+To ensure parties run on different nodes for fault tolerance:
+
+```yaml
+spec:
+  template:
+    spec:
+      affinity:
+        podAntiAffinity:
+          preferredDuringSchedulingIgnoredDuringExecution:
+          - weight: 100
+            podAffinityTerm:
+              labelSelector:
+                matchLabels:
+                  app: mpc-server-party
+              topologyKey: kubernetes.io/hostname
+```
+
+Merge this into `server-party-deployment.yaml`; the `affinity` block goes under `spec.template.spec`.
diff --git a/backend/mpc-system/k8s/configmap.yaml b/backend/mpc-system/k8s/configmap.yaml
new file mode 100644
index 00000000..0ecc736e
--- /dev/null
+++ b/backend/mpc-system/k8s/configmap.yaml
@@ -0,0 +1,10 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: mpc-config
+  namespace: mpc-system
+data:
+  environment: "production"
+  postgres_host: "postgres.mpc-system.svc.cluster.local"
+  redis_host: "redis.mpc-system.svc.cluster.local"
+  rabbitmq_host: "rabbitmq.mpc-system.svc.cluster.local"
diff --git a/backend/mpc-system/k8s/namespace.yaml b/backend/mpc-system/k8s/namespace.yaml
new file mode 100644
index 00000000..c1fdd7f4
--- /dev/null
+++ b/backend/mpc-system/k8s/namespace.yaml
@@ -0,0 +1,7 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: mpc-system
+  labels:
+    name: mpc-system
+    app: mpc
diff --git a/backend/mpc-system/k8s/secrets-example.yaml b/backend/mpc-system/k8s/secrets-example.yaml
new file mode 100644
index 00000000..6ede7ea3
--- /dev/null
+++ b/backend/mpc-system/k8s/secrets-example.yaml
@@ -0,0 +1,19 @@
+# IMPORTANT: This is an example file. DO NOT commit real secrets to git!
+# Copy this file to secrets.yaml and fill in your actual base64-encoded values
+# Generate base64 values: echo -n "your-value" | base64
+
+apiVersion: v1
+kind: Secret
+metadata:
+  name: mpc-secrets
+  namespace: mpc-system
+type: Opaque
+data:
+  postgres_user: bXBjX3VzZXI=  # mpc_user (example)
+  postgres_password: Y2hhbmdlbWU=  # changeme (example - REPLACE THIS!)
+  redis_password: ""  # empty if no password
+  rabbitmq_user: bXBjX3VzZXI=  # mpc_user (example)
+  rabbitmq_password: Y2hhbmdlbWU=  # changeme (example - REPLACE THIS!)
+  jwt_secret_key: Y2hhbmdlLXRoaXMtdG8tYS1zZWN1cmUtcmFuZG9tLXN0cmluZw==  # REPLACE THIS!
+  crypto_master_key: Y2hhbmdlLXRoaXMtdG8tYS1zZWN1cmUtcmFuZG9tLXN0cmluZw==  # REPLACE THIS!
+  mpc_api_key: Y2hhbmdlLXRoaXMtdG8tYS1zZWN1cmUtcmFuZG9tLXN0cmluZw==  # REPLACE THIS!
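As an alternative to hand-editing `secrets.yaml`, the same Secret can be created in one step, letting `kubectl` handle the base64 encoding. This is a sketch, not part of the change set: the key names mirror `secrets-example.yaml`, and `kubectl create secret generic --from-literal` and `openssl rand -base64` are standard tools, but the generated passwords must of course match what PostgreSQL and RabbitMQ are actually configured with.

```bash
# Sketch: create mpc-secrets directly; kubectl base64-encodes the literals.
# Key names mirror secrets-example.yaml; replace values to suit your setup.
kubectl create secret generic mpc-secrets -n mpc-system \
  --from-literal=postgres_user=mpc_user \
  --from-literal=postgres_password="$(openssl rand -base64 24)" \
  --from-literal=redis_password="" \
  --from-literal=rabbitmq_user=mpc_user \
  --from-literal=rabbitmq_password="$(openssl rand -base64 24)" \
  --from-literal=jwt_secret_key="$(openssl rand -base64 32)" \
  --from-literal=crypto_master_key="$(openssl rand -base64 32)" \
  --from-literal=mpc_api_key="$(openssl rand -base64 32)"
```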
diff --git a/backend/mpc-system/k8s/server-party-deployment.yaml b/backend/mpc-system/k8s/server-party-deployment.yaml
new file mode 100644
index 00000000..29e1ae18
--- /dev/null
+++ b/backend/mpc-system/k8s/server-party-deployment.yaml
@@ -0,0 +1,125 @@
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: mpc-server-party
+  namespace: mpc-system
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: mpc-server-party
+  namespace: mpc-system
+  labels:
+    app: mpc-server-party
+    component: compute
+spec:
+  replicas: 3  # Start with 3 parties, can scale up/down dynamically
+  selector:
+    matchLabels:
+      app: mpc-server-party
+  template:
+    metadata:
+      labels:
+        app: mpc-server-party
+        component: compute
+    spec:
+      serviceAccountName: mpc-server-party
+      containers:
+      - name: server-party
+        image: mpc-system/server-party:latest
+        imagePullPolicy: IfNotPresent
+        ports:
+        - name: grpc
+          containerPort: 50051
+          protocol: TCP
+        - name: http
+          containerPort: 8080
+          protocol: TCP
+        env:
+        - name: MPC_SERVER_GRPC_PORT
+          value: "50051"
+        - name: MPC_SERVER_HTTP_PORT
+          value: "8080"
+        - name: MPC_SERVER_ENVIRONMENT
+          valueFrom:
+            configMapKeyRef:
+              name: mpc-config
+              key: environment
+        - name: MPC_DATABASE_HOST
+          valueFrom:
+            configMapKeyRef:
+              name: mpc-config
+              key: postgres_host
+        - name: MPC_DATABASE_PORT
+          value: "5432"
+        - name: MPC_DATABASE_USER
+          valueFrom:
+            secretKeyRef:
+              name: mpc-secrets
+              key: postgres_user
+        - name: MPC_DATABASE_PASSWORD
+          valueFrom:
+            secretKeyRef:
+              name: mpc-secrets
+              key: postgres_password
+        - name: MPC_DATABASE_DBNAME
+          value: "mpc_system"
+        - name: MPC_DATABASE_SSLMODE
+          value: "disable"
+        - name: SESSION_COORDINATOR_ADDR
+          value: "mpc-session-coordinator:50051"
+        - name: MESSAGE_ROUTER_ADDR
+          value: "mpc-message-router:50051"
+        - name: MPC_CRYPTO_MASTER_KEY
+          valueFrom:
+            secretKeyRef:
+              name: mpc-secrets
+              key: crypto_master_key
+        - name: PARTY_ID
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name  # Use pod name as unique party ID
+        resources:
+          requests:
+            memory: "256Mi"
+            cpu: "250m"
+          limits:
+            memory: "512Mi"
+            cpu: "500m"
+        livenessProbe:
+          httpGet:
+            path: /health
+            port: 8080
+          initialDelaySeconds: 30
+          periodSeconds: 10
+          timeoutSeconds: 5
+          failureThreshold: 3
+        readinessProbe:
+          httpGet:
+            path: /health
+            port: 8080
+          initialDelaySeconds: 10
+          periodSeconds: 5
+          timeoutSeconds: 3
+          failureThreshold: 2
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: mpc-server-party
+  namespace: mpc-system
+  labels:
+    app: mpc-server-party
+spec:
+  selector:
+    app: mpc-server-party
+  clusterIP: None  # Headless service for service discovery
+  ports:
+  - name: grpc
+    port: 50051
+    targetPort: 50051
+    protocol: TCP
+  - name: http
+    port: 8080
+    targetPort: 8080
+    protocol: TCP
diff --git a/backend/mpc-system/k8s/session-coordinator-deployment.yaml b/backend/mpc-system/k8s/session-coordinator-deployment.yaml
new file mode 100644
index 00000000..7f99d70e
--- /dev/null
+++ b/backend/mpc-system/k8s/session-coordinator-deployment.yaml
@@ -0,0 +1,189 @@
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: mpc-session-coordinator
+  namespace: mpc-system
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: mpc-session-coordinator-role
+  namespace: mpc-system
+rules:
+- apiGroups: [""]
+  resources: ["pods"]
+  verbs: ["get", "list", "watch"]
+- apiGroups: [""]
+  resources: ["services"]
+  verbs: ["get", "list", "watch"]
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: mpc-session-coordinator-rolebinding
+  namespace: mpc-system
+subjects:
+- kind: ServiceAccount
+  name: mpc-session-coordinator
+  namespace: mpc-system
+roleRef:
+  kind: Role
+  name: mpc-session-coordinator-role
+  apiGroup: rbac.authorization.k8s.io
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: mpc-session-coordinator
+  namespace: mpc-system
+  labels:
+    app: mpc-session-coordinator
+    component: core
+spec:
+  replicas: 2  # Can scale horizontally for high availability
+  selector:
+    matchLabels:
+      app: mpc-session-coordinator
+  template:
+    metadata:
+      labels:
+        app: mpc-session-coordinator
+        component: core
+    spec:
+      serviceAccountName: mpc-session-coordinator
+      containers:
+      - name: session-coordinator
+        image: mpc-system/session-coordinator:latest
+        imagePullPolicy: IfNotPresent
+        ports:
+        - name: grpc
+          containerPort: 50051
+          protocol: TCP
+        - name: http
+          containerPort: 8080
+          protocol: TCP
+        env:
+        - name: MPC_SERVER_GRPC_PORT
+          value: "50051"
+        - name: MPC_SERVER_HTTP_PORT
+          value: "8080"
+        - name: MPC_SERVER_ENVIRONMENT
+          valueFrom:
+            configMapKeyRef:
+              name: mpc-config
+              key: environment
+        - name: MPC_DATABASE_HOST
+          valueFrom:
+            configMapKeyRef:
+              name: mpc-config
+              key: postgres_host
+        - name: MPC_DATABASE_PORT
+          value: "5432"
+        - name: MPC_DATABASE_USER
+          valueFrom:
+            secretKeyRef:
+              name: mpc-secrets
+              key: postgres_user
+        - name: MPC_DATABASE_PASSWORD
+          valueFrom:
+            secretKeyRef:
+              name: mpc-secrets
+              key: postgres_password
+        - name: MPC_DATABASE_DBNAME
+          value: "mpc_system"
+        - name: MPC_DATABASE_SSLMODE
+          value: "disable"
+        - name: MPC_REDIS_HOST
+          valueFrom:
+            configMapKeyRef:
+              name: mpc-config
+              key: redis_host
+        - name: MPC_REDIS_PORT
+          value: "6379"
+        - name: MPC_REDIS_PASSWORD
+          valueFrom:
+            secretKeyRef:
+              name: mpc-secrets
+              key: redis_password
+              optional: true
+        - name: MPC_RABBITMQ_HOST
+          valueFrom:
+            configMapKeyRef:
+              name: mpc-config
+              key: rabbitmq_host
+        - name: MPC_RABBITMQ_PORT
+          value: "5672"
+        - name: MPC_RABBITMQ_USER
+          valueFrom:
+            secretKeyRef:
+              name: mpc-secrets
+              key: rabbitmq_user
+        - name: MPC_RABBITMQ_PASSWORD
+          valueFrom:
+            secretKeyRef:
+              name: mpc-secrets
+              key: rabbitmq_password
+        - name: MPC_JWT_SECRET_KEY
+          valueFrom:
+            secretKeyRef:
+              name: mpc-secrets
+              key: jwt_secret_key
+        - name: MPC_JWT_ISSUER
+          value: "mpc-system"
+        # K8s service discovery configuration
+        - name: K8S_NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        - name: MPC_PARTY_SERVICE_NAME
+          value: "mpc-server-party"
+        - name: MPC_PARTY_LABEL_SELECTOR
+          value: "app=mpc-server-party"
+        - name: MPC_PARTY_GRPC_PORT
+          value: "50051"
+        - name: MPC_PARTY_DISCOVERY_INTERVAL
+          value: "30s"
+        resources:
+          requests:
+            memory: "256Mi"
+            cpu: "250m"
+          limits:
+            memory: "512Mi"
+            cpu: "500m"
+        livenessProbe:
+          httpGet:
+            path: /health
+            port: 8080
+          initialDelaySeconds: 30
+          periodSeconds: 10
+          timeoutSeconds: 5
+          failureThreshold: 3
+        readinessProbe:
+          httpGet:
+            path: /health
+            port: 8080
+          initialDelaySeconds: 10
+          periodSeconds: 5
+          timeoutSeconds: 3
+          failureThreshold: 2
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: mpc-session-coordinator
+  namespace: mpc-system
+  labels:
+    app: mpc-session-coordinator
+spec:
+  selector:
+    app: mpc-session-coordinator
+  type: ClusterIP
+  ports:
+  - name: grpc
+    port: 50051
+    targetPort: 50051
+    protocol: TCP
+  - name: http
+    port: 8080
+    targetPort: 8080
+    protocol: TCP
diff --git a/backend/mpc-system/services/session-coordinator/application/ports/output/party_pool_port.go b/backend/mpc-system/services/session-coordinator/application/ports/output/party_pool_port.go
new file mode 100644
index 00000000..742790c0
--- /dev/null
+++ b/backend/mpc-system/services/session-coordinator/application/ports/output/party_pool_port.go
@@ -0,0 +1,18 @@
+package output
+
+// PartyEndpoint represents a party endpoint from the pool
+type PartyEndpoint struct {
+	Address string
+	PartyID string
+	Ready   bool
+}
+
+// PartyPoolPort defines the interface for party pool management
+type PartyPoolPort interface {
+	// GetAvailableParties returns the party endpoints that are currently ready
+	GetAvailableParties() []PartyEndpoint
+
+	// SelectParties selects n parties from the available pool; it returns an
+	// error if fewer than n parties are ready
+	SelectParties(n int) ([]PartyEndpoint, error)
+}
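To make the port concrete before the Kubernetes adapter appears below, here is a minimal sketch of a static, in-memory implementation of the same interface, suitable for unit tests or fixed-topology deployments. `StaticPartyPool` and its endpoint list are hypothetical and not part of this change set; only the `output` package is real.

```go
// Hypothetical helper, shown only to illustrate how output.PartyPoolPort
// can be satisfied outside Kubernetes (e.g., in tests or docker-compose).
package partypooltest

import (
	"fmt"

	"github.com/rwadurian/mpc-system/services/session-coordinator/application/ports/output"
)

// StaticPartyPool serves a fixed list of endpoints.
type StaticPartyPool struct {
	Endpoints []output.PartyEndpoint
}

// GetAvailableParties returns only the endpoints marked Ready.
func (p *StaticPartyPool) GetAvailableParties() []output.PartyEndpoint {
	ready := make([]output.PartyEndpoint, 0, len(p.Endpoints))
	for _, ep := range p.Endpoints {
		if ep.Ready {
			ready = append(ready, ep)
		}
	}
	return ready
}

// SelectParties returns the first n ready endpoints, mirroring the
// K8s adapter's default strategy.
func (p *StaticPartyPool) SelectParties(n int) ([]output.PartyEndpoint, error) {
	available := p.GetAvailableParties()
	if len(available) < n {
		return nil, fmt.Errorf("insufficient parties: need %d, have %d", n, len(available))
	}
	return available[:n], nil
}
```

A value of this type can be passed wherever `output.PartyPoolPort` is expected, for example as the `partyPool` argument to `NewCreateSessionUseCase` in a test.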
diff --git a/backend/mpc-system/services/session-coordinator/application/use_cases/create_session.go b/backend/mpc-system/services/session-coordinator/application/use_cases/create_session.go
index b92c80b7..97425e2f 100644
--- a/backend/mpc-system/services/session-coordinator/application/use_cases/create_session.go
+++ b/backend/mpc-system/services/session-coordinator/application/use_cases/create_session.go
@@ -19,6 +19,7 @@ type CreateSessionUseCase struct {
 	sessionRepo    repositories.SessionRepository
 	tokenGen       jwt.TokenGenerator
 	eventPublisher output.MessageBrokerPort
+	partyPool      output.PartyPoolPort
 	coordinatorSvc *services.SessionCoordinatorService
 }
 
@@ -27,11 +28,13 @@ func NewCreateSessionUseCase(
 	sessionRepo repositories.SessionRepository,
 	tokenGen jwt.TokenGenerator,
 	eventPublisher output.MessageBrokerPort,
+	partyPool output.PartyPoolPort,
 ) *CreateSessionUseCase {
 	return &CreateSessionUseCase{
 		sessionRepo:    sessionRepo,
 		tokenGen:       tokenGen,
 		eventPublisher: eventPublisher,
+		partyPool:      partyPool,
 		coordinatorSvc: services.NewSessionCoordinatorService(),
 	}
 }
@@ -79,12 +82,59 @@ func (uc *CreateSessionUseCase) Execute(
 	// 5. Add participants and generate join tokens
 	tokens := make(map[string]string)
 	if len(req.Participants) == 0 {
-		// For dynamic joining, generate a universal join token with wildcard party ID
-		universalToken, err := uc.tokenGen.GenerateJoinToken(session.ID.UUID(), "*", expiresIn)
-		if err != nil {
-			return nil, err
+		// No participants provided - use the party pool for automatic selection
+		if uc.partyPool != nil {
+			// Select parties from the K8s pool based on the threshold
+			selectedParties, err := uc.partyPool.SelectParties(threshold.N())
+			if err != nil {
+				logger.Warn("failed to select parties from pool, falling back to dynamic join",
+					zap.Error(err),
+					zap.Int("required_parties", threshold.N()))
+
+				// Fallback: generate a universal join token for dynamic joining
+				universalToken, err := uc.tokenGen.GenerateJoinToken(session.ID.UUID(), "*", expiresIn)
+				if err != nil {
+					return nil, err
+				}
+				tokens["*"] = universalToken
+			} else {
+				// Add the selected parties as participants
+				for i, party := range selectedParties {
+					partyID, err := value_objects.NewPartyID(party.PartyID)
+					if err != nil {
+						return nil, err
+					}
+
+					// Create the participant with empty DeviceInfo (server parties don't have device info)
+					participant, err := entities.NewParticipant(partyID, i, entities.DeviceInfo{})
+					if err != nil {
+						return nil, err
+					}
+
+					if err := session.AddParticipant(participant); err != nil {
+						return nil, err
+					}
+
+					// Generate a join token for this party
+					token, err := uc.tokenGen.GenerateJoinToken(session.ID.UUID(), party.PartyID, expiresIn)
+					if err != nil {
+						return nil, err
+					}
+					tokens[party.PartyID] = token
+				}
+
+				logger.Info("selected parties from K8s pool",
+					zap.String("session_id", session.ID.String()),
+					zap.Int("party_count", len(selectedParties)))
+			}
+		} else {
+			// No party pool configured - fall back to dynamic join
+			universalToken, err := uc.tokenGen.GenerateJoinToken(session.ID.UUID(), "*", expiresIn)
+			if err != nil {
+				return nil, err
+			}
+			tokens["*"] = universalToken
 		}
-		tokens["*"] = universalToken
 	} else {
 		// For pre-registered participants, generate individual tokens
 		for i, pInfo := range req.Participants {
diff --git a/backend/mpc-system/services/session-coordinator/cmd/server/main.go b/backend/mpc-system/services/session-coordinator/cmd/server/main.go
index 58444850..79a8dbc7 100644
--- a/backend/mpc-system/services/session-coordinator/cmd/server/main.go
+++ b/backend/mpc-system/services/session-coordinator/cmd/server/main.go
@@ -30,6 +30,8 @@ import (
 	redisadapter "github.com/rwadurian/mpc-system/services/session-coordinator/adapters/output/redis"
+	"github.com/rwadurian/mpc-system/services/session-coordinator/application/ports/output"
 	"github.com/rwadurian/mpc-system/services/session-coordinator/application/use_cases"
 	"github.com/rwadurian/mpc-system/services/session-coordinator/domain/repositories"
+	"github.com/rwadurian/mpc-system/services/session-coordinator/infrastructure/k8s"
 	"go.uber.org/zap"
 )
 
@@ -96,8 +98,21 @@ func main() {
 		cfg.JWT.RefreshExpiry,
 	)
 
+	// Initialize K8s party discovery (optional - falls back gracefully outside K8s).
+	// partyPool is declared with the interface type on purpose: assigning a nil
+	// *k8s.PartyDiscovery to the interface would yield a non-nil interface value
+	// and defeat the nil check inside CreateSessionUseCase.
+	var partyPool output.PartyPoolPort
+	if discovery, err := k8s.NewPartyDiscovery(logger.Log); err != nil {
+		logger.Warn("K8s party discovery not available, will use dynamic join mode",
+			zap.Error(err))
+	} else {
+		partyPool = discovery
+		logger.Info("K8s party discovery initialized successfully")
+	}
+
 	// Initialize use cases
-	createSessionUC := use_cases.NewCreateSessionUseCase(sessionRepo, jwtService, eventPublisher)
+	createSessionUC := use_cases.NewCreateSessionUseCase(sessionRepo, jwtService, eventPublisher, partyPool)
 	joinSessionUC := use_cases.NewJoinSessionUseCase(sessionRepo, jwtService, eventPublisher)
 	getSessionStatusUC := use_cases.NewGetSessionStatusUseCase(sessionRepo)
 	reportCompletionUC := use_cases.NewReportCompletionUseCase(sessionRepo, eventPublisher)
diff --git a/backend/mpc-system/services/session-coordinator/domain/entities/device_info.go b/backend/mpc-system/services/session-coordinator/domain/entities/device_info.go
index 35bcaba4..ff884537 100644
--- a/backend/mpc-system/services/session-coordinator/domain/entities/device_info.go
+++ b/backend/mpc-system/services/session-coordinator/domain/entities/device_info.go
@@ -45,9 +45,18 @@ func (d DeviceInfo) IsRecovery() bool {
 }
 
 // Validate validates the device info
+// DeviceInfo is now optional - empty device info is valid
 func (d DeviceInfo) Validate() error {
-	if d.DeviceType == "" {
-		return ErrInvalidDeviceInfo
+	// Allow empty DeviceInfo for server parties or anonymous participants
+	// Only validate if DeviceType is provided
+	if d.DeviceType != "" {
+		// If DeviceType is set, validate that it's a known type
+		switch d.DeviceType {
+		case DeviceTypeAndroid, DeviceTypeIOS, DeviceTypePC, DeviceTypeServer, DeviceTypeRecovery:
+			return nil
+		default:
+			return ErrInvalidDeviceInfo
+		}
 	}
 	return nil
 }
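The behavioral change is easiest to see as a table of cases. This is a hypothetical test sketch (no such `_test` file exists in this change set) that uses only the `DeviceType` constants referenced in the switch above and assumes `DeviceType` is a string-compatible field:

```go
package entities

import "testing"

// TestDeviceInfoValidate documents the new optional-DeviceInfo semantics:
// empty DeviceInfo passes, known types pass, unknown types are rejected.
func TestDeviceInfoValidate(t *testing.T) {
	cases := []struct {
		name    string
		info    DeviceInfo
		wantErr bool
	}{
		{"empty is now valid (server parties)", DeviceInfo{}, false},
		{"known type is valid", DeviceInfo{DeviceType: DeviceTypeServer}, false},
		{"unknown type is rejected", DeviceInfo{DeviceType: "toaster"}, true},
	}
	for _, tc := range cases {
		if err := tc.info.Validate(); (err != nil) != tc.wantErr {
			t.Errorf("%s: Validate() error = %v, wantErr %v", tc.name, err, tc.wantErr)
		}
	}
}
```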
diff --git a/backend/mpc-system/services/session-coordinator/infrastructure/k8s/party_discovery.go b/backend/mpc-system/services/session-coordinator/infrastructure/k8s/party_discovery.go
new file mode 100644
index 00000000..53a2fa39
--- /dev/null
+++ b/backend/mpc-system/services/session-coordinator/infrastructure/k8s/party_discovery.go
@@ -0,0 +1,227 @@
+package k8s
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/rwadurian/mpc-system/services/session-coordinator/application/ports/output"
+	"go.uber.org/zap"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+)
+
+// PartyEndpoint represents a discovered party endpoint
+type PartyEndpoint struct {
+	Address string
+	PodName string
+	Ready   bool
+}
+
+// PartyDiscovery handles Kubernetes-based party service discovery
+type PartyDiscovery struct {
+	clientset       *kubernetes.Clientset
+	namespace       string
+	serviceName     string
+	labelSelector   string
+	grpcPort        string
+	logger          *zap.Logger
+	endpoints       []PartyEndpoint
+	mu              sync.RWMutex
+	refreshInterval time.Duration
+	stopCh          chan struct{}
+}
+
+// NewPartyDiscovery creates a new Kubernetes party discovery service
+func NewPartyDiscovery(logger *zap.Logger) (*PartyDiscovery, error) {
+	var config *rest.Config
+	var err error
+
+	// Try in-cluster config first (when running inside K8s)
+	config, err = rest.InClusterConfig()
+	if err != nil {
+		// Fall back to kubeconfig for local development
+		kubeconfig := os.Getenv("KUBECONFIG")
+		if kubeconfig == "" {
+			kubeconfig = os.Getenv("HOME") + "/.kube/config"
+		}
+		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create k8s config: %w", err)
+		}
+	}
+
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create k8s clientset: %w", err)
+	}
+
+	namespace := os.Getenv("K8S_NAMESPACE")
+	if namespace == "" {
+		namespace = "default"
+	}
+
+	serviceName := os.Getenv("MPC_PARTY_SERVICE_NAME")
+	if serviceName == "" {
+		serviceName = "mpc-server-party"
+	}
+
+	labelSelector := os.Getenv("MPC_PARTY_LABEL_SELECTOR")
+	if labelSelector == "" {
+		labelSelector = "app=mpc-server-party"
+	}
+
+	grpcPort := os.Getenv("MPC_PARTY_GRPC_PORT")
+	if grpcPort == "" {
+		grpcPort = "50051"
+	}
+
+	refreshInterval := 30 * time.Second
+	if interval := os.Getenv("MPC_PARTY_DISCOVERY_INTERVAL"); interval != "" {
+		if d, err := time.ParseDuration(interval); err == nil {
+			refreshInterval = d
+		}
+	}
+
+	pd := &PartyDiscovery{
+		clientset:       clientset,
+		namespace:       namespace,
+		serviceName:     serviceName,
+		labelSelector:   labelSelector,
+		grpcPort:        grpcPort,
+		logger:          logger,
+		endpoints:       []PartyEndpoint{},
+		refreshInterval: refreshInterval,
+		stopCh:          make(chan struct{}),
+	}
+
+	// Initial discovery
+	if err := pd.refresh(); err != nil {
+		logger.Warn("Initial party discovery failed, will retry", zap.Error(err))
+	}
+
+	// Start background refresh
+	go pd.backgroundRefresh()
+
+	return pd, nil
+}
+
+// GetAvailableParties returns the party endpoints that are currently ready.
+// Implements the output.PartyPoolPort interface.
+func (pd *PartyDiscovery) GetAvailableParties() []output.PartyEndpoint {
+	pd.mu.RLock()
+	defer pd.mu.RUnlock()
+
+	// Return only ready endpoints
+	available := make([]output.PartyEndpoint, 0, len(pd.endpoints))
+	for _, ep := range pd.endpoints {
+		if ep.Ready {
+			available = append(available, output.PartyEndpoint{
+				Address: ep.Address,
+				PartyID: ep.PodName, // Use pod name as party ID
+				Ready:   ep.Ready,
+			})
+		}
+	}
+	return available
+}
+
+// SelectParties selects the first n ready parties from the pool.
+// Implements the output.PartyPoolPort interface.
+func (pd *PartyDiscovery) SelectParties(n int) ([]output.PartyEndpoint, error) {
+	available := pd.GetAvailableParties()
+
+	if len(available) < n {
+		return nil, fmt.Errorf("insufficient parties: need %d, have %d", n, len(available))
+	}
+
+	// For now, return the first n parties.
+	// TODO: Implement random selection or a load-balancing strategy.
+	selected := make([]output.PartyEndpoint, n)
+	copy(selected, available[:n])
+
+	return selected, nil
+}
+
+// refresh updates the list of party endpoints from Kubernetes
+func (pd *PartyDiscovery) refresh() error {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	// Get pods matching the label selector
+	pods, err := pd.clientset.CoreV1().Pods(pd.namespace).List(ctx, metav1.ListOptions{
+		LabelSelector: pd.labelSelector,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to list pods: %w", err)
+	}
+
+	endpoints := make([]PartyEndpoint, 0, len(pods.Items))
+	for _, pod := range pods.Items {
+		// Check if the pod is ready
+		ready := false
+		for _, condition := range pod.Status.Conditions {
+			if condition.Type == "Ready" && condition.Status == "True" {
+				ready = true
+				break
+			}
+		}
+
+		// Skip pods that have not been assigned an IP yet
+		if pod.Status.PodIP != "" {
+			endpoints = append(endpoints, PartyEndpoint{
+				Address: fmt.Sprintf("%s:%s", pod.Status.PodIP, pd.grpcPort),
+				PodName: pod.Name,
+				Ready:   ready,
+			})
+		}
+	}
+
+	pd.mu.Lock()
+	pd.endpoints = endpoints
+	pd.mu.Unlock()
+
+	pd.logger.Info("Party endpoints refreshed",
+		zap.Int("total", len(endpoints)),
+		zap.Int("ready", pd.countReady(endpoints)))
+
+	return nil
+}
+
+// backgroundRefresh periodically refreshes the party endpoints until Close is called
+func (pd *PartyDiscovery) backgroundRefresh() {
+	ticker := time.NewTicker(pd.refreshInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			if err := pd.refresh(); err != nil {
+				pd.logger.Error("Failed to refresh party endpoints", zap.Error(err))
+			}
+		case <-pd.stopCh:
+			return
+		}
+	}
+}
+
+// countReady counts the number of ready endpoints
+func (pd *PartyDiscovery) countReady(endpoints []PartyEndpoint) int {
+	count := 0
+	for _, ep := range endpoints {
+		if ep.Ready {
+			count++
+		}
+	}
+	return count
+}
+
+// Close stops the background refresh goroutine.
+// Close must be called at most once.
+func (pd *PartyDiscovery) Close() {
+	close(pd.stopCh)
+	pd.logger.Info("Party discovery service closed")
+}
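The k8s README's "Custom party selection strategy" section names `SelectParties()` as the extension point, and the TODO above mentions random selection. As a concrete illustration, here is a minimal sketch of that randomized variant. It is not part of this change set; it assumes it replaces the body of `(*PartyDiscovery).SelectParties` in `party_discovery.go`, with `"math/rand"` added to that file's imports.

```go
// Sketch: randomized party selection, assuming it replaces the first-n logic
// in (*PartyDiscovery).SelectParties. Uses math/rand's Shuffle; a
// crypto/rand-based shuffle may be preferable in adversarial settings.
func (pd *PartyDiscovery) SelectParties(n int) ([]output.PartyEndpoint, error) {
	available := pd.GetAvailableParties()
	if len(available) < n {
		return nil, fmt.Errorf("insufficient parties: need %d, have %d", n, len(available))
	}

	// Shuffle a copy so callers never observe a reordered shared slice.
	shuffled := make([]output.PartyEndpoint, len(available))
	copy(shuffled, available)
	rand.Shuffle(len(shuffled), func(i, j int) {
		shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
	})

	return shuffled[:n], nil
}
```

Randomizing selection spreads signing load across the pool; tests that depend on a deterministic pick should stub the pool (for example with a fixed-list implementation of `output.PartyPoolPort`) rather than rely on selection order.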