diff --git a/packages/services/agent-service/src/agent.module.ts b/packages/services/agent-service/src/agent.module.ts index b93ef1e..2a53982 100644 --- a/packages/services/agent-service/src/agent.module.ts +++ b/packages/services/agent-service/src/agent.module.ts @@ -47,6 +47,7 @@ import { VoiceSessionController } from './interfaces/rest/controllers/voice-sess import { ConversationContextService } from './domain/services/conversation-context.service'; import { VoiceSessionManager } from './domain/services/voice-session-manager.service'; import { EventPublisherService } from './infrastructure/messaging/event-publisher.service'; +import { OpenAISttService } from './infrastructure/stt/openai-stt.service'; @Module({ imports: [ @@ -90,6 +91,7 @@ import { EventPublisherService } from './infrastructure/messaging/event-publishe AgentSkillService, HookScriptService, EventPublisherService, + OpenAISttService, ], }) export class AgentModule {} diff --git a/packages/services/agent-service/src/infrastructure/stt/openai-stt.service.ts b/packages/services/agent-service/src/infrastructure/stt/openai-stt.service.ts new file mode 100644 index 0000000..5d7f8c7 --- /dev/null +++ b/packages/services/agent-service/src/infrastructure/stt/openai-stt.service.ts @@ -0,0 +1,92 @@ +/** + * OpenAISttService + * + * Calls the OpenAI Whisper transcriptions API to convert audio buffers to text. + * Supports the self-signed proxy at 67.223.119.33:8443 by disabling TLS verification + * when OPENAI_BASE_URL points to an https host (same pattern as voice-agent). 
+ */
+import { Injectable, Logger } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import * as https from 'https';
+
+@Injectable()
+export class OpenAISttService {
+  private readonly logger = new Logger(OpenAISttService.name);
+  private readonly apiKey: string;
+  private readonly baseUrl: string;
+
+  constructor(private readonly configService: ConfigService) {
+    this.apiKey = this.configService.get('OPENAI_API_KEY', '');
+    this.baseUrl = this.configService.get(
+      'OPENAI_BASE_URL',
+      'https://api.openai.com',
+    ).replace(/\/$/, '');
+  }
+
+  /**
+   * Transcribe an audio buffer to text using OpenAI Whisper.
+   *
+   * @param audioBuffer Raw audio bytes (any format Whisper accepts: m4a, webm, mp3, wav, etc.)
+   * @param filename Original filename including extension — Whisper uses this to detect format
+   * @param language BCP-47 language code hint (default: 'zh')
+   */
+  async transcribe(
+    audioBuffer: Buffer,
+    filename: string,
+    language = 'zh',
+  ): Promise<string> {
+    const url = `${this.baseUrl}/v1/audio/transcriptions`;
+    this.logger.log(`STT: transcribing ${filename} (${audioBuffer.length} bytes) → ${url}`);
+
+    // Build multipart/form-data manually to avoid external dependencies
+    const boundary = `----FormBoundary${Date.now().toString(16)}`;
+    const parts: Buffer[] = [];
+
+    const appendField = (name: string, value: string) => {
+      parts.push(
+        Buffer.from(
+          `--${boundary}\r\nContent-Disposition: form-data; name="${name}"\r\n\r\n${value}\r\n`,
+        ),
+      );
+    };
+
+    appendField('model', 'whisper-1');
+    appendField('language', language);
+    appendField('response_format', 'json');
+
+    // File field
+    parts.push(
+      Buffer.from(
+        `--${boundary}\r\nContent-Disposition: form-data; name="file"; filename="${filename}"\r\nContent-Type: application/octet-stream\r\n\r\n`,
+      ),
+    );
+    parts.push(audioBuffer);
+    parts.push(Buffer.from(`\r\n--${boundary}--\r\n`));
+
+    const body = Buffer.concat(parts);
+
+    // Use a custom https agent to allow self-signed certs
+    // (proxy at 67.223.119.33:8443)
+    const agent = new https.Agent({ rejectUnauthorized: false });
+
+    const response = await fetch(url, {
+      method: 'POST',
+      headers: {
+        Authorization: `Bearer ${this.apiKey}`,
+        'Content-Type': `multipart/form-data; boundary=${boundary}`,
+        'Content-Length': String(body.length),
+      },
+      body,
+      // @ts-ignore — NOTE(review): Node 18+ native fetch (undici) silently ignores the https 'agent' option; self-signed certs may still be rejected — verify against the proxy (an undici dispatcher or NODE_TLS_REJECT_UNAUTHORIZED may be required)
+      agent,
+    });
+
+    if (!response.ok) {
+      const errText = await response.text().catch(() => '');
+      throw new Error(`Whisper STT failed: HTTP ${response.status} — ${errText}`);
+    }
+
+    const result = (await response.json()) as { text: string };
+    this.logger.log(`STT result: "${result.text?.slice(0, 80)}"`);
+    return result.text ?? '';
+  }
+}
diff --git a/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts b/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts
index e6bf31f..7856c63 100644
--- a/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts
+++ b/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts
@@ -1,5 +1,8 @@
-import { Controller, Post, Body, Param, Delete, Get, NotFoundException, BadRequestException, ForbiddenException, Logger } from '@nestjs/common';
+import { Controller, Post, Body, Param, Delete, Get, NotFoundException, BadRequestException, ForbiddenException, Logger, UseInterceptors, UploadedFile } from '@nestjs/common';
+import { FileInterceptor } from '@nestjs/platform-express';
+import { memoryStorage } from 'multer';
 import { TenantId, EventPatterns } from '@it0/common';
