diff --git a/packages/services/conversation-service/src/adapters/inbound/conversation.gateway.ts b/packages/services/conversation-service/src/adapters/inbound/conversation.gateway.ts index 251f49c..3f92ca9 100644 --- a/packages/services/conversation-service/src/adapters/inbound/conversation.gateway.ts +++ b/packages/services/conversation-service/src/adapters/inbound/conversation.gateway.ts @@ -166,6 +166,13 @@ export class ConversationGateway agentName: chunk.agentName, description: chunk.description, }); + } else if (chunk.type === 'agent_progress') { + client.emit('agent_progress', { + messageId, + conversationId, + agentType: chunk.agentType, + message: chunk.message, + }); } else if (chunk.type === 'agent_complete') { client.emit('agent_complete', { messageId, diff --git a/packages/services/conversation-service/src/infrastructure/agents/coordinator/agent-loop.ts b/packages/services/conversation-service/src/infrastructure/agents/coordinator/agent-loop.ts index 209e9ca..06f58cd 100644 --- a/packages/services/conversation-service/src/infrastructure/agents/coordinator/agent-loop.ts +++ b/packages/services/conversation-service/src/infrastructure/agents/coordinator/agent-loop.ts @@ -113,6 +113,7 @@ export async function* agentLoop( toolExecutor: CoordinatorToolExecutor, additionalTools?: Array<{ name: string; description: string; input_schema: Record }>, additionalConcurrencyMap?: Record, + progressEventSink?: StreamEvent[], ): AsyncGenerator { const { messages, @@ -430,6 +431,7 @@ export async function* agentLoop( toolExecutor, additionalTools, additionalConcurrencyMap, + progressEventSink, ); return; } @@ -492,6 +494,14 @@ export async function* agentLoop( `${lp} [Turn ${currentTurn + 1}] Tools executed: ${summary.totalTools} tools in ${summary.parallelBatches} batch(es), ${summary.totalDurationMs}ms total`, ); + // ---- Drain accumulated progress events from specialist streaming ---- + if (progressEventSink && progressEventSink.length > 0) { + const events = progressEventSink.splice(0, progressEventSink.length); + for (const evt of events) { + yield evt; + } + } + // ---- Emit Tool Results & Agent Completions ---- for (const result of toolResults) { // Emit tool_result event @@ -547,6 +557,7 @@ export async function* agentLoop( toolExecutor, additionalTools, additionalConcurrencyMap, + progressEventSink, ); } diff --git a/packages/services/conversation-service/src/infrastructure/agents/coordinator/coordinator-agent.service.ts b/packages/services/conversation-service/src/infrastructure/agents/coordinator/coordinator-agent.service.ts index a909f03..724c357 100644 --- a/packages/services/conversation-service/src/infrastructure/agents/coordinator/coordinator-agent.service.ts +++ b/packages/services/conversation-service/src/infrastructure/agents/coordinator/coordinator-agent.service.ts @@ -36,6 +36,7 @@ import { AgentLoopParams, ClaudeMessage, SystemPromptBlock, + SpecialistAgentType, } from '../types/agent.types'; import { StreamEvent } from '../types/stream.types'; import { ConversationContext } from '../types/context.types'; @@ -302,8 +303,11 @@ export class CoordinatorAgentService implements OnModuleInit { evaluationGate: evaluationGateCallback, }; - // 6. Create tool executor - const toolExecutor = this.createToolExecutor(context.userId, context.conversationId); + // 6. Create progress event sink (shared between tool executor and agent loop) + const progressEventSink: StreamEvent[] = []; + + // 6.1. Create tool executor + const toolExecutor = this.createToolExecutor(context.userId, context.conversationId, progressEventSink); // 6.5. Gather MCP tools (if any MCP servers are connected) const mcpDiscoveredTools = this.mcpClient.getDiscoveredTools(); @@ -325,6 +329,7 @@ export class CoordinatorAgentService implements OnModuleInit { toolExecutor, mcpTools, mcpConcurrencyMap, + progressEventSink, )) { yield this.mapEventToStreamChunk(event); } @@ -515,7 +520,7 @@ export class CoordinatorAgentService implements OnModuleInit { /** * 创建工具执行器 — 将工具调用分派给专家 Agent 或直接执行 */ - private createToolExecutor(userId: string, conversationId: string): CoordinatorToolExecutor { + private createToolExecutor(userId: string, conversationId: string, progressEventSink?: StreamEvent[]): CoordinatorToolExecutor { return async ( toolName: string, toolInput: Record, @@ -548,7 +553,11 @@ export class CoordinatorAgentService implements OnModuleInit { let result: { output: string; isError: boolean }; if (toolType === 'agent') { - result = await this.executeAgentTool(toolName, toolInput); + // 为 Agent 调用创建 onProgress callback,将进度事件推送到 sink + const onProgress = progressEventSink + ? this.createAgentProgressCallback(toolName, progressEventSink) + : undefined; + result = await this.executeAgentTool(toolName, toolInput, onProgress); } else if (toolType === 'mcp') { result = await this.mcpClient.executeTool(toolName, toolInput); } else { @@ -589,7 +598,10 @@ export class CoordinatorAgentService implements OnModuleInit { private async executeAgentTool( toolName: string, toolInput: Record, + onProgress?: (text: string) => void, ): Promise<{ output: string; isError: boolean }> { + const opts = onProgress ? { onProgress } : undefined; + switch (toolName) { case 'invoke_policy_expert': { const result = await this.policyExpert.executeQuery({ @@ -597,7 +609,7 @@ export class CoordinatorAgentService implements OnModuleInit { category: toolInput.category as string | undefined, includeProcessSteps: toolInput.includeProcessSteps as boolean | undefined, includeRequirements: toolInput.includeRequirements as boolean | undefined, - }); + }, opts); return { output: result, isError: false }; } @@ -606,7 +618,7 @@ export class CoordinatorAgentService implements OnModuleInit { userInfo: (toolInput.userInfo as Record) || {}, targetCategories: toolInput.targetCategories as string[] | undefined, conversationContext: toolInput.conversationContext as string | undefined, - }); + }, opts); return { output: result, isError: false }; } @@ -618,7 +630,7 @@ export class CoordinatorAgentService implements OnModuleInit { userSentiment: toolInput.userSentiment as string | undefined, hasAssessment: (toolInput.hasAssessment as boolean) || false, hasConverted: (toolInput.hasConverted as boolean) || false, - }); + }, opts); return { output: result, isError: false }; } @@ -627,7 +639,7 @@ export class CoordinatorAgentService implements OnModuleInit { objection: (toolInput.objection as string) || '', userContext: (toolInput.userContext as string) || '', previousObjections: toolInput.previousObjections as string[] | undefined, - }); + }, opts); return { output: result, isError: false }; } @@ -636,7 +648,7 @@ export class CoordinatorAgentService implements OnModuleInit { userProfile: (toolInput.userProfile as Record) || {}, targetCategory: (toolInput.targetCategory as string) || '', focusArea: toolInput.focusArea as string | undefined, - }); + }, opts); return { output: result, isError: false }; } @@ -647,7 +659,7 @@ export class CoordinatorAgentService implements OnModuleInit { dataToSave: toolInput.dataToSave as Record | undefined, recentMessages: toolInput.recentMessages as string | undefined, contextQuery: toolInput.contextQuery as string | undefined, - }); + }, opts); return { output: result, isError: false }; } @@ -659,6 +671,42 @@ export class CoordinatorAgentService implements OnModuleInit { } } + /** + * 创建 Agent 进度回调 — 节流推送 agent_progress 事件到 sink + * 每累积 300 字符生成一个进度事件,避免事件过多 + */ + private createAgentProgressCallback( + toolName: string, + progressEventSink: StreamEvent[], + ): (text: string) => void { + const agentTypeMap: Record = { + invoke_policy_expert: SpecialistAgentType.POLICY_EXPERT, + invoke_assessment_expert: SpecialistAgentType.ASSESSMENT_EXPERT, + invoke_strategist: SpecialistAgentType.STRATEGIST, + invoke_objection_handler: SpecialistAgentType.OBJECTION_HANDLER, + invoke_case_analyst: SpecialistAgentType.CASE_ANALYST, + invoke_memory_manager: SpecialistAgentType.MEMORY_MANAGER, + }; + const agentType = agentTypeMap[toolName]; + if (!agentType) return () => {}; + + let accumulated = ''; + const THROTTLE_CHARS = 300; + + return (text: string) => { + accumulated += text; + if (accumulated.length >= THROTTLE_CHARS) { + progressEventSink.push({ + type: 'agent_progress', + agentType, + message: `已生成 ${accumulated.length} 字...`, + timestamp: Date.now(), + }); + accumulated = ''; + } + }; + } + /** * 执行直接工具 — 委托给经过生产测试的 ImmigrationToolsService */ diff --git a/packages/services/conversation-service/src/infrastructure/agents/specialists/assessment-expert.service.ts b/packages/services/conversation-service/src/infrastructure/agents/specialists/assessment-expert.service.ts index 96d5156..8230c57 100644 --- a/packages/services/conversation-service/src/infrastructure/agents/specialists/assessment-expert.service.ts +++ b/packages/services/conversation-service/src/infrastructure/agents/specialists/assessment-expert.service.ts @@ -91,9 +91,9 @@ export class AssessmentExpertService extends BaseSpecialistService { /** * Typed execute method */ - async executeAssessment(input: AssessmentExpertInput): Promise { + async executeAssessment(input: AssessmentExpertInput, options?: import('./base-specialist.service').SpecialistExecutionOptions): Promise { const userMessage = this.buildUserMessage(input); - const { result } = await this.execute(userMessage); + const { result } = await this.execute(userMessage, options); return result; } diff --git a/packages/services/conversation-service/src/infrastructure/agents/specialists/base-specialist.service.ts b/packages/services/conversation-service/src/infrastructure/agents/specialists/base-specialist.service.ts index 06479fe..30df2cf 100644 --- a/packages/services/conversation-service/src/infrastructure/agents/specialists/base-specialist.service.ts +++ b/packages/services/conversation-service/src/infrastructure/agents/specialists/base-specialist.service.ts @@ -34,6 +34,8 @@ export interface SpecialistExecutionOptions { abortSignal?: AbortSignal; /** Trace ID for end-to-end observability */ traceId?: string; + /** Progress callback — invoked with incremental text deltas during streaming */ + onProgress?: (text: string) => void; } export abstract class BaseSpecialistService { @@ -101,7 +103,7 @@ export abstract class BaseSpecialistService { break; } - // Call Claude API (non-streaming for specialists — simpler and sufficient) + // Call Claude API (streaming for progress reporting) const response = await this.callClaude( systemPrompt, messages, @@ -111,6 +113,7 @@ export abstract class BaseSpecialistService { input_schema: t.input_schema, })) : undefined, maxTokens, + options?.onProgress, ); // Track tokens @@ -219,14 +222,30 @@ export abstract class BaseSpecialistService { } /** - * Call Claude API with timeout + * Call Claude API with streaming + timeout + * 使用流式 API 以支持 onProgress 回调,在文本生成过程中实时上报进度 */ private async callClaude( systemPrompt: string, messages: Array<{ role: 'user' | 'assistant'; content: any }>, tools: any[] | undefined, maxTokens: number, + onProgress?: (text: string) => void, ): Promise { + const stream = this.anthropicClient.messages.stream({ + model: this.config.model, + system: [{ type: 'text' as const, text: systemPrompt, cache_control: { type: 'ephemeral' as const } }], + messages, + ...(tools && tools.length > 0 ? { tools } : {}), + max_tokens: maxTokens, + ...(this.config.temperature !== undefined ? { temperature: this.config.temperature } : {}), + } as any); + + // 将文本增量实时推送给 onProgress 回调 + if (onProgress) { + stream.on('text', (text: string) => onProgress(text)); + } + const timeoutPromise = new Promise((_, reject) => { setTimeout( () => reject(new Error(`Agent ${this.config.type} timed out after ${this.config.timeoutMs}ms`)), @@ -234,15 +253,6 @@ export abstract class BaseSpecialistService { ); }); - const apiCall = this.anthropicClient.messages.create({ - model: this.config.model, - system: [{ type: 'text' as const, text: systemPrompt, cache_control: { type: 'ephemeral' as const } }], - messages, - ...(tools && tools.length > 0 ? { tools } : {}), - max_tokens: maxTokens, - ...(this.config.temperature !== undefined ? { temperature: this.config.temperature } : {}), - }); - - return Promise.race([apiCall, timeoutPromise]); + return Promise.race([stream.finalMessage(), timeoutPromise]); } } diff --git a/packages/services/conversation-service/src/infrastructure/agents/specialists/case-analyst.service.ts b/packages/services/conversation-service/src/infrastructure/agents/specialists/case-analyst.service.ts index ca41e26..4a63317 100644 --- a/packages/services/conversation-service/src/infrastructure/agents/specialists/case-analyst.service.ts +++ b/packages/services/conversation-service/src/infrastructure/agents/specialists/case-analyst.service.ts @@ -93,9 +93,9 @@ export class CaseAnalystService extends BaseSpecialistService { /** * Typed execute method */ - async analyzeCases(input: CaseAnalystInput): Promise { + async analyzeCases(input: CaseAnalystInput, options?: import('./base-specialist.service').SpecialistExecutionOptions): Promise { const userMessage = this.buildUserMessage(input); - const { result } = await this.execute(userMessage); + const { result } = await this.execute(userMessage, options); return result; } diff --git a/packages/services/conversation-service/src/infrastructure/agents/specialists/memory-manager.service.ts b/packages/services/conversation-service/src/infrastructure/agents/specialists/memory-manager.service.ts index e0c3214..6d70890 100644 --- a/packages/services/conversation-service/src/infrastructure/agents/specialists/memory-manager.service.ts +++ b/packages/services/conversation-service/src/infrastructure/agents/specialists/memory-manager.service.ts @@ -103,9 +103,9 @@ export class MemoryManagerService extends BaseSpecialistService { /** * Typed execute method */ - async manageMemory(input: MemoryManagerInput): Promise { + async manageMemory(input: MemoryManagerInput, options?: import('./base-specialist.service').SpecialistExecutionOptions): Promise { const userMessage = this.buildUserMessage(input); - const { result } = await this.execute(userMessage); + const { result } = await this.execute(userMessage, options); return result; } diff --git a/packages/services/conversation-service/src/infrastructure/agents/specialists/objection-handler.service.ts b/packages/services/conversation-service/src/infrastructure/agents/specialists/objection-handler.service.ts index bb6af87..a8d17a6 100644 --- a/packages/services/conversation-service/src/infrastructure/agents/specialists/objection-handler.service.ts +++ b/packages/services/conversation-service/src/infrastructure/agents/specialists/objection-handler.service.ts @@ -91,9 +91,9 @@ export class ObjectionHandlerService extends BaseSpecialistService { /** * Typed execute method */ - async handleObjection(input: ObjectionHandlerInput): Promise { + async handleObjection(input: ObjectionHandlerInput, options?: import('./base-specialist.service').SpecialistExecutionOptions): Promise { const userMessage = this.buildUserMessage(input); - const { result } = await this.execute(userMessage); + const { result } = await this.execute(userMessage, options); return result; } diff --git a/packages/services/conversation-service/src/infrastructure/agents/specialists/policy-expert.service.ts b/packages/services/conversation-service/src/infrastructure/agents/specialists/policy-expert.service.ts index 16a628b..6682046 100644 --- a/packages/services/conversation-service/src/infrastructure/agents/specialists/policy-expert.service.ts +++ b/packages/services/conversation-service/src/infrastructure/agents/specialists/policy-expert.service.ts @@ -83,9 +83,9 @@ export class PolicyExpertService extends BaseSpecialistService { /** * Typed execute method for Coordinator to call */ - async executeQuery(input: PolicyExpertInput): Promise { + async executeQuery(input: PolicyExpertInput, options?: import('./base-specialist.service').SpecialistExecutionOptions): Promise { const userMessage = this.buildUserMessage(input); - const { result } = await this.execute(userMessage); + const { result } = await this.execute(userMessage, options); return result; } diff --git a/packages/services/conversation-service/src/infrastructure/agents/specialists/strategist.service.ts b/packages/services/conversation-service/src/infrastructure/agents/specialists/strategist.service.ts index 90c470d..ed56331 100644 --- a/packages/services/conversation-service/src/infrastructure/agents/specialists/strategist.service.ts +++ b/packages/services/conversation-service/src/infrastructure/agents/specialists/strategist.service.ts @@ -63,9 +63,9 @@ export class StrategistService extends BaseSpecialistService { /** * Typed execute method */ - async getStrategy(input: StrategistInput): Promise { + async getStrategy(input: StrategistInput, options?: import('./base-specialist.service').SpecialistExecutionOptions): Promise { const userMessage = this.buildUserMessage(input); - const { result } = await this.execute(userMessage); + const { result } = await this.execute(userMessage, options); return result; }