feat(resilience): add circuit breaker for downstream services

- New CircuitBreaker class: CLOSED → OPEN → HALF_OPEN three-state model
- Zero external dependencies, ~90 lines, fail-open semantics
- KnowledgeClientService: threshold=5, cooldown=60s, protects all 9 endpoints
- PaymentClientService: threshold=3, cooldown=30s, protects all 7 endpoints
- Both services refactored to use protectedFetch() — cleaner code, fewer try-catch
- Replaces verbose per-method error handling with centralized circuit breaker
- When tripped: returns null/empty fallback instantly, no network call

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-08 04:21:30 -08:00
parent 2ebc8e6da6
commit 1a1573dda3
3 changed files with 285 additions and 318 deletions

View File

@ -0,0 +1,96 @@
/**
* Circuit Breaker
*
*
* CLOSED
* OPEN fallback failureThreshold
* HALF_OPEN 1
*
* Redis
*/
import { Logger } from '@nestjs/common';
export type CircuitState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';
export interface CircuitBreakerOptions {
/** 服务名称(用于日志) */
name: string;
/** 连续失败触发熔断的阈值 */
failureThreshold: number;
/** 熔断持续时间ms过后进入 HALF_OPEN */
resetTimeoutMs: number;
}
export class CircuitBreaker {
private readonly logger: Logger;
private state: CircuitState = 'CLOSED';
private failureCount = 0;
private lastFailureTime = 0;
private readonly name: string;
private readonly failureThreshold: number;
private readonly resetTimeoutMs: number;
constructor(options: CircuitBreakerOptions) {
this.name = options.name;
this.failureThreshold = options.failureThreshold;
this.resetTimeoutMs = options.resetTimeoutMs;
this.logger = new Logger(`CircuitBreaker:${this.name}`);
}
/**
*
* @param fn
* @param fallback
*/
async execute<T>(fn: () => Promise<T>, fallback: T): Promise<T> {
// OPEN 状态:检查是否可以转为 HALF_OPEN
if (this.state === 'OPEN') {
const elapsed = Date.now() - this.lastFailureTime;
if (elapsed >= this.resetTimeoutMs) {
this.state = 'HALF_OPEN';
this.logger.log(`${this.name}: OPEN → HALF_OPEN (${elapsed}ms elapsed, probing...)`);
} else {
// 仍在熔断窗口内,直接返回 fallback
return fallback;
}
}
try {
const result = await fn();
// 成功:重置计数器
if (this.state === 'HALF_OPEN') {
this.logger.log(`${this.name}: HALF_OPEN → CLOSED (probe succeeded)`);
}
this.state = 'CLOSED';
this.failureCount = 0;
return result;
} catch (error) {
this.failureCount++;
this.lastFailureTime = Date.now();
if (this.failureCount >= this.failureThreshold) {
this.state = 'OPEN';
this.logger.warn(
`${this.name}: → OPEN (${this.failureCount} consecutive failures, cooldown ${this.resetTimeoutMs}ms)`,
);
}
this.logger.debug(
`${this.name}: failure ${this.failureCount}/${this.failureThreshold}${error}`,
);
return fallback;
}
}
/** 当前状态 */
getState(): CircuitState {
return this.state;
}
/** 连续失败次数 */
getFailureCount(): number {
return this.failureCount;
}
}

View File