+import { OpenAISttService } from '../../../infrastructure/stt/openai-stt.service';
 import { EngineRegistry } from '../../../infrastructure/engines/engine-registry';
 import { AgentStreamGateway } from '../../ws/agent-stream.gateway';
 import { SessionRepository } from '../../../infrastructure/repositories/session.repository';
@@ -30,6 +33,7 @@ export class
AgentController { private readonly usageRecordRepository: UsageRecordRepository, private readonly contextService: ConversationContextService, private readonly eventPublisher: EventPublisherService, + private readonly sttService: OpenAISttService, ) {} @Post('tasks') @@ -399,6 +403,111 @@ export class AgentController { } } + /** + * Voice message endpoint — WhatsApp-style push-to-talk. + * + * Accepts a recorded audio file, transcribes it via Whisper, then: + * • If the session has a running task → hard-interrupt it and inject the transcript + * • Otherwise → start a fresh task with the transcript + * + * This naturally solves the speaker-diarization problem: whoever presses + * record owns the message. It also doubles as an async voice interrupt: + * the user can send audio commands into any active agent workflow. + * + * POST /api/v1/agent/sessions/:sessionId/voice-message + * Content-Type: multipart/form-data + * Fields: audio (file), language? (string, default 'zh') + * + * Response: { sessionId, taskId, transcript } + */ + @Post('sessions/:sessionId/voice-message') + @UseInterceptors(FileInterceptor('audio', { storage: memoryStorage() })) + async sendVoiceMessage( + @TenantId() tenantId: string, + @Param('sessionId') sessionId: string, + @UploadedFile() file: { buffer: Buffer; originalname: string; mimetype: string } | undefined, + @Body('language') language?: string, + ) { + if (!file?.buffer?.length) { + throw new BadRequestException('audio file is required'); + } + + const session = await this.sessionRepository.findById(sessionId); + if (!session || session.tenantId !== tenantId) { + throw new NotFoundException(`Session ${sessionId} not found`); + } + + // STT: transcribe audio → text + const transcript = await this.sttService.transcribe( + file.buffer, + file.originalname || 'audio.m4a', + language ?? 
'zh', + ); + + if (!transcript?.trim()) { + throw new BadRequestException('Could not transcribe audio — empty result'); + } + + const engine = this.engineRegistry.switchEngine(session.engineType as AgentEngineType); + + // Hard-interrupt any running task so the voice command takes effect immediately + const runningTask = await this.taskRepository.findRunningBySessionId(sessionId); + if (runningTask) { + this.logger.log(`[VoiceMsg ${sessionId}] Interrupting running task ${runningTask.id}`); + this.captureSdkSessionId(engine, session, runningTask.id); + await engine.cancelTask(session.id).catch(() => {}); + runningTask.status = TaskStatus.CANCELLED; + runningTask.completedAt = new Date(); + await this.taskRepository.save(runningTask); + await this.awaitTaskCleanup(runningTask.id); + this.gateway.emitStreamEvent(session.id, { + type: 'cancelled', + message: 'Interrupted by voice message', + code: 'VOICE_INJECT', + }); + } + + // Save user message and build context + await this.contextService.saveUserMessage(session.id, transcript); + const conversationHistory = await this.contextService.loadContext(session.id, 20); + const historyForEngine = conversationHistory.slice(0, -1); + + const isSdkEngine = engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK; + const resumeSessionId = isSdkEngine + ? 
(session.metadata as any)?.sdkSessionId as string | undefined + : undefined; + + // Create task record + const task = new AgentTask(); + task.id = crypto.randomUUID(); + task.tenantId = tenantId; + task.sessionId = session.id; + task.prompt = transcript; + task.status = TaskStatus.RUNNING; + task.startedAt = new Date(); + task.createdAt = new Date(); + await this.taskRepository.save(task); + + session.status = 'active'; + session.updatedAt = new Date(); + await this.sessionRepository.save(session); + + // Notify WS subscribers of the new task + this.gateway.emitStreamEvent(session.id, { type: 'task_info', taskId: task.id }); + + // Fire-and-forget stream + this.runTaskStream(engine, session, task, { + prompt: transcript, + systemPrompt: session.systemPrompt || '', + allowedTools: [], + maxTurns: 10, + conversationHistory: historyForEngine.length > 0 ? historyForEngine : undefined, + resumeSessionId, + }); + + return { sessionId: session.id, taskId: task.id, transcript }; + } + @Get('engines') async listEngines() { const engines = this.engineRegistry.listAvailable();