feat(agents): stream specialist agent progress to frontend

- Convert BaseSpecialistService.callClaude() from sync .create() to streaming .stream()
- Add onProgress callback to SpecialistExecutionOptions for real-time text delta reporting
- All 6 specialist convenience methods now accept optional options parameter
- Coordinator creates throttled progress callback (every 300 chars) pushing agent_progress events
- Agent loop drains accumulated progress events after each tool execution batch
- WebSocket gateway forwards agent_progress events to frontend
- Progress event sink shared between tool executor and agent loop via closure

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-08 04:17:37 -08:00
parent 198ff4b349
commit 2ebc8e6da6
10 changed files with 110 additions and 34 deletions

View File

@ -166,6 +166,13 @@ export class ConversationGateway
agentName: chunk.agentName,
description: chunk.description,
});
} else if (chunk.type === 'agent_progress') {
client.emit('agent_progress', {
messageId,
conversationId,
agentType: chunk.agentType,
message: chunk.message,
});
} else if (chunk.type === 'agent_complete') {
client.emit('agent_complete', {
messageId,

View File

@ -113,6 +113,7 @@ export async function* agentLoop(
toolExecutor: CoordinatorToolExecutor,
additionalTools?: Array<{ name: string; description: string; input_schema: Record<string, unknown> }>,
additionalConcurrencyMap?: Record<string, boolean>,
progressEventSink?: StreamEvent[],
): AsyncGenerator<StreamEvent> {
const {
messages,
@ -430,6 +431,7 @@ export async function* agentLoop(
toolExecutor,
additionalTools,
additionalConcurrencyMap,
progressEventSink,
);
return;
}
@ -492,6 +494,14 @@ export async function* agentLoop(
`${lp} [Turn ${currentTurn + 1}] Tools executed: ${summary.totalTools} tools in ${summary.parallelBatches} batch(es), ${summary.totalDurationMs}ms total`,
);
// ---- Drain accumulated progress events from specialist streaming ----
if (progressEventSink && progressEventSink.length > 0) {
const events = progressEventSink.splice(0, progressEventSink.length);
for (const evt of events) {
yield evt;
}
}
// ---- Emit Tool Results & Agent Completions ----
for (const result of toolResults) {
// Emit tool_result event
@ -547,6 +557,7 @@ export async function* agentLoop(
toolExecutor,
additionalTools,
additionalConcurrencyMap,
progressEventSink,
);
}

View File

@ -36,6 +36,7 @@ import {
AgentLoopParams,
ClaudeMessage,
SystemPromptBlock,
SpecialistAgentType,
} from '../types/agent.types';
import { StreamEvent } from '../types/stream.types';
import { ConversationContext } from '../types/context.types';
@ -302,8 +303,11 @@ export class CoordinatorAgentService implements OnModuleInit {
evaluationGate: evaluationGateCallback,
};
// 6. Create tool executor
const toolExecutor = this.createToolExecutor(context.userId, context.conversationId);
// 6. Create progress event sink (shared between tool executor and agent loop)
const progressEventSink: StreamEvent[] = [];
// 6.1. Create tool executor
const toolExecutor = this.createToolExecutor(context.userId, context.conversationId, progressEventSink);
// 6.5. Gather MCP tools (if any MCP servers are connected)
const mcpDiscoveredTools = this.mcpClient.getDiscoveredTools();
@ -325,6 +329,7 @@ export class CoordinatorAgentService implements OnModuleInit {
toolExecutor,
mcpTools,
mcpConcurrencyMap,
progressEventSink,
)) {
yield this.mapEventToStreamChunk(event);
}
@ -515,7 +520,7 @@ export class CoordinatorAgentService implements OnModuleInit {
/**
 * Creates the per-conversation tool executor that dispatches coordinator
 * tool calls — specialist agent tools via executeAgentTool() and MCP tools
 * via the MCP client. (Original comment text was garbled in extraction.)
 */
private createToolExecutor(userId: string, conversationId: string): CoordinatorToolExecutor {
private createToolExecutor(userId: string, conversationId: string, progressEventSink?: StreamEvent[]): CoordinatorToolExecutor {
return async (
toolName: string,
toolInput: Record<string, unknown>,
@ -548,7 +553,11 @@ export class CoordinatorAgentService implements OnModuleInit {
let result: { output: string; isError: boolean };
if (toolType === 'agent') {
result = await this.executeAgentTool(toolName, toolInput);
// 为 Agent 调用创建 onProgress callback将进度事件推送到 sink
const onProgress = progressEventSink
? this.createAgentProgressCallback(toolName, progressEventSink)
: undefined;
result = await this.executeAgentTool(toolName, toolInput, onProgress);
} else if (toolType === 'mcp') {
result = await this.mcpClient.executeTool(toolName, toolInput);
} else {
@ -589,7 +598,10 @@ export class CoordinatorAgentService implements OnModuleInit {
private async executeAgentTool(
toolName: string,
toolInput: Record<string, unknown>,
onProgress?: (text: string) => void,
): Promise<{ output: string; isError: boolean }> {
const opts = onProgress ? { onProgress } : undefined;
switch (toolName) {
case 'invoke_policy_expert': {
const result = await this.policyExpert.executeQuery({
@ -597,7 +609,7 @@ export class CoordinatorAgentService implements OnModuleInit {
category: toolInput.category as string | undefined,
includeProcessSteps: toolInput.includeProcessSteps as boolean | undefined,
includeRequirements: toolInput.includeRequirements as boolean | undefined,
});
}, opts);
return { output: result, isError: false };
}
@ -606,7 +618,7 @@ export class CoordinatorAgentService implements OnModuleInit {
userInfo: (toolInput.userInfo as Record<string, unknown>) || {},
targetCategories: toolInput.targetCategories as string[] | undefined,
conversationContext: toolInput.conversationContext as string | undefined,
});
}, opts);
return { output: result, isError: false };
}
@ -618,7 +630,7 @@ export class CoordinatorAgentService implements OnModuleInit {
userSentiment: toolInput.userSentiment as string | undefined,
hasAssessment: (toolInput.hasAssessment as boolean) || false,
hasConverted: (toolInput.hasConverted as boolean) || false,
});
}, opts);
return { output: result, isError: false };
}
@ -627,7 +639,7 @@ export class CoordinatorAgentService implements OnModuleInit {
objection: (toolInput.objection as string) || '',
userContext: (toolInput.userContext as string) || '',
previousObjections: toolInput.previousObjections as string[] | undefined,
});
}, opts);
return { output: result, isError: false };
}
@ -636,7 +648,7 @@ export class CoordinatorAgentService implements OnModuleInit {
userProfile: (toolInput.userProfile as Record<string, unknown>) || {},
targetCategory: (toolInput.targetCategory as string) || '',
focusArea: toolInput.focusArea as string | undefined,
});
}, opts);
return { output: result, isError: false };
}
@ -647,7 +659,7 @@ export class CoordinatorAgentService implements OnModuleInit {
dataToSave: toolInput.dataToSave as Record<string, unknown> | undefined,
recentMessages: toolInput.recentMessages as string | undefined,
contextQuery: toolInput.contextQuery as string | undefined,
});
}, opts);
return { output: result, isError: false };
}
@ -659,6 +671,42 @@ export class CoordinatorAgentService implements OnModuleInit {
}
}
/**
 * Creates an onProgress callback for a specialist agent invocation that
 * pushes throttled `agent_progress` events into the shared sink (drained by
 * the agent loop after each tool execution batch).
 *
 * Throttling: an event is pushed only after every THROTTLE_CHARS (300)
 * additional characters of streamed text, so the frontend is not flooded
 * with one event per token delta.
 *
 * @param toolName          coordinator tool name (e.g. 'invoke_policy_expert')
 * @param progressEventSink shared array the coordinator passes to the agent loop
 * @returns a callback receiving incremental text deltas; a no-op for unknown tools
 */