@ -1,5 +1,6 @@
import { Injectable, OnModuleInit } from '@nestjs/common'; import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config'; import { ConfigService } from '@nestjs/config';
import { CircuitBreaker } from '../common/circuit-breaker';
/** /**
* RAG * RAG
@ -63,13 +64,36 @@ interface ApiResponse<T> {
*/ */
@Injectable() @Injectable()
export class KnowledgeClientService implements OnModuleInit { export class KnowledgeClientService implements OnModuleInit {
private readonly logger = new Logger(KnowledgeClientService.name);
private baseUrl: string; private baseUrl: string;
private readonly circuitBreaker: CircuitBreaker;
constructor(private configService: ConfigService) {} constructor(private configService: ConfigService) {
this.circuitBreaker = new CircuitBreaker({
name: 'knowledge-service',
failureThreshold: 5,
resetTimeoutMs: 60_000,
});
}
onModuleInit() { onModuleInit() {
this.baseUrl = this.configService.get<string>('KNOWLEDGE_SERVICE_URL') || 'http://knowledge-service:3003'; this.baseUrl = this.configService.get<string>('KNOWLEDGE_SERVICE_URL') || 'http://knowledge-service:3003';
console.log(`[KnowledgeClient] Initialized with base URL: ${this.baseUrl}`); this.logger.log(`Initialized with base URL: ${this.baseUrl}`);
}
/**
* fetch 5 60s
* null
*/
private protectedFetch(url: string, init?: RequestInit): Promise<Response | null> {
return this.circuitBreaker.execute(
async () => {
const resp = await fetch(url, init);
if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
return resp;
},
null,
);
} }
/** /**
@ -82,24 +106,14 @@ export class KnowledgeClientService implements OnModuleInit {
includeMemories?: boolean; includeMemories?: boolean;
includeExperiences?: boolean; includeExperiences?: boolean;
}): Promise<RAGResult | null> { }): Promise<RAGResult | null> {
try { const resp = await this.protectedFetch(`${this.baseUrl}/api/v1/knowledge/retrieve`, {
const response = await fetch(`${this.baseUrl}/api/v1/knowledge/retrieve`, { method: 'POST',
method: 'POST', headers: { 'Content-Type': 'application/json' },
headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(params),
body: JSON.stringify(params), });
}); if (!resp) return null;
const data = (await resp.json()) as ApiResponse<RAGResult>;
if (!response.ok) { return data.success ? data.data : null;
console.error(`[KnowledgeClient] RAG retrieve failed: ${response.status}`);
return null;
}
const data = (await response.json()) as ApiResponse<RAGResult>;
return data.success ? data.data : null;
} catch (error) {
console.error('[KnowledgeClient] RAG retrieve error:', error);
return null;
}
} }
/** /**
@ -110,48 +124,28 @@ export class KnowledgeClientService implements OnModuleInit {
userId?: string; userId?: string;
category?: string; category?: string;
}): Promise<string | null> { }): Promise<string | null> {
try { const resp = await this.protectedFetch(`${this.baseUrl}/api/v1/knowledge/retrieve/prompt`, {
const response = await fetch(`${this.baseUrl}/api/v1/knowledge/retrieve/prompt`, { method: 'POST',
method: 'POST', headers: { 'Content-Type': 'application/json' },
headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(params),
body: JSON.stringify(params), });
}); if (!resp) return null;
const data = (await resp.json()) as ApiResponse<{ context: string }>;
if (!response.ok) { return data.success ? data.data.context : null;
console.error(`[KnowledgeClient] RAG retrieve/prompt failed: ${response.status}`);
return null;
}
const data = (await response.json()) as ApiResponse<{ context: string }>;
return data.success ? data.data.context : null;
} catch (error) {
console.error('[KnowledgeClient] RAG retrieve/prompt error:', error);
return null;
}
} }
/** /**
* *
*/ */
async checkOffTopic(query: string): Promise<OffTopicResult> { async checkOffTopic(query: string): Promise<OffTopicResult> {
try { const resp = await this.protectedFetch(`${this.baseUrl}/api/v1/knowledge/check-off-topic`, {
const response = await fetch(`${this.baseUrl}/api/v1/knowledge/check-off-topic`, { method: 'POST',
method: 'POST', headers: { 'Content-Type': 'application/json' },
headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ query }),
body: JSON.stringify({ query }), });
}); if (!resp) return { isOffTopic: false, confidence: 0 };
const data = (await resp.json()) as ApiResponse<OffTopicResult>;
if (!response.ok) { return data.success ? data.data : { isOffTopic: false, confidence: 0 };
console.error(`[KnowledgeClient] checkOffTopic failed: ${response.status}`);
return { isOffTopic: false, confidence: 0 };
}
const data = (await response.json()) as ApiResponse<OffTopicResult>;
return data.success ? data.data : { isOffTopic: false, confidence: 0 };
} catch (error) {
console.error('[KnowledgeClient] checkOffTopic error:', error);
return { isOffTopic: false, confidence: 0 };
}
} }
/** /**
@ -165,24 +159,14 @@ export class KnowledgeClientService implements OnModuleInit {
sourceConversationId?: string; sourceConversationId?: string;
relatedCategory?: string; relatedCategory?: string;
}): Promise<UserMemory | null> { }): Promise<UserMemory | null> {
try { const resp = await this.protectedFetch(`${this.baseUrl}/api/v1/memory/user`, {
const response = await fetch(`${this.baseUrl}/api/v1/memory/user`, { method: 'POST',
method: 'POST', headers: { 'Content-Type': 'application/json' },
headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(params),
body: JSON.stringify(params), });
}); if (!resp) return null;
const data = (await resp.json()) as ApiResponse<UserMemory>;
if (!response.ok) { return data.success ? data.data : null;
console.error(`[KnowledgeClient] saveUserMemory failed: ${response.status}`);
return null;
}
const data = (await response.json()) as ApiResponse<UserMemory>;
return data.success ? data.data : null;
} catch (error) {
console.error('[KnowledgeClient] saveUserMemory error:', error);
return null;
}
} }
/** /**
@ -193,44 +177,24 @@ export class KnowledgeClientService implements OnModuleInit {
query: string; query: string;
limit?: number; limit?: number;
}): Promise<UserMemory[]> { }): Promise<UserMemory[]> {
try { const resp = await this.protectedFetch(`${this.baseUrl}/api/v1/memory/user/search`, {
const response = await fetch(`${this.baseUrl}/api/v1/memory/user/search`, { method: 'POST',
method: 'POST', headers: { 'Content-Type': 'application/json' },
headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(params),
body: JSON.stringify(params), });
}); if (!resp) return [];
const data = (await resp.json()) as ApiResponse<UserMemory[]>;
if (!response.ok) { return data.success ? data.data : [];
console.error(`[KnowledgeClient] searchUserMemories failed: ${response.status}`);
return [];
}
const data = (await response.json()) as ApiResponse<UserMemory[]>;
return data.success ? data.data : [];
} catch (error) {
console.error('[KnowledgeClient] searchUserMemories error:', error);
return [];
}
} }
/** /**
* *
*/ */
async getUserTopMemories(userId: string, limit = 5): Promise<UserMemory[]> { async getUserTopMemories(userId: string, limit = 5): Promise<UserMemory[]> {
try { const resp = await this.protectedFetch(`${this.baseUrl}/api/v1/memory/user/${userId}/top?limit=${limit}`);
const response = await fetch(`${this.baseUrl}/api/v1/memory/user/${userId}/top?limit=${limit}`); if (!resp) return [];
const data = (await resp.json()) as ApiResponse<UserMemory[]>;
if (!response.ok) { return data.success ? data.data : [];
console.error(`[KnowledgeClient] getUserTopMemories failed: ${response.status}`);
return [];
}
const data = (await response.json()) as ApiResponse<UserMemory[]>;
return data.success ? data.data : [];
} catch (error) {
console.error('[KnowledgeClient] getUserTopMemories error:', error);
return [];
}
} }
/** /**
@ -243,48 +207,31 @@ export class KnowledgeClientService implements OnModuleInit {
activeOnly?: boolean; activeOnly?: boolean;
limit?: number; limit?: number;
}): Promise<SystemExperience[]> { }): Promise<SystemExperience[]> {
try { const resp = await this.protectedFetch(`${this.baseUrl}/api/v1/memory/experience/search`, {
const response = await fetch(`${this.baseUrl}/api/v1/memory/experience/search`, { method: 'POST',
method: 'POST', headers: { 'Content-Type': 'application/json' },
headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(params),
body: JSON.stringify(params), });
}); if (!resp) return [];
const data = (await resp.json()) as ApiResponse<SystemExperience[]>;
if (!response.ok) { return data.success ? data.data : [];
console.error(`[KnowledgeClient] searchExperiences failed: ${response.status}`);
return [];
}
const data = (await response.json()) as ApiResponse<SystemExperience[]>;
return data.success ? data.data : [];
} catch (error) {
console.error('[KnowledgeClient] searchExperiences error:', error);
return [];
}
} }
/** /**
* Neo4j * Neo4j
*/ */
async initializeUser(userId: string): Promise<boolean> { async initializeUser(userId: string): Promise<boolean> {
try { const resp = await this.protectedFetch(`${this.baseUrl}/api/v1/memory/user`, {
// 通过保存一个初始记忆来初始化用户 method: 'POST',
const response = await fetch(`${this.baseUrl}/api/v1/memory/user`, { headers: { 'Content-Type': 'application/json' },
method: 'POST', body: JSON.stringify({
headers: { 'Content-Type': 'application/json' }, userId,
body: JSON.stringify({ memoryType: 'FACT',
userId, content: '用户首次访问系统',
memoryType: 'FACT', importance: 10,
content: '用户首次访问系统', }),
importance: 10, });
}), return resp !== null;
});
return response.ok;
} catch (error) {
console.error('[KnowledgeClient] initializeUser error:', error);
return false;
}
} }
/** /**
@ -299,24 +246,14 @@ export class KnowledgeClientService implements OnModuleInit {
confidence?: number; confidence?: number;
relatedCategory?: string; relatedCategory?: string;
}): Promise<SystemExperience | null> { }): Promise<SystemExperience | null> {
try { const resp = await this.protectedFetch(`${this.baseUrl}/api/v1/memory/experience`, {
const response = await fetch(`${this.baseUrl}/api/v1/memory/experience`, { method: 'POST',
method: 'POST', headers: { 'Content-Type': 'application/json' },
headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(params),
body: JSON.stringify(params), });
}); if (!resp) return null;
const data = (await resp.json()) as ApiResponse<SystemExperience>;
if (!response.ok) { return data.success ? data.data : null;
console.error(`[KnowledgeClient] saveExperience failed: ${response.status}`);
return null;
}
const data = (await response.json()) as ApiResponse<SystemExperience>;
return data.success ? data.data : null;
} catch (error) {
console.error('[KnowledgeClient] saveExperience error:', error);
return null;
}
} }
// ============================================================ // ============================================================

View File

@ -1,5 +1,6 @@
import { Injectable, OnModuleInit } from '@nestjs/common'; import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config'; import { ConfigService } from '@nestjs/config';
import { CircuitBreaker } from '../common/circuit-breaker';
/** /**
* *
@ -50,16 +51,36 @@ interface ApiResponse<T> {
*/ */
@Injectable() @Injectable()
export class PaymentClientService implements OnModuleInit { export class PaymentClientService implements OnModuleInit {
private readonly logger = new Logger(PaymentClientService.name);
private baseUrl: string; private baseUrl: string;
private readonly circuitBreaker: CircuitBreaker;
constructor(private configService: ConfigService) {} constructor(private configService: ConfigService) {
this.circuitBreaker = new CircuitBreaker({
name: 'payment-service',
failureThreshold: 3,
resetTimeoutMs: 30_000,
});
}
onModuleInit() { onModuleInit() {
this.baseUrl = this.baseUrl =
this.configService.get<string>('PAYMENT_SERVICE_URL') || this.configService.get<string>('PAYMENT_SERVICE_URL') ||
'http://payment-service:3002'; 'http://payment-service:3002';
console.log( this.logger.log(`Initialized with base URL: ${this.baseUrl}`);
`[PaymentClient] Initialized with base URL: ${this.baseUrl}`, }
/**
* fetch 3 30s
*/
private protectedFetch(url: string, init?: RequestInit): Promise<Response | null> {
return this.circuitBreaker.execute(
async () => {
const resp = await fetch(url, init);
if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
return resp;
},
null,
); );
} }
@ -72,34 +93,21 @@ export class PaymentClientService implements OnModuleInit {
serviceCategory?: string; serviceCategory?: string;
conversationId?: string; conversationId?: string;
}): Promise<OrderInfo | null> { }): Promise<OrderInfo | null> {
try { const resp = await this.protectedFetch(`${this.baseUrl}/api/v1/orders`, {
const response = await fetch(`${this.baseUrl}/api/v1/orders`, { method: 'POST',
method: 'POST', headers: {
headers: { 'Content-Type': 'application/json',
'Content-Type': 'application/json', 'x-user-id': params.userId,
'x-user-id': params.userId, },
}, body: JSON.stringify({
body: JSON.stringify({ serviceType: params.serviceType,
serviceType: params.serviceType, serviceCategory: params.serviceCategory,
serviceCategory: params.serviceCategory, conversationId: params.conversationId,
conversationId: params.conversationId, }),
}), });
}); if (!resp) return null;
const data = (await resp.json()) as ApiResponse<OrderInfo>;
if (!response.ok) { return data.success ? data.data : null;
const errText = await response.text();
console.error(
`[PaymentClient] createOrder failed: ${response.status} ${errText}`,
);
return null;
}
const data = (await response.json()) as ApiResponse<OrderInfo>;
return data.success ? data.data : null;
} catch (error) {
console.error('[PaymentClient] createOrder error:', error);
return null;
}
} }
/** /**
@ -109,30 +117,17 @@ export class PaymentClientService implements OnModuleInit {
orderId: string; orderId: string;
method: string; method: string;
}): Promise<PaymentResult | null> { }): Promise<PaymentResult | null> {
try { const resp = await this.protectedFetch(`${this.baseUrl}/api/v1/payments`, {
const response = await fetch(`${this.baseUrl}/api/v1/payments`, { method: 'POST',
method: 'POST', headers: { 'Content-Type': 'application/json' },
headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({
body: JSON.stringify({ orderId: params.orderId,
orderId: params.orderId, method: params.method,
method: params.method, }),
}), });
}); if (!resp) return null;
const data = (await resp.json()) as ApiResponse<PaymentResult>;
if (!response.ok) { return data.success ? data.data : null;
const errText = await response.text();
console.error(
`[PaymentClient] createPayment failed: ${response.status} ${errText}`,
);
return null;
}
const data = (await response.json()) as ApiResponse<PaymentResult>;
return data.success ? data.data : null;
} catch (error) {
console.error('[PaymentClient] createPayment error:', error);
return null;
}
} }
/** /**
@ -141,27 +136,15 @@ export class PaymentClientService implements OnModuleInit {
async checkPaymentStatus( async checkPaymentStatus(
paymentId: string, paymentId: string,
): Promise<{ status: string; paidAt?: string } | null> { ): Promise<{ status: string; paidAt?: string } | null> {
try { const resp = await this.protectedFetch(
const response = await fetch( `${this.baseUrl}/api/v1/payments/${paymentId}/status`,
`${this.baseUrl}/api/v1/payments/${paymentId}/status`, );
); if (!resp) return null;
const data = (await resp.json()) as ApiResponse<{
if (!response.ok) { status: string;
console.error( paidAt?: string;
`[PaymentClient] checkPaymentStatus failed: ${response.status}`, }>;
); return data.success ? data.data : null;
return null;
}
const data = (await response.json()) as ApiResponse<{
status: string;
paidAt?: string;
}>;
return data.success ? data.data : null;
} catch (error) {
console.error('[PaymentClient] checkPaymentStatus error:', error);
return null;
}
} }
/** /**
@ -175,75 +158,39 @@ export class PaymentClientService implements OnModuleInit {
paidAt?: string; paidAt?: string;
completedAt?: string; completedAt?: string;
} | null> { } | null> {
try { const resp = await this.protectedFetch(
const response = await fetch( `${this.baseUrl}/api/v1/orders/${orderId}/status`,
`${this.baseUrl}/api/v1/orders/${orderId}/status`, );
); if (!resp) return null;
const data = (await resp.json()) as ApiResponse<{
if (!response.ok) { orderId: string;
console.error( status: string;
`[PaymentClient] getOrderStatus failed: ${response.status}`, paidAt?: string;
); completedAt?: string;
return null; }>;
} return data.success ? data.data : null;
const data = (await response.json()) as ApiResponse<{
orderId: string;
status: string;
paidAt?: string;
completedAt?: string;
}>;
return data.success ? data.data : null;
} catch (error) {
console.error('[PaymentClient] getOrderStatus error:', error);
return null;
}
} }
/** /**
* *
*/ */
async getUserOrders(userId: string): Promise<OrderInfo[]> { async getUserOrders(userId: string): Promise<OrderInfo[]> {
try { const resp = await this.protectedFetch(`${this.baseUrl}/api/v1/orders`, {
const response = await fetch(`${this.baseUrl}/api/v1/orders`, { headers: { 'x-user-id': userId },
headers: { 'x-user-id': userId }, });
}); if (!resp) return [];
const data = (await resp.json()) as ApiResponse<OrderInfo[]>;
if (!response.ok) { return data.success ? data.data : [];
console.error(
`[PaymentClient] getUserOrders failed: ${response.status}`,
);
return [];
}
const data = (await response.json()) as ApiResponse<OrderInfo[]>;
return data.success ? data.data : [];
} catch (error) {
console.error('[PaymentClient] getUserOrders error:', error);
return [];
}
} }
/** /**
* *
*/ */
async getOrderDetail(orderId: string): Promise<OrderInfo | null> { async getOrderDetail(orderId: string): Promise<OrderInfo | null> {
try { const resp = await this.protectedFetch(`${this.baseUrl}/api/v1/orders/${orderId}`);
const response = await fetch(`${this.baseUrl}/api/v1/orders/${orderId}`); if (!resp) return null;
const data = (await resp.json()) as ApiResponse<OrderInfo>;
if (!response.ok) { return data.success ? data.data : null;
console.error(
`[PaymentClient] getOrderDetail failed: ${response.status}`,
);
return null;
}
const data = (await response.json()) as ApiResponse<OrderInfo>;
return data.success ? data.data : null;
} catch (error) {
console.error('[PaymentClient] getOrderDetail error:', error);
return null;
}
} }
/** /**
@ -252,28 +199,15 @@ export class PaymentClientService implements OnModuleInit {
async cancelOrder( async cancelOrder(
orderId: string, orderId: string,
): Promise<{ orderId: string; status: string } | null> { ): Promise<{ orderId: string; status: string } | null> {
try { const resp = await this.protectedFetch(
const response = await fetch( `${this.baseUrl}/api/v1/orders/${orderId}/cancel`,
`${this.baseUrl}/api/v1/orders/${orderId}/cancel`, { method: 'POST' },
{ method: 'POST' }, );
); if (!resp) return null;
const data = (await resp.json()) as ApiResponse<{
if (!response.ok) { orderId: string;
const errText = await response.text(); status: string;
console.error( }>;
`[PaymentClient] cancelOrder failed: ${response.status} ${errText}`, return data.success ? data.data : null;
);
return null;
}
const data = (await response.json()) as ApiResponse<{
orderId: string;
status: string;
}>;
return data.success ? data.data : null;
} catch (error) {
console.error('[PaymentClient] cancelOrder error:', error);
return null;
}
} }
} }