feat(agent-service): add voice message endpoint with Whisper STT and async interrupt

New endpoint: POST /api/v1/agent/sessions/:sessionId/voice-message
- Accepts multipart/form-data audio file (any format Whisper supports)
- Transcribes via OpenAI Whisper API (routed through existing proxy)
- If a task is currently running in the session → hard-interrupts it first
  (same cancel+inject pattern as text inject, triggered by voice command)
- Otherwise → starts a fresh task with the transcript
- Returns { sessionId, taskId, transcript } so client can subscribe to WS stream

This enables WhatsApp-style push-to-talk and doubles as an async voice
interrupt into any active agent workflow, bypassing the need for speaker
diarization (whoever presses record owns the message).

New files:
  infrastructure/stt/openai-stt.service.ts — OpenAI Whisper client,
  manually builds multipart/form-data, supports self-signed proxy cert

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-06 03:12:03 -08:00
parent d097c64c81
commit a2af76bcd7
3 changed files with 204 additions and 1 deletions

View File

@ -47,6 +47,7 @@ import { VoiceSessionController } from './interfaces/rest/controllers/voice-sess
import { ConversationContextService } from './domain/services/conversation-context.service';
import { VoiceSessionManager } from './domain/services/voice-session-manager.service';
import { EventPublisherService } from './infrastructure/messaging/event-publisher.service';
import { OpenAISttService } from './infrastructure/stt/openai-stt.service';
@Module({
imports: [
@ -90,6 +91,7 @@ import { EventPublisherService } from './infrastructure/messaging/event-publishe
AgentSkillService,
HookScriptService,
EventPublisherService,
OpenAISttService,
],
})
export class AgentModule {}

View File

@ -0,0 +1,92 @@
/**
* OpenAISttService
*
* Calls the OpenAI Whisper transcriptions API to convert audio buffers to text.
* Supports the self-signed proxy at 67.223.119.33:8443 by disabling TLS verification
* when OPENAI_BASE_URL points to an https host (same pattern as voice-agent).
*/
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import * as https from 'https';
@Injectable()
export class OpenAISttService {
  private readonly logger = new Logger(OpenAISttService.name);
  private readonly apiKey: string;
  private readonly baseUrl: string;

  constructor(private readonly configService: ConfigService) {
    this.apiKey = this.configService.get<string>('OPENAI_API_KEY', '');
    // Strip a trailing slash so the path concatenation in transcribe() stays well-formed.
    this.baseUrl = this.configService
      .get<string>('OPENAI_BASE_URL', 'https://api.openai.com')
      .replace(/\/$/, '');
  }

  /**
   * Transcribe an audio buffer to text using OpenAI Whisper.
   *
   * @param audioBuffer Raw audio bytes (any format Whisper accepts: m4a, webm, mp3, wav, etc.)
   * @param filename Original filename including extension — Whisper uses it to detect the container format
   * @param language ISO-639-1 language code hint (default: 'zh')
   * @returns The transcript text ('' when the response carries no text field)
   * @throws Error when the request fails or returns a non-2xx status
   */
  async transcribe(
    audioBuffer: Buffer,
    filename: string,
    language = 'zh',
  ): Promise<string> {
    const url = `${this.baseUrl}/v1/audio/transcriptions`;
    // FIX: log the real filename (was a garbled "$(unknown)" placeholder).
    this.logger.log(
      `STT: transcribing ${filename} (${audioBuffer.length} bytes) → ${url}`,
    );

    // Build multipart/form-data manually to avoid external dependencies.
    const boundary = `----FormBoundary${Date.now().toString(16)}`;
    const body = this.buildMultipartBody(boundary, audioBuffer, filename, language);

    // FIX: use https.request instead of fetch. Node's native fetch (undici)
    // ignores the legacy `agent` option (it expects a `dispatcher`), so TLS
    // verification was never actually disabled and requests to the self-signed
    // proxy would fail the handshake. https.request honors rejectUnauthorized.
    const { status, text } = await this.postMultipart(url, boundary, body);

    if (status < 200 || status >= 300) {
      // FIX: separate status code and body (they were concatenated with no space).
      throw new Error(`Whisper STT failed: HTTP ${status} ${text}`);
    }
    const result = JSON.parse(text) as { text?: string };
    this.logger.log(`STT result: "${result.text?.slice(0, 80)}"`);
    return result.text ?? '';
  }

