fix: deduplicate text events from SDK stream_event and assistant message
SDK sends text both via stream_event deltas (token-level) and assistant message (complete block). Track hasStreamedText flag per session to skip duplicate text extraction from assistant messages. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
65e68a0487
commit
8e4bd573f4
|
|
@ -35,6 +35,8 @@ interface ActiveSession {
|
|||
abort: AbortController;
|
||||
gate: ApprovalGate;
|
||||
sdkSessionId?: string;
|
||||
/** Whether text was already streamed via stream_event deltas (to avoid duplicate from assistant message). */
|
||||
hasStreamedText?: boolean;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
|
|
@ -165,7 +167,7 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort {
|
|||
}
|
||||
}
|
||||
|
||||
const events = this.mapSdkMessage(message);
|
||||
const events = this.mapSdkMessage(message, params.sessionId);
|
||||
for (const event of events) {
|
||||
pushEvent(event);
|
||||
}
|
||||
|
|
@ -296,7 +298,7 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort {
|
|||
});
|
||||
|
||||
for await (const msg of sdkQuery) {
|
||||
const events = this.mapSdkMessage(msg);
|
||||
const events = this.mapSdkMessage(msg, sessionId);
|
||||
for (const event of events) {
|
||||
yield event;
|
||||
}
|
||||
|
|
@ -351,8 +353,9 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort {
|
|||
return CommandRiskLevel.READ_ONLY;
|
||||
}
|
||||
|
||||
private mapSdkMessage(message: any): EngineStreamEvent[] {
|
||||
private mapSdkMessage(message: any, sessionId: string): EngineStreamEvent[] {
|
||||
const events: EngineStreamEvent[] = [];
|
||||
const session = this.activeSessions.get(sessionId);
|
||||
|
||||
// Log every SDK message type for debugging
|
||||
if (message.type === 'stream_event') {
|
||||
|
|
@ -371,21 +374,26 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort {
|
|||
const delta = streamEvent.delta;
|
||||
if (delta?.type === 'text_delta' && delta.text) {
|
||||
events.push({ type: 'text', content: delta.text });
|
||||
if (session) session.hasStreamedText = true;
|
||||
} else if (delta?.type === 'thinking_delta' && delta.thinking) {
|
||||
events.push({ type: 'thinking', content: delta.thinking });
|
||||
if (session) session.hasStreamedText = true;
|
||||
}
|
||||
}
|
||||
// Ignore content_block_start/stop/message_start/message_stop — they are structural
|
||||
} else if (message.type === 'assistant') {
|
||||
// Complete assistant message — extract ALL content blocks.
|
||||
// Text/thinking blocks are emitted here as fallback since the SDK
|
||||
// may not always send stream_event deltas (depends on SDK version/config).
|
||||
// Complete assistant message — extract content blocks.
|
||||
// Skip text/thinking if already streamed via stream_event deltas to avoid duplication.
|
||||
const alreadyStreamed = session?.hasStreamedText ?? false;
|
||||
// Reset flag for next assistant turn
|
||||
if (session) session.hasStreamedText = false;
|
||||
|
||||
const content = message.message?.content;
|
||||
if (Array.isArray(content)) {
|
||||
for (const block of content) {
|
||||
if (block.type === 'text' && block.text) {
|
||||
if (block.type === 'text' && block.text && !alreadyStreamed) {
|
||||
events.push({ type: 'text', content: block.text });
|
||||
} else if (block.type === 'thinking' && block.thinking) {
|
||||
} else if (block.type === 'thinking' && block.thinking && !alreadyStreamed) {
|
||||
events.push({ type: 'thinking', content: block.thinking });
|
||||
} else if (block.type === 'tool_use') {
|
||||
events.push({
|
||||
|
|
|
|||
Loading…
Reference in New Issue