diff --git a/packages/services/agent-service/src/infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine.ts b/packages/services/agent-service/src/infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine.ts index eb97433..6eb7f9a 100644 --- a/packages/services/agent-service/src/infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine.ts +++ b/packages/services/agent-service/src/infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine.ts @@ -35,6 +35,8 @@ interface ActiveSession { abort: AbortController; gate: ApprovalGate; sdkSessionId?: string; + /** Whether text was already streamed via stream_event deltas (to avoid duplicate from assistant message). */ + hasStreamedText?: boolean; } @Injectable() @@ -165,7 +167,7 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort { } } - const events = this.mapSdkMessage(message); + const events = this.mapSdkMessage(message, params.sessionId); for (const event of events) { pushEvent(event); } @@ -296,7 +298,7 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort { }); for await (const msg of sdkQuery) { - const events = this.mapSdkMessage(msg); + const events = this.mapSdkMessage(msg, sessionId); for (const event of events) { yield event; } @@ -351,8 +353,9 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort { return CommandRiskLevel.READ_ONLY; } - private mapSdkMessage(message: any): EngineStreamEvent[] { + private mapSdkMessage(message: any, sessionId: string): EngineStreamEvent[] { const events: EngineStreamEvent[] = []; + const session = this.activeSessions.get(sessionId); // Log every SDK message type for debugging if (message.type === 'stream_event') { @@ -371,21 +374,26 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort { const delta = streamEvent.delta; if (delta?.type === 'text_delta' && delta.text) { events.push({ type: 'text', content: delta.text }); + if (session) session.hasStreamedText = true; } else if (delta?.type === 'thinking_delta' && delta.thinking) { events.push({ type: 'thinking', content: delta.thinking }); + if (session) session.hasStreamedText = true; } } // Ignore content_block_start/stop/message_start/message_stop — they are structural } else if (message.type === 'assistant') { - // Complete assistant message — extract ALL content blocks. - // Text/thinking blocks are emitted here as fallback since the SDK - // may not always send stream_event deltas (depends on SDK version/config). + // Complete assistant message — extract content blocks. + // Skip text/thinking if already streamed via stream_event deltas to avoid duplication. + const alreadyStreamed = session?.hasStreamedText ?? false; + // Reset flag for next assistant turn + if (session) session.hasStreamedText = false; + const content = message.message?.content; if (Array.isArray(content)) { for (const block of content) { - if (block.type === 'text' && block.text) { + if (block.type === 'text' && block.text && !alreadyStreamed) { events.push({ type: 'text', content: block.text }); - } else if (block.type === 'thinking' && block.thinking) { + } else if (block.type === 'thinking' && block.thinking && !alreadyStreamed) { events.push({ type: 'thinking', content: block.thinking }); } else if (block.type === 'tool_use') { events.push({