  /** Assemble the multipart/form-data payload for the transcriptions endpoint. */
  private buildMultipartBody(
    boundary: string,
    audioBuffer: Buffer,
    filename: string,
    language: string,
  ): Buffer {
    const parts: Buffer[] = [];
    const appendField = (name: string, value: string) => {
      parts.push(
        Buffer.from(
          `--${boundary}\r\nContent-Disposition: form-data; name="${name}"\r\n\r\n${value}\r\n`,
        ),
      );
    };
    appendField('model', 'whisper-1');
    appendField('language', language);
    appendField('response_format', 'json');
    // File field — FIX: send the caller-supplied filename (was a garbled
    // "$(unknown)" placeholder); Whisper relies on the extension for format detection.
    parts.push(
      Buffer.from(
        `--${boundary}\r\nContent-Disposition: form-data; name="file"; filename="${filename}"\r\nContent-Type: application/octet-stream\r\n\r\n`,
      ),
    );
    parts.push(audioBuffer);
    parts.push(Buffer.from(`\r\n--${boundary}--\r\n`));
    return Buffer.concat(parts);
  }

  /** POST a prebuilt multipart body; resolves with HTTP status + raw response text. */
  private postMultipart(
    url: string,
    boundary: string,
    body: Buffer,
  ): Promise<{ status: number; text: string }> {
    return new Promise((resolve, reject) => {
      const req = https.request(
        url,
        {
          method: 'POST',
          // Allow self-signed certs (proxy at 67.223.119.33:8443).
          rejectUnauthorized: false,
          headers: {
            Authorization: `Bearer ${this.apiKey}`,
            'Content-Type': `multipart/form-data; boundary=${boundary}`,
            'Content-Length': String(body.length),
          },
        },
        (res) => {
          const chunks: Buffer[] = [];
          res.on('data', (chunk: Buffer) => chunks.push(chunk));
          res.on('end', () =>
            resolve({
              status: res.statusCode ?? 0,
              text: Buffer.concat(chunks).toString('utf8'),
            }),
          );
        },
      );
      req.on('error', reject);
      req.end(body);
    });
  }
}

View File

@ -1,5 +1,8 @@
import { Controller, Post, Body, Param, Delete, Get, NotFoundException, BadRequestException, ForbiddenException, Logger } from '@nestjs/common';
import { Controller, Post, Body, Param, Delete, Get, NotFoundException, BadRequestException, ForbiddenException, Logger, UseInterceptors, UploadedFile } from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import { memoryStorage } from 'multer';
import { TenantId, EventPatterns } from '@it0/common';
import { OpenAISttService } from '../../../infrastructure/stt/openai-stt.service';
import { EngineRegistry } from '../../../infrastructure/engines/engine-registry';
import { AgentStreamGateway } from '../../ws/agent-stream.gateway';
import { SessionRepository } from '../../../infrastructure/repositories/session.repository';
@ -30,6 +33,7 @@ export class AgentController {
private readonly usageRecordRepository: UsageRecordRepository,
private readonly contextService: ConversationContextService,
private readonly eventPublisher: EventPublisherService,
private readonly sttService: OpenAISttService,
) {}
@Post('tasks')
@ -399,6 +403,111 @@ export class AgentController {
}
}
/**
 * Voice message endpoint — WhatsApp-style push-to-talk.
 *
 * Accepts a recorded audio file, transcribes it via Whisper, then:
 * - If the session has a running task → hard-interrupt it and inject the transcript
 * - Otherwise → start a fresh task with the transcript
 *
 * This naturally solves the speaker-diarization problem: whoever presses
 * record owns the message. It also doubles as an async voice interrupt:
 * the user can send audio commands into any active agent workflow.
 *
 * POST /api/v1/agent/sessions/:sessionId/voice-message
 * Content-Type: multipart/form-data
 * Fields: audio (file), language? (string, default 'zh')
 *
 * @param tenantId Tenant resolved from the request context; used to scope session access
 * @param sessionId Target session; must exist and belong to the tenant (404 otherwise)
 * @param file Uploaded audio held in memory by multer's memoryStorage (400 if absent/empty)
 * @param language Optional language hint forwarded to Whisper (defaults to 'zh')
 * @returns { sessionId, taskId, transcript } — client subscribes to the WS stream by taskId
 * @throws BadRequestException when no audio is supplied or the transcript comes back empty
 * @throws NotFoundException when the session is missing or owned by another tenant
 */
