fix: replace Socket.IO with raw WebSocket to fix 502 on /ws/agent

Socket.IO requires its own handshake protocol (EIO=4) which Kong cannot
proxy as a plain WebSocket upgrade, causing 502 Bad Gateway. Switch to
@nestjs/platform-ws (WsAdapter) with manual session room tracking so
Flutter's IOWebSocketChannel can connect directly.

Also add ws/wss protocols to Kong WebSocket routes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-23 16:52:43 -08:00
parent 5e31b15dcf
commit 86d7cac631
4 changed files with 102 additions and 14 deletions

View File

@ -34,6 +34,8 @@ services:
protocols:
- http
- https
- ws
- wss
- name: agent-config-service
url: http://agent-service:3002
@ -81,6 +83,8 @@ services:
protocols:
- http
- https
- ws
- wss
- name: voice-service
url: http://voice-service:3008
@ -92,6 +96,8 @@ services:
protocols:
- http
- https
- ws
- wss
- name: voice-api
paths:
- /api/v1/voice

View File

@ -17,8 +17,8 @@
"@nestjs/passport": "^10.0.0",
"@nestjs/platform-express": "^10.3.0",
"@nestjs/websockets": "^10.3.0",
"@nestjs/platform-socket.io": "^10.3.0",
"socket.io": "^4.7.0",
"@nestjs/platform-ws": "^10.3.0",
"ws": "^8.16.0",
"@anthropic-ai/sdk": "^0.32.0",
"@anthropic-ai/claude-agent-sdk": "^0.2.49",
"typeorm": "^0.3.20",
@ -38,6 +38,7 @@
"@types/express": "^4.17.21",
"@types/glob": "^8.1.0",
"@types/jest": "^29.5.0",
"@types/ws": "^8.5.0",
"@types/node": "^20.11.0",
"jest": "^29.7.0",
"ts-jest": "^29.1.0",

View File

@ -1,32 +1,110 @@
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
OnGatewayConnection,
OnGatewayDisconnect,
SubscribeMessage,
MessageBody,
ConnectedSocket,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';
import { Server } from 'ws';
import type { WebSocket } from 'ws';
@WebSocketGateway({ namespace: '/ws/agent', cors: true })
/**
* Raw WebSocket gateway for agent stream events.
*
* Uses @nestjs/platform-ws (WsAdapter) instead of Socket.IO so that
* plain WebSocket clients (Flutter, curl, etc.) can connect directly
* without the Socket.IO handshake protocol.
*
* Path: ws://<host>:<port>/ws/agent
*/
@WebSocketGateway({ path: '/ws/agent' })
export class AgentStreamGateway implements OnGatewayConnection, OnGatewayDisconnect {
private readonly logger = new Logger(AgentStreamGateway.name);
@WebSocketServer()
server!: Server;
handleConnection(client: Socket) {
console.log(`Client connected: ${client.id}`);
/** sessionId → Set of subscribed WebSocket clients */
private readonly sessionClients = new Map<string, Set<WebSocket>>();
/** client → Set of sessionIds it subscribed to (for cleanup on disconnect) */
private readonly clientSessions = new Map<WebSocket, Set<string>>();
handleConnection(client: WebSocket) {
this.logger.log('Client connected');
this.clientSessions.set(client, new Set());
// Handle raw messages (WsAdapter delivers parsed JSON to @SubscribeMessage
// only when the payload has an "event" field; otherwise we handle manually)
client.on('message', (raw: Buffer | string) => {
try {
const msg = JSON.parse(raw.toString());
if (msg.event === 'subscribe_session') {
this.handleSubscribeSession(client, msg.data ?? msg);
}
} catch {
// Ignore non-JSON or malformed messages
}
});
}
handleDisconnect(client: Socket) {
console.log(`Client disconnected: ${client.id}`);
handleDisconnect(client: WebSocket) {
this.logger.log('Client disconnected');
const sessions = this.clientSessions.get(client);
if (sessions) {
for (const sid of sessions) {
const clients = this.sessionClients.get(sid);
if (clients) {
clients.delete(client);
if (clients.size === 0) {
this.sessionClients.delete(sid);
}
}
}
}
this.clientSessions.delete(client);
}
@SubscribeMessage('subscribe_session')
handleSubscribeSession(client: Socket, payload: { sessionId: string }) {
client.join(`session:${payload.sessionId}`);
return { event: 'subscribed', data: { sessionId: payload.sessionId } };
handleSubscribeSession(client: WebSocket, payload: { sessionId: string; taskId?: string }) {
const { sessionId } = payload;
if (!sessionId) return;
// Add client to session room
if (!this.sessionClients.has(sessionId)) {
this.sessionClients.set(sessionId, new Set());
}
this.sessionClients.get(sessionId)!.add(client);
this.clientSessions.get(client)?.add(sessionId);
// Acknowledge subscription
this.sendTo(client, {
event: 'subscribed',
data: { sessionId, taskId: payload.taskId },
});
this.logger.log(`Client subscribed to session ${sessionId}`);
}
/**
* Emits a stream event to all WebSocket clients subscribed to the session.
*/
emitStreamEvent(sessionId: string, event: any) {
this.server.to(`session:${sessionId}`).emit('stream_event', event);
const clients = this.sessionClients.get(sessionId);
if (!clients || clients.size === 0) return;
const message = JSON.stringify({ event: 'stream_event', data: event });
for (const client of clients) {
if (client.readyState === 1 /* WebSocket.OPEN */) {
client.send(message);
}
}
}
private sendTo(client: WebSocket, data: any) {
if (client.readyState === 1) {
client.send(JSON.stringify(data));
}
}
}

View File

@ -1,6 +1,7 @@
import { NestFactory } from '@nestjs/core';
import { Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { WsAdapter } from '@nestjs/platform-ws';
import { AgentModule } from './agent.module';
const logger = new Logger('AgentService');
@ -15,6 +16,8 @@ process.on('uncaughtException', (error) => {
async function bootstrap() {
const app = await NestFactory.create(AgentModule);
// Use raw WebSocket adapter instead of Socket.IO
app.useWebSocketAdapter(new WsAdapter(app));
const config = app.get(ConfigService);
const port = config.get<number>('AGENT_SERVICE_PORT', 3002);
await app.listen(port);