private createAgentProgressCallback(
  toolName: string,
  progressEventSink: StreamEvent[],
): (text: string) => void {
  const agentTypeMap: Record<string, SpecialistAgentType> = {
    invoke_policy_expert: SpecialistAgentType.POLICY_EXPERT,
    invoke_assessment_expert: SpecialistAgentType.ASSESSMENT_EXPERT,
    invoke_strategist: SpecialistAgentType.STRATEGIST,
    invoke_objection_handler: SpecialistAgentType.OBJECTION_HANDLER,
    invoke_case_analyst: SpecialistAgentType.CASE_ANALYST,
    invoke_memory_manager: SpecialistAgentType.MEMORY_MANAGER,
  };
  const agentType = agentTypeMap[toolName];
  if (!agentType) return () => {};

  const THROTTLE_CHARS = 300;
  let totalChars = 0;      // cumulative chars streamed by this agent call
  let sinceLastFlush = 0;  // chars accumulated since the last emitted event
  return (text: string) => {
    totalChars += text.length;
    sinceLastFlush += text.length;
    if (sinceLastFlush >= THROTTLE_CHARS) {
      progressEventSink.push({
        type: 'agent_progress',
        agentType,
        // BUGFIX: report the cumulative count. The previous message
        // interpolated the reset throttle accumulator, so it always showed
        // ~300 regardless of how much text had actually been generated.
        message: `已生成 ${totalChars} 字...`,
        timestamp: Date.now(),
      });
      sinceLastFlush = 0;
    }
  };
}
/**
* ImmigrationToolsService
*/

View File