@Post('sessions/:sessionId/voice-message')
@UseInterceptors(FileInterceptor('audio', { storage: memoryStorage() }))
async sendVoiceMessage(
@TenantId() tenantId: string,
@Param('sessionId') sessionId: string,
@UploadedFile() file: { buffer: Buffer; originalname: string; mimetype: string } | undefined,
@Body('language') language?: string,
) {
if (!file?.buffer?.length) {
throw new BadRequestException('audio file is required');
}
// Tenant check folded into the 404 so cross-tenant probing can't distinguish
// "doesn't exist" from "not yours".
const session = await this.sessionRepository.findById(sessionId);
if (!session || session.tenantId !== tenantId) {
throw new NotFoundException(`Session ${sessionId} not found`);
}
// STT: transcribe audio → text. Fallback filename keeps Whisper's
// extension-based format detection working when the client omits one.
const transcript = await this.sttService.transcribe(
file.buffer,
file.originalname || 'audio.m4a',
language ?? 'zh',
);
if (!transcript?.trim()) {
throw new BadRequestException('Could not transcribe audio — empty result');
}
const engine = this.engineRegistry.switchEngine(session.engineType as AgentEngineType);
// Hard-interrupt any running task so the voice command takes effect immediately.
// Order matters: capture SDK session id → cancel (best-effort; errors swallowed)
// → persist CANCELLED → wait for cleanup → notify WS subscribers.
const runningTask = await this.taskRepository.findRunningBySessionId(sessionId);
if (runningTask) {
this.logger.log(`[VoiceMsg ${sessionId}] Interrupting running task ${runningTask.id}`);
this.captureSdkSessionId(engine, session, runningTask.id);
await engine.cancelTask(session.id).catch(() => {});
runningTask.status = TaskStatus.CANCELLED;
runningTask.completedAt = new Date();
await this.taskRepository.save(runningTask);
await this.awaitTaskCleanup(runningTask.id);
this.gateway.emitStreamEvent(session.id, {
type: 'cancelled',
message: 'Interrupted by voice message',
code: 'VOICE_INJECT',
});
}
// Save user message and build context. slice(0, -1) drops the message just
// saved — it is delivered via `prompt` below, not duplicated in history.
await this.contextService.saveUserMessage(session.id, transcript);
const conversationHistory = await this.contextService.loadContext(session.id, 20);
const historyForEngine = conversationHistory.slice(0, -1);
// SDK engines resume their native session via the sdkSessionId stashed in
// session.metadata; other engines get full history replay instead.
const isSdkEngine = engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK;
const resumeSessionId = isSdkEngine
? (session.metadata as any)?.sdkSessionId as string | undefined
: undefined;
// Create task record before streaming so WS subscribers can attach by taskId.
const task = new AgentTask();
task.id = crypto.randomUUID();
task.tenantId = tenantId;
task.sessionId = session.id;
task.prompt = transcript;
task.status = TaskStatus.RUNNING;
task.startedAt = new Date();
task.createdAt = new Date();
await this.taskRepository.save(task);
session.status = 'active';
session.updatedAt = new Date();
await this.sessionRepository.save(session);
// Notify WS subscribers of the new task
this.gateway.emitStreamEvent(session.id, { type: 'task_info', taskId: task.id });
// Fire-and-forget stream — the HTTP response returns immediately; progress is
// delivered over the WS gateway.
// NOTE(review): the returned promise is unawaited — confirm runTaskStream
// handles its own rejections, otherwise this can surface as an unhandled
// rejection. allowedTools/maxTurns are fixed here ([] / 10) — TODO confirm
// this matches the text-message path's defaults.
this.runTaskStream(engine, session, task, {
prompt: transcript,
systemPrompt: session.systemPrompt || '',
allowedTools: [],
maxTurns: 10,
conversationHistory: historyForEngine.length > 0 ? historyForEngine : undefined,
resumeSessionId,
});
return { sessionId: session.id, taskId: task.id, transcript };
}
@Get('engines')
async listEngines() {
const engines = this.engineRegistry.listAvailable();