From 86d7cac631552db7e0ed444c6c0aeabe7791c2ce Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 23 Feb 2026 16:52:43 -0800 Subject: [PATCH] fix: replace Socket.IO with raw WebSocket to fix 502 on /ws/agent Socket.IO requires its own handshake protocol (EIO=4) which Kong cannot proxy as a plain WebSocket upgrade, causing 502 Bad Gateway. Switch to @nestjs/platform-ws (WsAdapter) with manual session room tracking so Flutter's IOWebSocketChannel can connect directly. Also add ws/wss protocols to Kong WebSocket routes. Co-Authored-By: Claude Opus 4.6 --- packages/gateway/config/kong.yml | 6 ++ packages/services/agent-service/package.json | 5 +- .../src/interfaces/ws/agent-stream.gateway.ts | 102 +++++++++++++++--- packages/services/agent-service/src/main.ts | 3 + 4 files changed, 102 insertions(+), 14 deletions(-) diff --git a/packages/gateway/config/kong.yml b/packages/gateway/config/kong.yml index a219503..1af5628 100644 --- a/packages/gateway/config/kong.yml +++ b/packages/gateway/config/kong.yml @@ -34,6 +34,8 @@ services: protocols: - http - https + - ws + - wss - name: agent-config-service url: http://agent-service:3002 @@ -81,6 +83,8 @@ services: protocols: - http - https + - ws + - wss - name: voice-service url: http://voice-service:3008 @@ -92,6 +96,8 @@ services: protocols: - http - https + - ws + - wss - name: voice-api paths: - /api/v1/voice diff --git a/packages/services/agent-service/package.json b/packages/services/agent-service/package.json index ec9b5de..44a0e27 100644 --- a/packages/services/agent-service/package.json +++ b/packages/services/agent-service/package.json @@ -17,8 +17,8 @@ "@nestjs/passport": "^10.0.0", "@nestjs/platform-express": "^10.3.0", "@nestjs/websockets": "^10.3.0", - "@nestjs/platform-socket.io": "^10.3.0", - "socket.io": "^4.7.0", + "@nestjs/platform-ws": "^10.3.0", + "ws": "^8.16.0", "@anthropic-ai/sdk": "^0.32.0", "@anthropic-ai/claude-agent-sdk": "^0.2.49", "typeorm": "^0.3.20", @@ -38,6 +38,7 @@ "@types/express": "^4.17.21", "@types/glob": "^8.1.0", "@types/jest": "^29.5.0", + "@types/ws": "^8.5.0", "@types/node": "^20.11.0", "jest": "^29.7.0", "ts-jest": "^29.1.0", diff --git a/packages/services/agent-service/src/interfaces/ws/agent-stream.gateway.ts b/packages/services/agent-service/src/interfaces/ws/agent-stream.gateway.ts index e53e991..c85c4b3 100644 --- a/packages/services/agent-service/src/interfaces/ws/agent-stream.gateway.ts +++ b/packages/services/agent-service/src/interfaces/ws/agent-stream.gateway.ts @@ -1,32 +1,110 @@ import { WebSocketGateway, WebSocketServer, - SubscribeMessage, OnGatewayConnection, OnGatewayDisconnect, + SubscribeMessage, + MessageBody, + ConnectedSocket, } from '@nestjs/websockets'; -import { Server, Socket } from 'socket.io'; +import { Logger } from '@nestjs/common'; +import { Server } from 'ws'; +import type { WebSocket } from 'ws'; -@WebSocketGateway({ namespace: '/ws/agent', cors: true }) +/** + * Raw WebSocket gateway for agent stream events. + * + * Uses @nestjs/platform-ws (WsAdapter) instead of Socket.IO so that + * plain WebSocket clients (Flutter, curl, etc.) can connect directly + * without the Socket.IO handshake protocol. + * + * Path: ws://:/ws/agent + */ +@WebSocketGateway({ path: '/ws/agent' }) export class AgentStreamGateway implements OnGatewayConnection, OnGatewayDisconnect { + private readonly logger = new Logger(AgentStreamGateway.name); + @WebSocketServer() server!: Server; - handleConnection(client: Socket) { - console.log(`Client connected: ${client.id}`); + /** sessionId → Set of subscribed WebSocket clients */ + private readonly sessionClients = new Map>(); + /** client → Set of sessionIds it subscribed to (for cleanup on disconnect) */ + private readonly clientSessions = new Map>(); + + handleConnection(client: WebSocket) { + this.logger.log('Client connected'); + this.clientSessions.set(client, new Set()); + + // Handle raw messages (WsAdapter delivers parsed JSON to @SubscribeMessage + // only when the payload has an "event" field; otherwise we handle manually) + client.on('message', (raw: Buffer | string) => { + try { + const msg = JSON.parse(raw.toString()); + if (msg.event === 'subscribe_session') { + this.handleSubscribeSession(client, msg.data ?? msg); + } + } catch { + // Ignore non-JSON or malformed messages + } + }); } - handleDisconnect(client: Socket) { - console.log(`Client disconnected: ${client.id}`); + handleDisconnect(client: WebSocket) { + this.logger.log('Client disconnected'); + const sessions = this.clientSessions.get(client); + if (sessions) { + for (const sid of sessions) { + const clients = this.sessionClients.get(sid); + if (clients) { + clients.delete(client); + if (clients.size === 0) { + this.sessionClients.delete(sid); + } + } + } + } + this.clientSessions.delete(client); } - @SubscribeMessage('subscribe_session') - handleSubscribeSession(client: Socket, payload: { sessionId: string }) { - client.join(`session:${payload.sessionId}`); - return { event: 'subscribed', data: { sessionId: payload.sessionId } }; + handleSubscribeSession(client: WebSocket, payload: { sessionId: string; taskId?: string }) { + const { sessionId } = payload; + if (!sessionId) return; + + // Add client to session room + if (!this.sessionClients.has(sessionId)) { + this.sessionClients.set(sessionId, new Set()); + } + this.sessionClients.get(sessionId)!.add(client); + this.clientSessions.get(client)?.add(sessionId); + + // Acknowledge subscription + this.sendTo(client, { + event: 'subscribed', + data: { sessionId, taskId: payload.taskId }, + }); + + this.logger.log(`Client subscribed to session ${sessionId}`); } + /** + * Emits a stream event to all WebSocket clients subscribed to the session. + */ emitStreamEvent(sessionId: string, event: any) { - this.server.to(`session:${sessionId}`).emit('stream_event', event); + const clients = this.sessionClients.get(sessionId); + if (!clients || clients.size === 0) return; + + const message = JSON.stringify({ event: 'stream_event', data: event }); + for (const client of clients) { + if (client.readyState === 1 /* WebSocket.OPEN */) { + client.send(message); + } + } + } + + private sendTo(client: WebSocket, data: any) { + if (client.readyState === 1) { + client.send(JSON.stringify(data)); + } } } diff --git a/packages/services/agent-service/src/main.ts b/packages/services/agent-service/src/main.ts index 5da7689..636c9a7 100644 --- a/packages/services/agent-service/src/main.ts +++ b/packages/services/agent-service/src/main.ts @@ -1,6 +1,7 @@ import { NestFactory } from '@nestjs/core'; import { Logger } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; +import { WsAdapter } from '@nestjs/platform-ws'; import { AgentModule } from './agent.module'; const logger = new Logger('AgentService'); @@ -15,6 +16,8 @@ process.on('uncaughtException', (error) => { async function bootstrap() { const app = await NestFactory.create(AgentModule); + // Use raw WebSocket adapter instead of Socket.IO + app.useWebSocketAdapter(new WsAdapter(app)); const config = app.get(ConfigService); const port = config.get('AGENT_SERVICE_PORT', 3002); await app.listen(port);