@ -91,9 +91,9 @@ export class AssessmentExpertService extends BaseSpecialistService {
/**
* Typed execute method
*/
async executeAssessment(input: AssessmentExpertInput): Promise<string> {
async executeAssessment(input: AssessmentExpertInput, options?: import('./base-specialist.service').SpecialistExecutionOptions): Promise<string> {
const userMessage = this.buildUserMessage(input);
const { result } = await this.execute(userMessage);
const { result } = await this.execute(userMessage, options);
return result;
}

View File

@ -34,6 +34,8 @@ export interface SpecialistExecutionOptions {
abortSignal?: AbortSignal;
/** Trace ID for end-to-end observability */
traceId?: string;
/** Progress callback — invoked with incremental text deltas during streaming */
onProgress?: (text: string) => void;
}
export abstract class BaseSpecialistService {
@ -101,7 +103,7 @@ export abstract class BaseSpecialistService {
break;
}
// Call Claude API (non-streaming for specialists — simpler and sufficient)
// Call Claude API (streaming for progress reporting)
const response = await this.callClaude(
systemPrompt,
messages,
@ -111,6 +113,7 @@ export abstract class BaseSpecialistService {
input_schema: t.input_schema,
})) : undefined,
maxTokens,
options?.onProgress,
);
// Track tokens
@ -219,14 +222,30 @@ export abstract class BaseSpecialistService {
}
/**
* Call Claude API with timeout
* Call Claude API with streaming + timeout
* Uses the streaming API so incremental text deltas can be forwarded to the
* optional onProgress callback while the request is in flight.
*/
private async callClaude(
systemPrompt: string,
messages: Array<{ role: 'user' | 'assistant'; content: any }>,
tools: any[] | undefined,
maxTokens: number,
onProgress?: (text: string) => void,
): Promise<Anthropic.Message> {
const stream = this.anthropicClient.messages.stream({
model: this.config.model,
system: [{ type: 'text' as const, text: systemPrompt, cache_control: { type: 'ephemeral' as const } }],
messages,
...(tools && tools.length > 0 ? { tools } : {}),
max_tokens: maxTokens,
...(this.config.temperature !== undefined ? { temperature: this.config.temperature } : {}),
} as any);
// 将文本增量实时推送给 onProgress 回调
if (onProgress) {
stream.on('text', (text: string) => onProgress(text));
}
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(
() => reject(new Error(`Agent ${this.config.type} timed out after ${this.config.timeoutMs}ms`)),
@ -234,15 +253,6 @@ export abstract class BaseSpecialistService {
);
});
const apiCall = this.anthropicClient.messages.create({
model: this.config.model,
system: [{ type: 'text' as const, text: systemPrompt, cache_control: { type: 'ephemeral' as const } }],
messages,
...(tools && tools.length > 0 ? { tools } : {}),
max_tokens: maxTokens,
...(this.config.temperature !== undefined ? { temperature: this.config.temperature } : {}),
});
return Promise.race([apiCall, timeoutPromise]);
return Promise.race([stream.finalMessage(), timeoutPromise]);
}
}

View File

@ -93,9 +93,9 @@ export class CaseAnalystService extends BaseSpecialistService {
/**
* Typed execute method
*/
async analyzeCases(input: CaseAnalystInput): Promise<string> {
async analyzeCases(input: CaseAnalystInput, options?: import('./base-specialist.service').SpecialistExecutionOptions): Promise<string> {
const userMessage = this.buildUserMessage(input);
const { result } = await this.execute(userMessage);
const { result } = await this.execute(userMessage, options);
return result;
}

View File

@ -103,9 +103,9 @@ export class MemoryManagerService extends BaseSpecialistService {
/**
* Typed execute method
*/
async manageMemory(input: MemoryManagerInput): Promise<string> {
async manageMemory(input: MemoryManagerInput, options?: import('./base-specialist.service').SpecialistExecutionOptions): Promise<string> {
const userMessage = this.buildUserMessage(input);
const { result } = await this.execute(userMessage);
const { result } = await this.execute(userMessage, options);
return result;
}

View File

@ -91,9 +91,9 @@ export class ObjectionHandlerService extends BaseSpecialistService {
/**
* Typed execute method
*/
async handleObjection(input: ObjectionHandlerInput): Promise<string> {
async handleObjection(input: ObjectionHandlerInput, options?: import('./base-specialist.service').SpecialistExecutionOptions): Promise<string> {
const userMessage = this.buildUserMessage(input);
const { result } = await this.execute(userMessage);
const { result } = await this.execute(userMessage, options);
return result;
}

View File

@ -83,9 +83,9 @@ export class PolicyExpertService extends BaseSpecialistService {
/**
* Typed execute method for Coordinator to call
*/
async executeQuery(input: PolicyExpertInput): Promise<string> {
async executeQuery(input: PolicyExpertInput, options?: import('./base-specialist.service').SpecialistExecutionOptions): Promise<string> {
const userMessage = this.buildUserMessage(input);
const { result } = await this.execute(userMessage);
const { result } = await this.execute(userMessage, options);
return result;
}

View File

@ -63,9 +63,9 @@ export class StrategistService extends BaseSpecialistService {
/**
* Typed execute method
*/
async getStrategy(input: StrategistInput): Promise<string> {
async getStrategy(input: StrategistInput, options?: import('./base-specialist.service').SpecialistExecutionOptions): Promise<string> {
const userMessage = this.buildUserMessage(input);
const { result } = await this.execute(userMessage);
const { result } = await this.execute(userMessage, options);
return result;
}