feat(agents): add Redis checkpoint for agent loop crash recovery

- New RedisClientService: optional ioredis wrapper, gracefully degrades without REDIS_URL
- New RedisModule: global NestJS module providing Redis connectivity
- AgentCheckpoint interface: captures turn, messages, cost, agents, timestamp
- Agent loop saves checkpoint after each tool execution batch (TTL=10min)
- On restart with same conversationId+requestId, loads checkpoint and resumes from saved state
- Checkpoint auto-deleted after load to prevent stale recovery
- Coordinator injects @Optional() RedisClientService, builds save/load callbacks
- Zero impact when Redis is not configured — checkpoint silently skipped

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-08 04:27:25 -08:00
parent 1a1573dda3
commit 0d488ac68b
6 changed files with 213 additions and 3 deletions

View File

@ -49,6 +49,9 @@ import { EVALUATION_RULE_REPOSITORY } from '../../domain/repositories/evaluation
// Tool Hooks // Tool Hooks
import { ToolHooksService } from './hooks/tool-hooks.service'; import { ToolHooksService } from './hooks/tool-hooks.service';
// Redis (optional — checkpoint persistence)
import { RedisModule } from '../cache/redis.module';
/** /**
* Anthropic Client Provider * Anthropic Client Provider
* Anthropic SDK Agent * Anthropic SDK Agent
@ -82,6 +85,7 @@ const AnthropicClientProvider = {
TypeOrmModule.forFeature([TokenUsageORM, EvaluationRuleORM, ConversationORM]), TypeOrmModule.forFeature([TokenUsageORM, EvaluationRuleORM, ConversationORM]),
McpModule, McpModule,
PaymentModule, PaymentModule,
RedisModule,
], ],
providers: [ providers: [
// Shared Anthropic client // Shared Anthropic client

View File

@ -125,14 +125,43 @@ export async function* agentLoop(
traceId, traceId,
} = params; } = params;
const currentTurn = params.currentTurnCount || 0; let currentTurn = params.currentTurnCount || 0;
const currentCost = params.currentCostUsd || 0; let currentCost = params.currentCostUsd || 0;
/** 日志前缀 — 包含 traceId 用于端到端追踪 */ /** 日志前缀 — 包含 traceId 用于端到端追踪 */
const lp = traceId ? `[trace:${traceId}]` : `[conv:${conversationId}]`; const lp = traceId ? `[trace:${traceId}]` : `[conv:${conversationId}]`;
let totalInputTokens = 0; let totalInputTokens = 0;
let totalOutputTokens = 0; let totalOutputTokens = 0;
const agentsUsed: SpecialistAgentType[] = []; const agentsUsed: SpecialistAgentType[] = [];
// ---- Checkpoint Recovery (if available) ----
// 首轮currentTurn=0尝试从 Redis 恢复上次崩溃前的状态
if (currentTurn === 0 && params.checkpoint) {
try {
const saved = await params.checkpoint.load();
if (saved) {
logger.log(`${lp} Recovering from checkpoint at turn ${saved.turn} (${Date.now() - saved.timestamp}ms ago)`);
// 用 checkpoint 的 messages 替代当前 messages跳到 saved turn 继续
yield* agentLoop(
anthropicClient,
{
...params,
messages: saved.messages,
currentTurnCount: saved.turn,
currentCostUsd: saved.costSoFar,
checkpoint: undefined, // 恢复后不再尝试加载,只保存
},
toolExecutor,
additionalTools,
additionalConcurrencyMap,
progressEventSink,
);
return;
}
} catch (err) {
logger.warn(`${lp} Checkpoint load failed (starting fresh): ${err}`);
}
}
// ---- Safety Checks ---- // ---- Safety Checks ----
if (currentTurn >= maxTurns) { if (currentTurn >= maxTurns) {
yield { yield {
@ -545,6 +574,22 @@ export async function* agentLoop(
{ role: 'user', content: toolResultMessages }, { role: 'user', content: toolResultMessages },
]; ];
// ---- Save Checkpoint (if configured) ----
if (params.checkpoint) {
try {
await params.checkpoint.save({
turn: currentTurn + 1,
messages: nextMessages,
costSoFar: currentCost + turnCost,
agentsUsed,
timestamp: Date.now(),
});
logger.debug(`${lp} Checkpoint saved at turn ${currentTurn + 1}`);
} catch (err) {
logger.warn(`${lp} Checkpoint save failed (non-fatal): ${err}`);
}
}
// ---- Recurse ---- // ---- Recurse ----
yield* agentLoop( yield* agentLoop(
anthropicClient, anthropicClient,

View File

@ -9,7 +9,7 @@
* 4. ConversationService * 4. ConversationService
*/ */
import { Injectable, OnModuleInit } from '@nestjs/common'; import { Injectable, OnModuleInit, Optional } from '@nestjs/common';
import { ConfigService } from '@nestjs/config'; import { ConfigService } from '@nestjs/config';
import Anthropic from '@anthropic-ai/sdk'; import Anthropic from '@anthropic-ai/sdk';
import { Logger } from '@nestjs/common'; import { Logger } from '@nestjs/common';
@ -37,6 +37,7 @@ import {
ClaudeMessage, ClaudeMessage,
SystemPromptBlock, SystemPromptBlock,
SpecialistAgentType, SpecialistAgentType,
CheckpointCallbacks,
} from '../types/agent.types'; } from '../types/agent.types';
import { StreamEvent } from '../types/stream.types'; import { StreamEvent } from '../types/stream.types';
import { ConversationContext } from '../types/context.types'; import { ConversationContext } from '../types/context.types';
@ -61,6 +62,9 @@ import { InputGateService } from './input-gate.service';
import { ToolHooksService } from '../hooks/tool-hooks.service'; import { ToolHooksService } from '../hooks/tool-hooks.service';
import { ToolType } from '../hooks/tool-hooks.types'; import { ToolType } from '../hooks/tool-hooks.types';
// Redis (optional checkpoint persistence)
import { RedisClientService } from '../../cache/redis-client.service';
// ============================================================ // ============================================================
// Compatibility Types (与 ClaudeAgentServiceV2 的 StreamChunk 兼容) // Compatibility Types (与 ClaudeAgentServiceV2 的 StreamChunk 兼容)
// ============================================================ // ============================================================
@ -157,6 +161,8 @@ export class CoordinatorAgentService implements OnModuleInit {
private readonly tenantContext: TenantContextService, private readonly tenantContext: TenantContextService,
// Tool hooks (PreToolUse/PostToolUse interception) // Tool hooks (PreToolUse/PostToolUse interception)
private readonly toolHooks: ToolHooksService, private readonly toolHooks: ToolHooksService,
// Redis (optional — for checkpoint persistence)
@Optional() private readonly redisClient?: RedisClientService,
) {} ) {}
onModuleInit() { onModuleInit() {
@ -287,6 +293,9 @@ export class CoordinatorAgentService implements OnModuleInit {
return gateResult; return gateResult;
}; };
// 5.5. Build checkpoint callbacks (if Redis available)
const checkpoint = this.buildCheckpointCallbacks(context.conversationId, traceId);
// 6. Build agent loop params // 6. Build agent loop params
const loopParams: AgentLoopParams = { const loopParams: AgentLoopParams = {
messages: enrichedMessages, messages: enrichedMessages,
@ -301,6 +310,7 @@ export class CoordinatorAgentService implements OnModuleInit {
currentTurnCount: 0, currentTurnCount: 0,
currentCostUsd: 0, currentCostUsd: 0,
evaluationGate: evaluationGateCallback, evaluationGate: evaluationGateCallback,
checkpoint,
}; };
// 6. Create progress event sink (shared between tool executor and agent loop) // 6. Create progress event sink (shared between tool executor and agent loop)
@ -782,6 +792,42 @@ export class CoordinatorAgentService implements OnModuleInit {
} }
} }
// ============================================================
// Checkpoint Persistence
// ============================================================
/** Checkpoint TTL: 10 minutes */
private readonly CHECKPOINT_TTL_SECONDS = 600;
/**
* checkpoint Redis undefined
*/
private buildCheckpointCallbacks(
conversationId: string,
requestId: string,
): CheckpointCallbacks | undefined {
if (!this.redisClient?.isAvailable()) return undefined;
const key = `agent:checkpoint:${conversationId}:${requestId}`;
return {
save: async (checkpoint) => {
await this.redisClient!.setex(
key,
this.CHECKPOINT_TTL_SECONDS,
JSON.stringify(checkpoint),
);
},
load: async () => {
const raw = await this.redisClient!.get(key);
if (!raw) return null;
// 加载后删除 — 避免过期 checkpoint 重复恢复
await this.redisClient!.del(key);
return JSON.parse(raw);
},
};
}
// ============================================================ // ============================================================
// Event Mapping // Event Mapping
// ============================================================ // ============================================================

View File

@ -262,6 +262,21 @@ export interface CoordinatorTurnStats {
// Coordinator Loop Parameters // Coordinator Loop Parameters
// ============================================================ // ============================================================
/** Agent Loop 状态快照 — 用于崩溃恢复 */
export interface AgentCheckpoint {
turn: number;
messages: ClaudeMessage[];
costSoFar: number;
agentsUsed: string[];
timestamp: number;
}
/** Checkpoint 回调 — save/load 由 coordinator 注入agent-loop 调用 */
export interface CheckpointCallbacks {
save: (checkpoint: AgentCheckpoint) => Promise<void>;
load: () => Promise<AgentCheckpoint | null>;
}
/** Agent Loop 参数 */ /** Agent Loop 参数 */
export interface AgentLoopParams { export interface AgentLoopParams {
messages: ClaudeMessage[]; messages: ClaudeMessage[];
@ -284,6 +299,8 @@ export interface AgentLoopParams {
turnCount: number, turnCount: number,
agentsUsed: string[], agentsUsed: string[],
) => Promise<import('../coordinator/evaluation-gate.service').GateResult>; ) => Promise<import('../coordinator/evaluation-gate.service').GateResult>;
/** 可选 checkpoint 回调 — 提供时在每轮 tool 执行后保存状态快照 */
checkpoint?: CheckpointCallbacks;
} }
/** Claude API 消息格式 */ /** Claude API 消息格式 */

View File

@ -0,0 +1,82 @@
/**
* Redis Client Service
* Redis REDIS_URL
*
* Agent Loop checkpoint
*/
import { Injectable, OnModuleInit, OnModuleDestroy, Logger, Optional } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import Redis from 'ioredis';
@Injectable()
export class RedisClientService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(RedisClientService.name);
private client: Redis | null = null;
constructor(@Optional() private readonly configService?: ConfigService) {}
async onModuleInit() {
const redisUrl = this.configService?.get<string>('REDIS_URL');
if (!redisUrl) {
this.logger.log('REDIS_URL not configured — checkpoint persistence disabled');
return;
}
try {
this.client = new Redis(redisUrl, {
maxRetriesPerRequest: 3,
lazyConnect: true,
connectTimeout: 5000,
});
await this.client.connect();
this.logger.log(`Connected to Redis: ${redisUrl.replace(/\/\/.*@/, '//*****@')}`);
} catch (error) {
this.logger.warn(`Redis connection failed (checkpoint disabled): ${error}`);
this.client = null;
}
}
async onModuleDestroy() {
if (this.client) {
await this.client.quit();
this.client = null;
}
}
/** Redis 是否可用 */
isAvailable(): boolean {
return this.client !== null && this.client.status === 'ready';
}
/** SET with TTL (seconds) */
async setex(key: string, ttlSeconds: number, value: string): Promise<void> {
if (!this.client) return;
try {
await this.client.setex(key, ttlSeconds, value);
} catch (error) {
this.logger.debug(`Redis setex failed: ${error}`);
}
}
/** GET */
async get(key: string): Promise<string | null> {
if (!this.client) return null;
try {
return await this.client.get(key);
} catch (error) {
this.logger.debug(`Redis get failed: ${error}`);
return null;
}
}
/** DEL */
async del(key: string): Promise<void> {
if (!this.client) return;
try {
await this.client.del(key);
} catch (error) {
this.logger.debug(`Redis del failed: ${error}`);
}
}
}

View File

@ -0,0 +1,16 @@
/**
* Redis Module
* Redis RedisClientService
*/
import { Module, Global } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { RedisClientService } from './redis-client.service';
@Global()
@Module({
imports: [ConfigModule],
providers: [RedisClientService],
exports: [RedisClientService],
})
export class RedisModule {}