feat(planting): implement Outbox Pattern with consumer acknowledgment (Plan B)

Implement reliable event delivery using Outbox Pattern with consumer confirmation:

## planting-service (Producer)
- Add OutboxEvent table with status: PENDING → SENT → CONFIRMED
- Add OutboxRepository with transaction support and timeout handling
- Add OutboxPublisherService with polling, timeout check, and retry
- Add EventAckController to receive consumer confirmations
- Update UnitOfWork to save outbox events atomically with business data
- Update PlantingApplicationService to use outbox pattern
- Update PoolInjectionService to use outbox pattern

## Consumer Services
- Add EventAckPublisher to reward-service, referral-service, authorization-service
- Update event handlers to send acknowledgment after successful processing

## Event Flow
1. Business data + outbox events saved in same transaction
2. OutboxPublisher polls and sends to Kafka, marks as SENT
3. Consumer processes event and sends ack to planting.events.ack
4. EventAckController receives ack and marks as CONFIRMED
5. Timeout check resets SENT→PENDING for retry (max 5 times)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-09 21:32:16 -08:00
parent b98844abe4
commit 014ad9d19f
24 changed files with 2204 additions and 33 deletions

View File

@ -0,0 +1,98 @@
import { Injectable, Logger, Inject } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
/**
 * Acknowledgment message sent back to planting-service after this consumer
 * has finished processing an outbox-delivered event.
 */
interface EventAckMessage {
  /** aggregateId of the original event (e.g. the orderNo). */
  eventId: string;
  /** Type of the original event. */
  eventType: string;
  /** Name of the consuming service sending this ack. */
  consumerService: string;
  /** Whether processing succeeded. */
  success: boolean;
  /** Error message (present only when processing failed). */
  error?: string;
  /** ISO-8601 timestamp at which the ack was produced. */
  confirmedAt: string;
}
/**
 * Publishes event acknowledgments back to planting-service
 * (core component of the "Plan B" outbox-with-ack design).
 *
 * After an outbox-delivered event is processed, the outcome is emitted to the
 * `planting.events.ack` topic so the producer can mark the outbox row as
 * confirmed, or leave it to be re-sent after a timeout.
 */
@Injectable()
export class EventAckPublisher {
  private readonly logger = new Logger(EventAckPublisher.name);
  // Identifies this consumer in every ack it sends.
  private readonly serviceName = 'authorization-service';

  constructor(
    @Inject('KAFKA_SERVICE')
    private readonly kafkaClient: ClientKafka,
  ) {}

  /**
   * Emits an ack to the shared ack topic, keyed by event id so all acks for
   * one event land on the same partition.
   */
  private emitAck(ackMessage: EventAckMessage): void {
    this.kafkaClient.emit('planting.events.ack', {
      key: ackMessage.eventId,
      value: JSON.stringify(ackMessage),
    });
  }

  /**
   * Sends a success confirmation for a processed event.
   *
   * @param eventId   aggregateId of the original event (e.g. orderNo)
   * @param eventType type of the original event
   */
  async sendSuccess(eventId: string, eventType: string): Promise<void> {
    try {
      this.emitAck({
        eventId,
        eventType,
        consumerService: this.serviceName,
        success: true,
        confirmedAt: new Date().toISOString(),
      });
      this.logger.log(`[ACK] ✓ Sent success confirmation for event ${eventId} (${eventType})`);
    } catch (error) {
      // Ack delivery is best-effort: the producer falls back to timeout-based
      // re-delivery, so failures are logged rather than rethrown.
      this.logger.error(`[ACK] Failed to send confirmation for event ${eventId}:`, error);
    }
  }

  /**
   * Sends a failure confirmation for an event whose processing failed.
   *
   * @param eventId      aggregateId of the original event
   * @param eventType    type of the original event
   * @param errorMessage description of the processing failure
   */
  async sendFailure(eventId: string, eventType: string, errorMessage: string): Promise<void> {
    try {
      this.emitAck({
        eventId,
        eventType,
        consumerService: this.serviceName,
        success: false,
        error: errorMessage,
        confirmedAt: new Date().toISOString(),
      });
      this.logger.warn(`[ACK] ✗ Sent failure confirmation for event ${eventId}: ${errorMessage}`);
    } catch (error) {
      this.logger.error(`[ACK] Failed to send failure confirmation for event ${eventId}:`, error);
    }
  }

  /**
   * Extracts the outbox metadata that planting-service attaches to each
   * message. Returns null (and logs a warning) when the message carries no
   * `_outbox` field, e.g. for events published outside the outbox pipeline.
   */
  extractOutboxInfo(message: Record<string, unknown>): { id: string; aggregateId: string; eventType: string } | null {
    const outbox = message._outbox as { id: string; aggregateId: string; eventType: string } | undefined;
    if (!outbox) {
      this.logger.warn('[ACK] Message does not contain _outbox metadata');
      return null;
    }
    return outbox;
  }
}

View File

@ -1,28 +1,100 @@
import { Controller, Logger } from '@nestjs/common'
import { Controller, Logger, Inject } from '@nestjs/common'
import { EventPattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices'
import {
IAuthorizationRoleRepository,
AUTHORIZATION_ROLE_REPOSITORY,
IMonthlyAssessmentRepository,
MONTHLY_ASSESSMENT_REPOSITORY,
} from '../../domain/repositories'
import {
TEAM_STATISTICS_REPOSITORY,
} from '../../application/services/authorization-application.service'
import type { ITeamStatisticsRepository, TeamStatistics } from '../../domain/services'
import { UserId, Month } from '../../domain/value-objects'
import { RoleType, AuthorizationStatus } from '../../domain/enums'
import { LadderTargetRule } from '../../domain/entities'
import { MonthlyAssessment, AuthorizationRole } from '../../domain/aggregates'
import { EventPublisherService } from './event-publisher.service'
import { EventAckPublisher } from './event-ack.publisher'
interface TreePlantedPayload {
userId: string
treeCount: number
provinceCode?: string
cityCode?: string
}
interface PlantingEventMessage {
eventName?: string
eventType?: string
aggregateId?: string
occurredAt?: string
data?: TreePlantedPayload
payload?: TreePlantedPayload
// B方案outbox 元数据
_outbox?: {
id: string
aggregateId: string
eventType: string
}
}
@Controller()
export class EventConsumerController {
private readonly logger = new Logger(EventConsumerController.name)
constructor(
@Inject(AUTHORIZATION_ROLE_REPOSITORY)
private readonly authorizationRepository: IAuthorizationRoleRepository,
@Inject(MONTHLY_ASSESSMENT_REPOSITORY)
private readonly assessmentRepository: IMonthlyAssessmentRepository,
@Inject(TEAM_STATISTICS_REPOSITORY)
private readonly statsRepository: ITeamStatisticsRepository,
private readonly eventPublisher: EventPublisherService,
private readonly eventAckPublisher: EventAckPublisher,
) {}
// 监听认种事件 - 用于更新考核进度
@EventPattern('planting-events')
async handlePlantingEvent(
@Payload() message: any,
@Payload() message: PlantingEventMessage,
@Ctx() context: KafkaContext,
) {
try {
this.logger.log(`Received planting event: ${message.eventType}`)
const eventType = message.eventType || message.eventName || 'unknown'
const outboxInfo = message._outbox
const eventId = outboxInfo?.aggregateId || message.aggregateId || 'unknown'
switch (message.eventType) {
try {
this.logger.log(`[KAFKA] Received planting event: ${eventType}`)
this.logger.debug(`[KAFKA] Event payload: ${JSON.stringify(message)}`)
// 获取 payload支持多种格式
const payload = message.payload || message.data
switch (eventType) {
case 'planting.tree.planted':
await this.handleTreePlanted(message.payload)
case 'PlantingOrderPaid':
case 'MiningEnabled':
if (payload) {
await this.handleTreePlanted(payload)
}
break
default:
this.logger.debug(`Unhandled planting event type: ${message.eventType}`)
this.logger.debug(`[KAFKA] Unhandled planting event type: ${eventType}`)
}
// B方案发送处理成功确认
if (outboxInfo) {
await this.eventAckPublisher.sendSuccess(eventId, outboxInfo.eventType)
}
} catch (error) {
this.logger.error('Failed to handle planting event:', error)
this.logger.error('[KAFKA] Failed to handle planting event:', error)
// B方案发送处理失败确认
if (outboxInfo) {
const errorMessage = error instanceof Error ? error.message : String(error)
await this.eventAckPublisher.sendFailure(eventId, outboxInfo.eventType, errorMessage)
}
}
}
@ -33,24 +105,180 @@ export class EventConsumerController {
@Ctx() context: KafkaContext,
) {
try {
this.logger.log(`Received referral event: ${message.eventType}`)
this.logger.log(`[KAFKA] Received referral event: ${message.eventType}`)
switch (message.eventType) {
case 'referral.relationship.created':
// 处理推荐关系创建事件
break
default:
this.logger.debug(`Unhandled referral event type: ${message.eventType}`)
this.logger.debug(`[KAFKA] Unhandled referral event type: ${message.eventType}`)
}
} catch (error) {
this.logger.error('Failed to handle referral event:', error)
this.logger.error('[KAFKA] Failed to handle referral event:', error)
}
}
private async handleTreePlanted(payload: any) {
// TODO: 更新用户团队认种统计
// TODO: 检查是否达成初始考核目标
// TODO: 更新月度考核进度
this.logger.log(`Tree planted by user: ${payload.userId}, count: ${payload.treeCount}`)
/**
*
* 1.
* 2.
*/
private async handleTreePlanted(payload: TreePlantedPayload): Promise<void> {
const { userId, treeCount } = payload
this.logger.log(`[PLANTING] Processing tree planted event: userId=${userId}, count=${treeCount}`)
try {
// 1. 获取用户团队统计
const teamStats = await this.statsRepository.findByUserId(userId)
if (!teamStats) {
this.logger.warn(`[PLANTING] No team stats found for user ${userId}, skipping`)
return
}
const totalTreeCount = teamStats.totalTeamPlantingCount
this.logger.debug(`[PLANTING] User ${userId} total team planting count: ${totalTreeCount}`)
// 2. 获取用户所有授权
const authorizations = await this.authorizationRepository.findByUserId(
UserId.create(userId),
)
if (authorizations.length === 0) {
this.logger.debug(`[PLANTING] User ${userId} has no authorizations`)
return
}
// 3. 处理每个授权
for (const auth of authorizations) {
this.logger.debug(`[PLANTING] Processing authorization: ${auth.authorizationId.value}, role=${auth.roleType}, benefitActive=${auth.benefitActive}`)
// 3.1 检查初始考核(权益未激活的情况)
if (!auth.benefitActive && auth.status === AuthorizationStatus.PENDING) {
const initialTarget = auth.getInitialTarget()
this.logger.debug(`[PLANTING] Checking initial target: ${totalTreeCount}/${initialTarget}`)
if (totalTreeCount >= initialTarget) {
this.logger.log(`[PLANTING] User ${userId} reached initial target for ${auth.roleType}, activating benefit`)
auth.activateBenefit()
await this.authorizationRepository.save(auth)
await this.eventPublisher.publishAll(auth.domainEvents)
auth.clearDomainEvents()
// 创建首月考核(针对授权省市公司)
if (auth.roleType === RoleType.AUTH_PROVINCE_COMPANY ||
auth.roleType === RoleType.AUTH_CITY_COMPANY) {
await this.createInitialAssessment(auth, teamStats)
}
}
}
// 3.2 更新月度考核进度(权益已激活的授权省市公司)
if (auth.benefitActive &&
(auth.roleType === RoleType.AUTH_PROVINCE_COMPANY ||
auth.roleType === RoleType.AUTH_CITY_COMPANY)) {
await this.updateMonthlyAssessment(auth, teamStats)
}
}
this.logger.log(`[PLANTING] Completed processing tree planted event for user ${userId}`)
} catch (error) {
this.logger.error(`[PLANTING] Error processing tree planted for user ${userId}:`, error)
throw error
}
}
/**
*
*/
private async createInitialAssessment(
authorization: AuthorizationRole,
teamStats: TeamStatistics,
): Promise<void> {
const currentMonth = Month.current()
const target = LadderTargetRule.getTarget(authorization.roleType, 1)
this.logger.log(`[ASSESSMENT] Creating initial assessment for ${authorization.authorizationId.value}, month=${currentMonth.value}`)
// 检查是否已存在
const existing = await this.assessmentRepository.findByAuthorizationAndMonth(
authorization.authorizationId,
currentMonth,
)
if (existing) {
this.logger.debug(`[ASSESSMENT] Assessment already exists, updating...`)
await this.doAssessmentUpdate(existing, authorization, teamStats)
return
}
const assessment = MonthlyAssessment.create({
authorizationId: authorization.authorizationId,
userId: authorization.userId,
roleType: authorization.roleType,
regionCode: authorization.regionCode,
assessmentMonth: currentMonth,
monthIndex: 1,
monthlyTarget: target.monthlyTarget,
cumulativeTarget: target.cumulativeTarget,
})
await this.doAssessmentUpdate(assessment, authorization, teamStats)
this.logger.log(`[ASSESSMENT] Created initial assessment for ${authorization.authorizationId.value}`)
}
/**
*
*/
private async updateMonthlyAssessment(
authorization: AuthorizationRole,
teamStats: TeamStatistics,
): Promise<void> {
const currentMonth = Month.current()
// 查找当月考核记录
const assessment = await this.assessmentRepository.findByAuthorizationAndMonth(
authorization.authorizationId,
currentMonth,
)
if (!assessment) {
this.logger.debug(`[ASSESSMENT] No assessment found for ${authorization.authorizationId.value} in ${currentMonth.value}`)
return
}
await this.doAssessmentUpdate(assessment, authorization, teamStats)
this.logger.log(`[ASSESSMENT] Updated assessment for ${authorization.authorizationId.value}`)
}
/**
*
*/
private async doAssessmentUpdate(
assessment: MonthlyAssessment,
authorization: AuthorizationRole,
teamStats: TeamStatistics,
): Promise<void> {
// 计算本地团队数量
let localTeamCount = 0
if (authorization.roleType === RoleType.AUTH_PROVINCE_COMPANY) {
localTeamCount = teamStats.getProvinceTeamCount(authorization.regionCode.value)
} else if (authorization.roleType === RoleType.AUTH_CITY_COMPANY) {
localTeamCount = teamStats.getCityTeamCount(authorization.regionCode.value)
}
this.logger.debug(`[ASSESSMENT] Assessing: cumulative=${teamStats.totalTeamPlantingCount}, local=${localTeamCount}, total=${teamStats.totalTeamPlantingCount}`)
assessment.assess({
cumulativeCompleted: teamStats.totalTeamPlantingCount,
localTeamCount,
totalTeamCount: teamStats.totalTeamPlantingCount,
requireLocalPercentage: authorization.requireLocalPercentage,
exemptFromPercentageCheck: authorization.exemptFromPercentageCheck,
})
await this.assessmentRepository.save(assessment)
await this.eventPublisher.publishAll(assessment.domainEvents)
assessment.clearDomainEvents()
}
}

View File

@ -12,3 +12,7 @@ JWT_SECRET="planting-service-dev-jwt-secret"
WALLET_SERVICE_URL=http://localhost:3002
IDENTITY_SERVICE_URL=http://localhost:3001
REFERRAL_SERVICE_URL=http://localhost:3004
# Kafka Configuration
KAFKA_BROKERS=localhost:9092
KAFKA_CLIENT_ID=planting-service

View File

@ -12,3 +12,7 @@ JWT_SECRET="your-super-secret-jwt-key-change-in-production"
WALLET_SERVICE_URL=http://localhost:3002
IDENTITY_SERVICE_URL=http://localhost:3001
REFERRAL_SERVICE_URL=http://localhost:3004
# Kafka Configuration
KAFKA_BROKERS=localhost:9092
KAFKA_CLIENT_ID=planting-service

View File

@ -14,6 +14,7 @@
"@nestjs/config": "^3.1.1",
"@nestjs/core": "^10.0.0",
"@nestjs/jwt": "^10.2.0",
"@nestjs/microservices": "^10.0.0",
"@nestjs/passport": "^10.0.3",
"@nestjs/platform-express": "^10.0.0",
"@nestjs/swagger": "^7.1.17",
@ -21,6 +22,7 @@
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",
"jsonwebtoken": "^9.0.0",
"kafkajs": "^2.2.4",
"passport": "^0.7.0",
"passport-jwt": "^4.0.1",
"reflect-metadata": "^0.1.13",
@ -246,6 +248,7 @@
"integrity": "sha512-e7jT4DxYvIDLk1ZHmU/m/mB19rex9sv0c2ftBtjSBv+kVM/902eh0fINUzD7UwLLNR+jU585GxUJ8/EBfAM5fw==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@babel/code-frame": "^7.27.1",
"@babel/generator": "^7.28.5",
@ -1865,6 +1868,65 @@
}
}
},
"node_modules/@nestjs/microservices": {
"version": "10.4.20",
"resolved": "https://registry.npmjs.org/@nestjs/microservices/-/microservices-10.4.20.tgz",
"integrity": "sha512-zu/o84Z0uTUClNnGIGfIjcrO3z6T60h/pZPSJK50o4mehbEvJ76fijj6R/WTW0VP+1N16qOv/NsiYLKJA5Cc3w==",
"license": "MIT",
"peer": true,
"dependencies": {
"iterare": "1.2.1",
"tslib": "2.8.1"
},
"funding": {
"type": "opencollective",
"url": "https://opencollective.com/nest"
},
"peerDependencies": {
"@grpc/grpc-js": "*",
"@nestjs/common": "^10.0.0",
"@nestjs/core": "^10.0.0",
"@nestjs/websockets": "^10.0.0",
"amqp-connection-manager": "*",
"amqplib": "*",
"cache-manager": "*",
"ioredis": "*",
"kafkajs": "*",
"mqtt": "*",
"nats": "*",
"reflect-metadata": "^0.1.12 || ^0.2.0",
"rxjs": "^7.1.0"
},
"peerDependenciesMeta": {
"@grpc/grpc-js": {
"optional": true
},
"@nestjs/websockets": {
"optional": true
},
"amqp-connection-manager": {
"optional": true
},
"amqplib": {
"optional": true
},
"cache-manager": {
"optional": true
},
"ioredis": {
"optional": true
},
"kafkajs": {
"optional": true
},
"mqtt": {
"optional": true
},
"nats": {
"optional": true
}
}
},
"node_modules/@nestjs/passport": {
"version": "10.0.3",
"resolved": "https://registry.npmjs.org/@nestjs/passport/-/passport-10.0.3.tgz",
@ -6964,6 +7026,16 @@
"safe-buffer": "^5.0.1"
}
},
"node_modules/kafkajs": {
"version": "2.2.4",
"resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz",
"integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==",
"license": "MIT",
"peer": true,
"engines": {
"node": ">=14.0.0"
}
},
"node_modules/keyv": {
"version": "4.5.4",
"resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.4.tgz",

View File

@ -27,12 +27,14 @@
"@nestjs/common": "^10.0.0",
"@nestjs/config": "^3.1.1",
"@nestjs/core": "^10.0.0",
"@nestjs/microservices": "^10.0.0",
"@nestjs/platform-express": "^10.0.0",
"@nestjs/swagger": "^7.1.17",
"@nestjs/axios": "^3.0.1",
"@nestjs/jwt": "^10.2.0",
"@nestjs/passport": "^10.0.3",
"@prisma/client": "^5.7.0",
"kafkajs": "^2.2.4",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",
"jsonwebtoken": "^9.0.0",

View File

@ -0,0 +1,31 @@
-- CreateTable: outbox_events (Outbox Pattern)
-- Transactional outbox for planting-service: rows are inserted in the same
-- DB transaction as the business data; a background publisher then polls
-- this table and forwards each payload to Kafka.
CREATE TABLE "outbox_events" (
    "outbox_id" BIGSERIAL NOT NULL,
    -- What to publish
    "event_type" VARCHAR(100) NOT NULL,
    "topic" VARCHAR(100) NOT NULL,
    "key" VARCHAR(200) NOT NULL,
    "payload" JSONB NOT NULL,
    -- Originating aggregate (correlation / idempotency)
    "aggregate_id" VARCHAR(100) NOT NULL,
    "aggregate_type" VARCHAR(50) NOT NULL,
    -- Delivery state; every row starts as PENDING
    "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING',
    -- Retry bookkeeping for the timeout-based re-send loop
    "retry_count" INTEGER NOT NULL DEFAULT 0,
    "max_retries" INTEGER NOT NULL DEFAULT 5,
    "last_error" TEXT,
    "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
    "published_at" TIMESTAMP(3),
    "next_retry_at" TIMESTAMP(3),

    CONSTRAINT "outbox_events_pkey" PRIMARY KEY ("outbox_id")
);

-- CreateIndex: polling scan for pending rows, oldest first
CREATE INDEX "outbox_events_status_created_at_idx" ON "outbox_events"("status", "created_at");

-- CreateIndex: timeout / retry scan
CREATE INDEX "outbox_events_status_next_retry_at_idx" ON "outbox_events"("status", "next_retry_at");

-- CreateIndex: lookup by originating aggregate
CREATE INDEX "outbox_events_aggregate_type_aggregate_id_idx" ON "outbox_events"("aggregate_type", "aggregate_id");

-- CreateIndex: per-topic queries
CREATE INDEX "outbox_events_topic_idx" ON "outbox_events"("topic");

View File

@ -195,3 +195,41 @@ model PlantingEvent {
@@index([occurredAt])
@@map("planting_events")
}
// ============================================
// Outbox event table (Outbox Pattern)
// Guarantees reliable event publishing:
// 1. Business data and outbox rows are written in the same transaction
// 2. A background task polls this table and publishes to Kafka
// 3. Rows are marked after successful publication / confirmation
// ============================================
model OutboxEvent {
  id BigInt @id @default(autoincrement()) @map("outbox_id")

  // Event information
  eventType String @map("event_type") @db.VarChar(100)
  topic     String @map("topic") @db.VarChar(100)
  key       String @map("key") @db.VarChar(200)
  payload   Json   @map("payload")

  // Aggregate root info (used for idempotency checks)
  aggregateId   String @map("aggregate_id") @db.VarChar(100)
  aggregateType String @map("aggregate_type") @db.VarChar(50)

  // Publish state. NOTE(review): the rest of this change set describes the
  // lifecycle PENDING → SENT → CONFIRMED (ack flow); this column previously
  // documented PENDING/PUBLISHED/FAILED — confirm the actual values written
  // by OutboxRepository and keep one canonical list.
  status     String  @default("PENDING") @map("status") @db.VarChar(20)
  retryCount Int     @default(0) @map("retry_count")
  maxRetries Int     @default(5) @map("max_retries")
  lastError  String? @map("last_error") @db.Text

  // Timestamps
  createdAt   DateTime  @default(now()) @map("created_at")
  publishedAt DateTime? @map("published_at")
  nextRetryAt DateTime? @map("next_retry_at")

  @@index([status, createdAt])
  @@index([status, nextRetryAt])
  @@index([aggregateType, aggregateId])
  @@index([topic])
  @@map("outbox_events")
}

View File

@ -16,6 +16,7 @@ import { FundAllocationDomainService } from '../../domain/services/fund-allocati
import { WalletServiceClient } from '../../infrastructure/external/wallet-service.client';
import { ReferralServiceClient } from '../../infrastructure/external/referral-service.client';
import { UnitOfWork, UNIT_OF_WORK } from '../../infrastructure/persistence/unit-of-work';
import { OutboxEventData } from '../../infrastructure/persistence/repositories/outbox.repository';
import { PRICE_PER_TREE } from '../../domain/value-objects/fund-allocation-target-type.enum';
// 个人最大认种数量限制
@ -228,8 +229,8 @@ export class PlantingApplicationService {
order.markAsPaid();
order.allocateFunds(allocations);
// 7. 使用事务保存本地数据库的所有变更
// 这确保了订单状态、用户持仓、批次数据的原子性
// 7. 使用事务保存本地数据库的所有变更 + Outbox事件
// 这确保了订单状态、用户持仓、批次数据、以及事件发布的原子性
await this.unitOfWork.executeInTransaction(async (uow) => {
// 保存订单状态
await uow.saveOrder(order);
@ -256,6 +257,16 @@ export class PlantingApplicationService {
scheduledTime.setHours(scheduledTime.getHours() + 1);
order.schedulePoolInjection(batch.id!, scheduledTime);
await uow.saveOrder(order);
// 8. 添加 Outbox 事件(在同一事务中保存)
// 使用 Outbox Pattern 保证事件发布的可靠性
const outboxEvents = this.buildOutboxEvents(order, selection);
uow.addOutboxEvents(outboxEvents);
// 提交 Outbox 事件(在事务中保存到数据库)
await uow.commitOutboxEvents();
this.logger.log(`[OUTBOX] Added ${outboxEvents.length} events to outbox for order ${order.orderNo}`);
});
this.logger.log(`Local database transaction committed for order ${order.orderNo}`);
@ -268,6 +279,9 @@ export class PlantingApplicationService {
this.logger.log(`Order paid successfully: ${order.orderNo}`);
// 清除已保存到 Outbox 的领域事件
order.clearDomainEvents();
return {
orderNo: order.orderNo,
status: order.status,
@ -400,4 +414,80 @@ export class PlantingApplicationService {
}
}
/**
* Outbox
*
* :
* - reward-service: 分配奖励 (planting.order.paid)
* - referral-service: 更新团队统计 (planting.planting.created)
* - authorization-service: 更新KPI考核 (planting-events)
*/
private buildOutboxEvents(
order: PlantingOrder,
selection: { provinceCode: string; cityCode: string },
): OutboxEventData[] {
const events: OutboxEventData[] = [];
this.logger.debug(`[OUTBOX] Building outbox events for order ${order.orderNo}`);
// 1. planting.order.paid 事件 (reward-service 消费)
events.push({
eventType: 'planting.order.paid',
topic: 'planting.order.paid',
key: order.userId.toString(),
payload: {
eventName: 'planting.order.paid',
data: {
orderId: order.orderNo,
userId: order.userId.toString(),
treeCount: order.treeCount.value,
provinceCode: selection.provinceCode,
cityCode: selection.cityCode,
paidAt: order.paidAt!.toISOString(),
},
},
aggregateId: order.orderNo,
aggregateType: 'PlantingOrder',
});
// 2. planting.planting.created 事件 (referral-service 消费)
events.push({
eventType: 'planting.planting.created',
topic: 'planting.planting.created',
key: order.userId.toString(),
payload: {
eventName: 'planting.created',
data: {
userId: order.userId.toString(),
treeCount: order.treeCount.value,
provinceCode: selection.provinceCode,
cityCode: selection.cityCode,
},
},
aggregateId: order.orderNo,
aggregateType: 'PlantingOrder',
});
// 3. planting-events 事件 (authorization-service 消费)
// 发布所有领域事件
for (const domainEvent of order.domainEvents) {
events.push({
eventType: domainEvent.type,
topic: 'planting-events',
key: order.userId.toString(),
payload: {
eventName: domainEvent.type,
aggregateId: domainEvent.aggregateId,
occurredAt: domainEvent.occurredAt,
data: domainEvent.data,
},
aggregateId: order.orderNo,
aggregateType: 'PlantingOrder',
});
}
this.logger.debug(`[OUTBOX] Built ${events.length} outbox events for order ${order.orderNo}`);
return events;
}
}

View File

@ -12,7 +12,9 @@ import {
POOL_INJECTION_BATCH_REPOSITORY,
} from '../../domain/repositories/pool-injection-batch.repository.interface';
import { WalletServiceClient } from '../../infrastructure/external/wallet-service.client';
import { OutboxRepository, OutboxEventData } from '../../infrastructure/persistence/repositories/outbox.repository';
import { BatchStatus } from '../../domain/value-objects/batch-status.enum';
import { PlantingOrder } from '../../domain/aggregates/planting-order.aggregate';
@Injectable()
export class PoolInjectionService {
@ -26,6 +28,7 @@ export class PoolInjectionService {
@Inject(POOL_INJECTION_BATCH_REPOSITORY)
private readonly batchRepository: IPoolInjectionBatchRepository,
private readonly walletService: WalletServiceClient,
private readonly outboxRepository: OutboxRepository,
) {}
/**
@ -80,7 +83,11 @@ export class PoolInjectionService {
// 更新批次内所有订单状态
const orders = await this.orderRepository.findByBatchId(batchId);
this.logger.log(`[BATCH] Processing ${orders.length} orders in batch ${batch.batchNo}`);
for (const order of orders) {
this.logger.debug(`[BATCH] Processing order ${order.orderNo}`);
order.confirmPoolInjection(result.txHash);
await this.orderRepository.save(order);
@ -96,10 +103,13 @@ export class PoolInjectionService {
position.activateMining(order.treeCount.value);
await this.positionRepository.save(position);
}
// 保存 MiningEnabled 事件到 Outbox使用 Outbox Pattern
await this.saveMiningEnabledEventsToOutbox(order);
}
this.logger.log(
`Batch ${batch.batchNo} processed successfully, txHash: ${result.txHash}`,
`[BATCH] ✓ Batch ${batch.batchNo} processed successfully, txHash: ${result.txHash}, orders: ${orders.length}`,
);
} catch (error) {
this.logger.error(`Failed to inject batch ${batch.batchNo}`, error);
@ -160,4 +170,44 @@ export class PoolInjectionService {
injectionTxHash: batch.injectionTxHash,
};
}
/**
* Outbox
*
* 使 Outbox Pattern
* OutboxPublisherService Kafka
*/
private async saveMiningEnabledEventsToOutbox(order: PlantingOrder): Promise<void> {
this.logger.debug(`[OUTBOX] Saving MiningEnabled events to outbox for order ${order.orderNo}`);
const outboxEvents: OutboxEventData[] = [];
// 将领域事件转换为 Outbox 事件
for (const domainEvent of order.domainEvents) {
outboxEvents.push({
eventType: domainEvent.type,
topic: 'planting-events',
key: order.userId.toString(),
payload: {
eventName: domainEvent.type,
aggregateId: domainEvent.aggregateId,
occurredAt: domainEvent.occurredAt,
data: domainEvent.data,
},
aggregateId: order.orderNo,
aggregateType: 'PlantingOrder',
});
}
if (outboxEvents.length > 0) {
// 使用 PrismaService 事务保存(这里简化处理,直接保存)
// 注意:理想情况下应该在 processBatch 的事务中一起保存
await this.outboxRepository.saveEvents(outboxEvents);
this.logger.log(`[OUTBOX] ✓ Saved ${outboxEvents.length} MiningEnabled events to outbox for order ${order.orderNo}`);
}
// 清除领域事件
order.clearDomainEvents();
}
}

View File

@ -4,9 +4,13 @@ import { PrismaService } from './persistence/prisma/prisma.service';
import { PlantingOrderRepositoryImpl } from './persistence/repositories/planting-order.repository.impl';
import { PlantingPositionRepositoryImpl } from './persistence/repositories/planting-position.repository.impl';
import { PoolInjectionBatchRepositoryImpl } from './persistence/repositories/pool-injection-batch.repository.impl';
import { OutboxRepository } from './persistence/repositories/outbox.repository';
import { UnitOfWork, UNIT_OF_WORK } from './persistence/unit-of-work';
import { WalletServiceClient } from './external/wallet-service.client';
import { ReferralServiceClient } from './external/referral-service.client';
import { KafkaModule } from './kafka/kafka.module';
import { OutboxPublisherService } from './kafka/outbox-publisher.service';
import { EventAckController } from './kafka/event-ack.controller';
import { PLANTING_ORDER_REPOSITORY } from '../domain/repositories/planting-order.repository.interface';
import { PLANTING_POSITION_REPOSITORY } from '../domain/repositories/planting-position.repository.interface';
import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-injection-batch.repository.interface';
@ -18,7 +22,9 @@ import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-inj
timeout: 5000,
maxRedirects: 5,
}),
KafkaModule,
],
controllers: [EventAckController],
providers: [
PrismaService,
{
@ -37,6 +43,8 @@ import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-inj
provide: UNIT_OF_WORK,
useClass: UnitOfWork,
},
OutboxRepository,
OutboxPublisherService,
WalletServiceClient,
ReferralServiceClient,
],
@ -46,6 +54,8 @@ import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-inj
PLANTING_POSITION_REPOSITORY,
POOL_INJECTION_BATCH_REPOSITORY,
UNIT_OF_WORK,
OutboxRepository,
OutboxPublisherService,
WalletServiceClient,
ReferralServiceClient,
],

View File

@ -0,0 +1,99 @@
import { Injectable, Logger, OnModuleInit, Inject } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { OutboxRepository } from '../persistence/repositories/outbox.repository';
/**
 * Acknowledgment message received from a consumer service after it has
 * finished processing an outbox-delivered event.
 */
interface EventAckMessage {
  /** aggregateId of the original event (e.g. the orderNo). */
  eventId: string;
  /** Type of the original event. */
  eventType: string;
  /** Name of the consuming service that sent this ack. */
  consumerService: string;
  /** Whether processing succeeded. */
  success: boolean;
  /** Error message (present only when processing failed). */
  error?: string;
  /** ISO-8601 timestamp at which the ack was produced. */
  confirmedAt: string;
}
/**
 * Consumes event acknowledgments from downstream services
 * (part of the "Plan B" outbox-with-ack design).
 *
 * reward-service, referral-service and authorization-service publish acks to
 * the planting.events.ack topic after processing an outbox event.
 *
 * Intended flow:
 * 1. planting-service publishes to Kafka and marks the outbox row SENT
 * 2. The consumer processes the event and acks on planting.events.ack
 * 3. A successful ack marks the row CONFIRMED
 * 4. Unconfirmed rows are re-sent after a timeout
 *
 * NOTE(review): ClientKafka is a producer-side client. subscribeToResponseOf()
 * registers a request/response *reply* topic rather than consuming events, and
 * startConsuming() below is a no-op, so this class never actually receives
 * acks — EventAckController (@MessagePattern) appears to be the real consumer.
 * Confirm whether this class can be removed.
 */
@Injectable()
export class EventAckConsumer implements OnModuleInit {
  private readonly logger = new Logger(EventAckConsumer.name);

  constructor(
    @Inject('KAFKA_SERVICE')
    private readonly kafkaClient: ClientKafka,
    private readonly outboxRepository: OutboxRepository,
  ) {}

  async onModuleInit() {
    // Subscribe to the ack topic.
    // NOTE(review): this actually subscribes to the reply topic
    // `planting.events.ack.reply` — see class-level note above.
    this.kafkaClient.subscribeToResponseOf('planting.events.ack');
    this.logger.log('[ACK-CONSUMER] Subscribed to planting.events.ack topic');
    // Connect, then "start" consumption (currently a no-op).
    try {
      await this.kafkaClient.connect();
      this.startConsuming();
    } catch (error) {
      // Swallowed deliberately so the service can still boot without a
      // reachable broker.
      this.logger.error('[ACK-CONSUMER] Failed to connect to Kafka:', error);
    }
  }

  private startConsuming() {
    // ClientKafka is primarily a producer; NestJS consumers normally use
    // @MessagePattern on a controller. Nothing is subscribed here — this
    // method only logs.
    this.logger.log('[ACK-CONSUMER] Event acknowledgment consumer started');
  }

  /**
   * Processes a single ack message (expected to be invoked by a Kafka
   * controller). Success acks mark the outbox row CONFIRMED; failure acks are
   * logged only, leaving the row to be re-sent after the timeout.
   */
  async handleAckMessage(message: EventAckMessage): Promise<void> {
    this.logger.debug(`[ACK-CONSUMER] Received ack: ${JSON.stringify(message)}`);
    try {
      if (message.success) {
        // Flip the outbox row to confirmed.
        const confirmed = await this.outboxRepository.markAsConfirmed(message.eventId);
        if (confirmed) {
          this.logger.log(
            `[ACK-CONSUMER] ✓ Event ${message.eventId} confirmed by ${message.consumerService}`,
          );
        }
      } else {
        // Consumer failed: record it but leave the status unchanged so the
        // timeout check re-sends the event automatically.
        this.logger.warn(
          `[ACK-CONSUMER] ✗ Event ${message.eventId} failed in ${message.consumerService}: ${message.error}`,
        );
      }
    } catch (error) {
      this.logger.error(
        `[ACK-CONSUMER] Error processing ack for event ${message.eventId}:`,
        error,
      );
    }
  }
}

View File

@ -0,0 +1,82 @@
import { Controller, Logger } from '@nestjs/common';
import { MessagePattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices';
import { OutboxRepository } from '../persistence/repositories/outbox.repository';
/**
 * Acknowledgment message received from a consumer service after it has
 * finished processing an outbox-delivered event.
 */
interface EventAckMessage {
  /** aggregateId of the original event (e.g. the orderNo). */
  eventId: string;
  /** Type of the original event. */
  eventType: string;
  /** Name of the consuming service that sent this ack. */
  consumerService: string;
  /** Whether processing succeeded. */
  success: boolean;
  /** Error message (present only when processing failed). */
  error?: string;
  /** ISO-8601 timestamp at which the ack was produced. */
  confirmedAt: string;
}
/**
 * Kafka controller that receives event acknowledgments from consumer services
 * (core component of the "Plan B" outbox design); consumes the ack topic via
 * @MessagePattern.
 *
 * reward-service, referral-service and authorization-service publish an
 * EventAckMessage here once they have processed an outbox event; a successful
 * ack flips the corresponding outbox row to CONFIRMED.
 */
@Controller()
export class EventAckController {
  private readonly logger = new Logger(EventAckController.name);

  constructor(private readonly outboxRepository: OutboxRepository) {}

  /**
   * Handles a single acknowledgment from the planting.events.ack topic.
   * Failure acks are logged only: the outbox row keeps its current status and
   * is re-delivered by the timeout/retry mechanism.
   */
  @MessagePattern('planting.events.ack')
  async handleEventAck(
    @Payload() message: EventAckMessage,
    @Ctx() context: KafkaContext,
  ): Promise<void> {
    const { eventId, eventType, consumerService, success, error: consumerError } = message;
    const partition = context.getPartition();
    const offset = context.getMessage().offset;

    this.logger.debug(
      `[ACK] Received ack from ${consumerService} for event ${eventId} ` +
        `[partition=${partition}, offset=${offset}]`,
    );

    try {
      if (!success) {
        // The consumer reported a processing failure; leave the outbox status
        // untouched and wait for the timeout-based re-send.
        this.logger.warn(
          `[ACK] ✗ Event ${eventId} failed in ${consumerService}: ${consumerError}`,
        );
        return;
      }

      const confirmed = await this.outboxRepository.markAsConfirmed(eventId);
      if (confirmed) {
        this.logger.log(
          `[ACK] ✓ Event ${eventId} (${eventType}) confirmed by ${consumerService}`,
        );
      } else {
        this.logger.warn(
          `[ACK] Event ${eventId} not found or already confirmed`,
        );
      }
    } catch (error) {
      this.logger.error(
        `[ACK] Error processing ack for event ${eventId}:`,
        error,
      );
    }
  }
}

View File

@ -0,0 +1,245 @@
import { Injectable, Inject, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { ConfigService } from '@nestjs/config';
import { DomainEvent } from '../../domain/events/domain-event.interface';
/**
 * Maps domain event types to their Kafka topics.
 *
 * Known consumers:
 * - reward-service:        listens on planting.order.paid
 * - authorization-service: listens on planting-events
 * - referral-service:      listens on planting.planting.created
 */
const EVENT_TOPIC_MAP: Record<string, string> = {
  PlantingOrderCreated: 'planting.order.created',
  ProvinceCityConfirmed: 'planting.order.province-city-confirmed',
  PlantingOrderPaid: 'planting.order.paid',
  FundsAllocated: 'planting.order.funds-allocated',
  PoolInjected: 'planting.pool.injected',
  MiningEnabled: 'planting.mining.enabled',
};
@Injectable()
export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(EventPublisherService.name);
  // True only after connect() succeeds; all publish methods silently skip
  // (and only log a warning) while this is false.
  private isConnected = false;

  constructor(
    @Inject('KAFKA_SERVICE')
    private readonly kafkaClient: ClientKafka,
    private readonly configService: ConfigService,
  ) {
    const brokers = this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092');
    this.logger.log(`[INIT] EventPublisherService created, brokers: ${brokers}`);
  }

  async onModuleInit() {
    this.logger.log('[KAFKA] Attempting to connect to Kafka...');
    try {
      await this.kafkaClient.connect();
      this.isConnected = true;
      this.logger.log('[KAFKA] Successfully connected to Kafka broker');
    } catch (error) {
      this.logger.error('[KAFKA] Failed to connect to Kafka:', error);
      // Deliberately not rethrown: the service must still boot in
      // environments without a Kafka broker (e.g. local development).
      this.logger.warn('[KAFKA] Service will start without Kafka connection');
    }
  }

  async onModuleDestroy() {
    if (this.isConnected) {
      this.logger.log('[KAFKA] Disconnecting from Kafka...');
      await this.kafkaClient.close();
      this.logger.log('[KAFKA] Disconnected from Kafka');
    }
  }

  /**
   * Publish a single domain event to its mapped topic (EVENT_TOPIC_MAP).
   *
   * Skips silently when Kafka is not connected; rethrows delivery errors so
   * callers can react.
   */
  async publish(event: DomainEvent): Promise<void> {
    const topic = this.getTopicForEvent(event);
    const message = this.formatMessage(event);
    this.logger.debug(`[PUBLISH] Preparing to publish event:
- Type: ${event.type}
- Topic: ${topic}
- AggregateId: ${event.aggregateId}
- Data: ${JSON.stringify(event.data)}`);
    if (!this.isConnected) {
      this.logger.warn(`[PUBLISH] Kafka not connected, skipping event ${event.type}`);
      return;
    }
    try {
      // FIX: a bare emit() is fire-and-forget — its Observable fails
      // asynchronously and the old try/catch never saw delivery errors.
      // Awaiting the broker ack makes this catch (and the ✓ log) truthful.
      await this.emitAndConfirm(topic, message);
      this.logger.log(`[PUBLISH] ✓ Event ${event.type} published to topic ${topic}`);
    } catch (error) {
      this.logger.error(`[PUBLISH] ✗ Failed to publish event ${event.type}:`, error);
      throw error;
    }
  }

  /**
   * Publish a batch of events, sequentially.
   * Kept sequential (not Promise.all) to preserve publish order per aggregate.
   */
  async publishAll(events: readonly DomainEvent[]): Promise<void> {
    this.logger.log(`[PUBLISH] Publishing ${events.length} events...`);
    for (const event of events) {
      await this.publish(event);
    }
    this.logger.log(`[PUBLISH] Finished publishing ${events.length} events`);
  }

  /**
   * Publish to the generic 'planting-events' topic consumed by
   * authorization-service. Delivery errors are logged but not rethrown.
   */
  async publishToPlantingEvents(event: DomainEvent): Promise<void> {
    const eventType = this.mapEventTypeToPlantingEventType(event.type);
    const message = {
      key: event.aggregateId,
      value: JSON.stringify({
        eventType,
        payload: event.data,
        occurredAt: event.occurredAt.toISOString(),
      }),
    };
    this.logger.debug(`[PUBLISH] Preparing planting-events message:
- Original Type: ${event.type}
- Mapped Type: ${eventType}
- Payload: ${JSON.stringify(event.data)}`);
    if (!this.isConnected) {
      this.logger.warn(`[PUBLISH] Kafka not connected, skipping planting-events for ${event.type}`);
      return;
    }
    try {
      await this.emitAndConfirm('planting-events', message);
      this.logger.log(`[PUBLISH] ✓ Event ${event.type} published to topic planting-events`);
    } catch (error) {
      // Best-effort: failure here must not abort the caller's flow.
      this.logger.error(`[PUBLISH] ✗ Failed to publish to planting-events:`, error);
    }
  }

  /**
   * Publish the "order paid" event consumed by reward-service.
   * Delivery errors are logged but not rethrown (best-effort).
   */
  async publishPlantingOrderPaid(params: {
    orderId: string;
    userId: string;
    treeCount: number;
    provinceCode: string;
    cityCode: string;
    paidAt: Date;
  }): Promise<void> {
    const topic = 'planting.order.paid';
    const message = {
      key: params.orderId,
      value: JSON.stringify({
        orderId: params.orderId,
        userId: params.userId,
        treeCount: params.treeCount,
        provinceCode: params.provinceCode,
        cityCode: params.cityCode,
        paidAt: params.paidAt.toISOString(),
      }),
    };
    this.logger.debug(`[PUBLISH] Publishing PlantingOrderPaid for reward-service:
- OrderId: ${params.orderId}
- UserId: ${params.userId}
- TreeCount: ${params.treeCount}
- Province: ${params.provinceCode}
- City: ${params.cityCode}`);
    if (!this.isConnected) {
      this.logger.warn(`[PUBLISH] Kafka not connected, skipping planting.order.paid`);
      return;
    }
    try {
      await this.emitAndConfirm(topic, message);
      this.logger.log(`[PUBLISH] ✓ PlantingOrderPaid published for order ${params.orderId}`);
    } catch (error) {
      this.logger.error(`[PUBLISH] ✗ Failed to publish PlantingOrderPaid:`, error);
    }
  }

  /**
   * Publish the "planting created" event consumed by referral-service.
   * Delivery errors are logged but not rethrown (best-effort).
   */
  async publishPlantingCreated(params: {
    userId: string;
    treeCount: number;
    provinceCode: string;
    cityCode: string;
  }): Promise<void> {
    const topic = 'planting.planting.created';
    const message = {
      key: params.userId,
      value: JSON.stringify({
        eventName: 'planting.created',
        data: {
          userId: params.userId,
          treeCount: params.treeCount,
          provinceCode: params.provinceCode,
          cityCode: params.cityCode,
        },
      }),
    };
    this.logger.debug(`[PUBLISH] Publishing PlantingCreated for referral-service:
- UserId: ${params.userId}
- TreeCount: ${params.treeCount}`);
    if (!this.isConnected) {
      this.logger.warn(`[PUBLISH] Kafka not connected, skipping planting.planting.created`);
      return;
    }
    try {
      await this.emitAndConfirm(topic, message);
      this.logger.log(`[PUBLISH] ✓ PlantingCreated published for user ${params.userId}`);
    } catch (error) {
      this.logger.error(`[PUBLISH] ✗ Failed to publish PlantingCreated:`, error);
    }
  }

  /**
   * Bridge ClientKafka.emit()'s Observable to a Promise that settles on
   * broker acknowledgment (complete) or delivery error. Without subscribing,
   * emit() failures surface outside any try/catch in this class.
   */
  private emitAndConfirm(topic: string, message: unknown): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      this.kafkaClient.emit(topic, message).subscribe({
        error: (err: unknown) => reject(err),
        complete: () => resolve(),
      });
    });
  }

  /** Resolve the topic for an event; unknown types go to 'planting.events'. */
  private getTopicForEvent(event: DomainEvent): string {
    const topic = EVENT_TOPIC_MAP[event.type] || 'planting.events';
    this.logger.debug(`[TOPIC] Mapped event type ${event.type} to topic ${topic}`);
    return topic;
  }

  /** Build the standard envelope; eventId is aggregateId + occurrence millis. */
  private formatMessage(event: DomainEvent) {
    return {
      key: event.aggregateId,
      value: JSON.stringify({
        eventId: `${event.aggregateId}-${event.occurredAt.getTime()}`,
        eventType: event.type,
        aggregateId: event.aggregateId,
        aggregateType: event.aggregateType,
        occurredAt: event.occurredAt.toISOString(),
        data: event.data,
      }),
    };
  }

  /**
   * Map internal event types to the names authorization-service expects.
   * NOTE(review): both Created and Paid map to 'planting.tree.planted' —
   * confirm the consumer deduplicates, or that this double mapping is intended.
   */
  private mapEventTypeToPlantingEventType(eventType: string): string {
    switch (eventType) {
      case 'PlantingOrderCreated':
        return 'planting.tree.planted';
      case 'PlantingOrderPaid':
        return 'planting.tree.planted';
      default:
        return `planting.${eventType.toLowerCase()}`;
    }
  }
}

View File

@ -0,0 +1,2 @@
export * from './kafka.module';
export * from './event-publisher.service';

View File

@ -0,0 +1,32 @@
import { Module, Global } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { EventPublisherService } from './event-publisher.service';
/**
 * Global Kafka module for the planting-service producer side.
 *
 * Registers an async Kafka client under the 'KAFKA_SERVICE' token,
 * configured from environment variables:
 * - KAFKA_CLIENT_ID (default 'planting-service')
 * - KAFKA_BROKERS   (default 'localhost:9092', comma-separated list)
 *
 * Marked @Global so EventPublisherService and the raw ClientKafka can be
 * injected anywhere without importing this module explicitly.
 */
@Global()
@Module({
  imports: [
    ClientsModule.registerAsync([
      {
        name: 'KAFKA_SERVICE',
        imports: [ConfigModule],
        useFactory: (configService: ConfigService) => ({
          transport: Transport.KAFKA,
          options: {
            client: {
              clientId: configService.get<string>('KAFKA_CLIENT_ID', 'planting-service'),
              brokers: configService.get<string>('KAFKA_BROKERS', 'localhost:9092').split(','),
            },
            producer: {
              // Convenient for dev; NOTE(review): consider disabling in
              // production where topics are provisioned explicitly.
              allowAutoTopicCreation: true,
            },
          },
        }),
        inject: [ConfigService],
      },
    ]),
  ],
  providers: [EventPublisherService],
  exports: [EventPublisherService, ClientsModule],
})
export class KafkaModule {}

View File

@ -0,0 +1,280 @@
import { Injectable, Logger, OnModuleInit, OnModuleDestroy, Inject } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ClientKafka } from '@nestjs/microservices';
import { OutboxRepository, OutboxEvent, OutboxStatus } from '../persistence/repositories/outbox.repository';
/**
* Outbox Publisher Service (B方案 - )
*
* Outbox Kafka
* 使100%
*
* :
* 1. PENDING
* 2. Kafka SENT
* 3. planting.events.ack
* 4. CONFIRMED
* 5. PENDING
*/
@Injectable()
export class OutboxPublisherService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(OutboxPublisherService.name);
  private isRunning = false;
  private pollInterval: NodeJS.Timeout | null = null;
  private timeoutCheckInterval: NodeJS.Timeout | null = null;
  private cleanupInterval: NodeJS.Timeout | null = null;
  private isConnected = false;
  // FIX: re-entrancy guard. setInterval fires regardless of whether the
  // previous cycle finished; overlapping cycles would re-read the same
  // PENDING rows and publish duplicates.
  private isProcessing = false;

  // Configuration, read once at construction.
  private readonly pollIntervalMs: number;
  private readonly batchSize: number;
  private readonly cleanupIntervalMs: number;
  private readonly confirmationTimeoutMinutes: number;
  private readonly timeoutCheckIntervalMs: number;

  constructor(
    @Inject('KAFKA_SERVICE')
    private readonly kafkaClient: ClientKafka,
    private readonly outboxRepository: OutboxRepository,
    private readonly configService: ConfigService,
  ) {
    this.pollIntervalMs = this.configService.get<number>('OUTBOX_POLL_INTERVAL_MS', 1000);
    this.batchSize = this.configService.get<number>('OUTBOX_BATCH_SIZE', 100);
    this.cleanupIntervalMs = this.configService.get<number>('OUTBOX_CLEANUP_INTERVAL_MS', 3600000); // 1 hour
    this.confirmationTimeoutMinutes = this.configService.get<number>('OUTBOX_CONFIRMATION_TIMEOUT_MINUTES', 5);
    this.timeoutCheckIntervalMs = this.configService.get<number>('OUTBOX_TIMEOUT_CHECK_INTERVAL_MS', 60000); // 1 minute
    this.logger.log(
      `[OUTBOX] OutboxPublisher (B方案) configured: ` +
        `pollInterval=${this.pollIntervalMs}ms, batchSize=${this.batchSize}, ` +
        `confirmationTimeout=${this.confirmationTimeoutMinutes}min`,
    );
  }

  async onModuleInit() {
    this.logger.log('[OUTBOX] Connecting to Kafka...');
    try {
      await this.kafkaClient.connect();
      this.isConnected = true;
      this.logger.log('[OUTBOX] Connected to Kafka');
      this.start();
    } catch (error) {
      // Not fatal: events keep accumulating in the outbox table and will be
      // delivered once the service restarts with a reachable broker.
      this.logger.error('[OUTBOX] Failed to connect to Kafka:', error);
      this.logger.warn('[OUTBOX] OutboxPublisher will not start - events will accumulate in outbox table');
    }
  }

  async onModuleDestroy() {
    this.stop();
    if (this.isConnected) {
      await this.kafkaClient.close();
    }
  }

  /**
   * Start the three background loops: publish polling, confirmation-timeout
   * sweeping, and old-event cleanup. Idempotent.
   */
  start(): void {
    if (this.isRunning) {
      this.logger.warn('[OUTBOX] Publisher already running');
      return;
    }
    this.isRunning = true;
    this.logger.log('[OUTBOX] Starting outbox publisher (B方案)...');
    // Publish polling loop.
    this.pollInterval = setInterval(() => {
      this.processOutbox().catch((err) => {
        this.logger.error('[OUTBOX] Error processing outbox:', err);
      });
    }, this.pollIntervalMs);
    // Confirmation-timeout sweep (core of the consumer-ack design).
    this.timeoutCheckInterval = setInterval(() => {
      this.checkConfirmationTimeouts().catch((err) => {
        this.logger.error('[OUTBOX] Error checking confirmation timeouts:', err);
      });
    }, this.timeoutCheckIntervalMs);
    // Periodic cleanup of confirmed rows.
    this.cleanupInterval = setInterval(() => {
      this.cleanup().catch((err) => {
        this.logger.error('[OUTBOX] Error cleaning up outbox:', err);
      });
    }, this.cleanupIntervalMs);
    this.logger.log('[OUTBOX] Outbox publisher started (B方案 - 消费方确认模式)');
  }

  /** Stop all background loops. Idempotent. */
  stop(): void {
    if (!this.isRunning) return;
    this.isRunning = false;
    if (this.pollInterval) {
      clearInterval(this.pollInterval);
      this.pollInterval = null;
    }
    if (this.timeoutCheckInterval) {
      clearInterval(this.timeoutCheckInterval);
      this.timeoutCheckInterval = null;
    }
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }
    this.logger.log('[OUTBOX] Outbox publisher stopped');
  }

  /**
   * One poll cycle: load PENDING events plus retry-eligible FAILED events
   * and publish each. Guarded so overlapping interval firings are dropped
   * instead of publishing the same rows twice.
   */
  async processOutbox(): Promise<void> {
    if (!this.isConnected || this.isProcessing) {
      return;
    }
    this.isProcessing = true;
    try {
      // 1. Events never yet sent.
      const pendingEvents = await this.outboxRepository.findPendingEvents(this.batchSize);
      // 2. Failed events whose backoff elapsed. FIX: floor the limit —
      // batchSize/2 can be non-integer (odd value, or env-sourced string).
      const retryEvents = await this.outboxRepository.findEventsForRetry(Math.floor(this.batchSize / 2));
      const allEvents = [...pendingEvents, ...retryEvents];
      if (allEvents.length === 0) {
        return;
      }
      this.logger.debug(`[OUTBOX] Processing ${allEvents.length} events (${pendingEvents.length} pending, ${retryEvents.length} retry)`);
      // 3. Publish one at a time to keep per-aggregate ordering.
      for (const event of allEvents) {
        await this.publishEvent(event);
      }
    } catch (error) {
      this.logger.error('[OUTBOX] Error in processOutbox:', error);
    } finally {
      this.isProcessing = false;
    }
  }

  /**
   * Publish one event and mark it SENT (awaiting consumer confirmation).
   * On delivery failure, schedule a retry via markAsFailed.
   */
  private async publishEvent(event: OutboxEvent): Promise<void> {
    try {
      this.logger.debug(`[OUTBOX] Publishing event ${event.id} to topic ${event.topic}`);
      // Embed outbox metadata so the consumer can ack this exact row.
      const payload = {
        ...(event.payload as Record<string, unknown>),
        _outbox: {
          id: event.id.toString(),
          aggregateId: event.aggregateId,
          eventType: event.eventType,
        },
      };
      const message = {
        key: event.key,
        value: JSON.stringify(payload),
      };
      // FIX: wait for the broker ack before marking SENT. A fire-and-forget
      // emit() can fail asynchronously, which previously still marked the
      // row SENT and defeated the reliability guarantee of the outbox.
      await this.emitAndConfirm(event.topic, message);
      await this.outboxRepository.markAsSent(event.id);
      this.logger.log(
        `[OUTBOX] → Event ${event.id} sent to ${event.topic} (awaiting consumer confirmation)`,
      );
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      this.logger.error(`[OUTBOX] ✗ Failed to publish event ${event.id}: ${errorMessage}`);
      await this.outboxRepository.markAsFailed(event.id, errorMessage);
    }
  }

  /**
   * Bridge ClientKafka.emit()'s Observable to a Promise that settles on
   * broker acknowledgment or delivery error.
   */
  private emitAndConfirm(topic: string, message: unknown): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      this.kafkaClient.emit(topic, message).subscribe({
        error: (err: unknown) => reject(err),
        complete: () => resolve(),
      });
    });
  }

  /**
   * Sweep SENT events that have waited longer than the confirmation timeout
   * and reset them to PENDING so the poll loop re-publishes them.
   */
  private async checkConfirmationTimeouts(): Promise<void> {
    if (!this.isConnected) {
      return;
    }
    try {
      const timedOutEvents = await this.outboxRepository.findSentEventsTimedOut(
        this.confirmationTimeoutMinutes,
        this.batchSize,
      );
      if (timedOutEvents.length === 0) {
        return;
      }
      this.logger.warn(
        `[OUTBOX] Found ${timedOutEvents.length} events without confirmation after ${this.confirmationTimeoutMinutes} minutes`,
      );
      for (const event of timedOutEvents) {
        await this.outboxRepository.resetSentToPending(event.id);
        this.logger.warn(
          `[OUTBOX] Event ${event.id} reset to PENDING for retry (retry #${event.retryCount + 1})`,
        );
      }
    } catch (error) {
      this.logger.error('[OUTBOX] Error checking confirmation timeouts:', error);
    }
  }

  /** Delete CONFIRMED events older than the retention window. */
  private async cleanup(): Promise<void> {
    const retentionDays = this.configService.get<number>('OUTBOX_RETENTION_DAYS', 7);
    await this.outboxRepository.cleanupOldEvents(retentionDays);
  }

  /** Run one publish cycle immediately (for admin/ops endpoints). */
  async triggerProcessing(): Promise<void> {
    this.logger.log('[OUTBOX] Manual processing triggered');
    await this.processOutbox();
  }

  /** Snapshot of publisher state plus per-status row counts. */
  async getStats(): Promise<{
    isRunning: boolean;
    isConnected: boolean;
    pending: number;
    sent: number;
    confirmed: number;
    failed: number;
  }> {
    const stats = await this.outboxRepository.getStats();
    return {
      isRunning: this.isRunning,
      isConnected: this.isConnected,
      ...stats,
    };
  }
}

View File

@ -0,0 +1,364 @@
import { Injectable, Logger } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { Prisma } from '@prisma/client';
/**
 * Lifecycle states of an outbox event.
 * Happy path: PENDING → SENT → CONFIRMED. FAILED rows are retried with
 * exponential backoff until their retry budget is exhausted.
 */
export enum OutboxStatus {
  PENDING = 'PENDING', // awaiting publication to Kafka
  SENT = 'SENT', // published to Kafka, awaiting consumer confirmation
  CONFIRMED = 'CONFIRMED', // consumer acknowledged successful processing
  FAILED = 'FAILED', // publish failed, awaiting retry
}
/** Payload required to enqueue a new outbox event. */
export interface OutboxEventData {
  eventType: string; // domain event type, e.g. 'PlantingOrderPaid'
  topic: string; // Kafka topic the event will be published to
  key: string; // Kafka partition key
  payload: Record<string, unknown>; // JSON-serializable event body
  aggregateId: string; // id of the source aggregate (e.g. order no)
  aggregateType: string; // type of the source aggregate
}
/** A persisted outbox row, including delivery/retry bookkeeping. */
export interface OutboxEvent extends OutboxEventData {
  id: bigint;
  status: OutboxStatus;
  retryCount: number; // delivery attempts made so far
  maxRetries: number; // per-row retry budget
  lastError: string | null; // most recent failure message, if any
  createdAt: Date;
  publishedAt: Date | null; // set when the row transitions to SENT
  nextRetryAt: Date | null; // backoff deadline for FAILED rows
}
@Injectable()
export class OutboxRepository {
  private readonly logger = new Logger(OutboxRepository.name);

  constructor(private readonly prisma: PrismaService) {}

  /**
   * Persist outbox events inside an existing transaction so they commit
   * atomically with the business data that produced them (Outbox Pattern).
   */
  async saveInTransaction(
    tx: Prisma.TransactionClient,
    events: OutboxEventData[],
  ): Promise<void> {
    if (events.length === 0) return;
    this.logger.debug(`[OUTBOX] Saving ${events.length} events to outbox (in transaction)`);
    await tx.outboxEvent.createMany({
      data: events.map((event) => ({
        eventType: event.eventType,
        topic: event.topic,
        key: event.key,
        payload: event.payload as Prisma.JsonObject,
        aggregateId: event.aggregateId,
        aggregateType: event.aggregateType,
        status: OutboxStatus.PENDING,
      })),
    });
    this.logger.debug(`[OUTBOX] Saved ${events.length} events to outbox`);
  }

  /** Persist outbox events outside a surrounding transaction. */
  async saveEvents(events: OutboxEventData[]): Promise<void> {
    if (events.length === 0) return;
    this.logger.debug(`[OUTBOX] Saving ${events.length} events to outbox`);
    await this.prisma.outboxEvent.createMany({
      data: events.map((event) => ({
        eventType: event.eventType,
        topic: event.topic,
        key: event.key,
        payload: event.payload as Prisma.JsonObject,
        aggregateId: event.aggregateId,
        aggregateType: event.aggregateType,
        status: OutboxStatus.PENDING,
      })),
    });
    this.logger.log(`[OUTBOX] ✓ Saved ${events.length} events to outbox`);
  }

  /** Oldest-first batch of events never yet sent. */
  async findPendingEvents(limit: number = 100): Promise<OutboxEvent[]> {
    const events = await this.prisma.outboxEvent.findMany({
      where: {
        status: OutboxStatus.PENDING,
      },
      orderBy: {
        createdAt: 'asc',
      },
      take: limit,
    });
    return events.map((e) => this.mapToOutboxEvent(e));
  }

  /**
   * FAILED events whose backoff delay has elapsed and whose retry budget
   * (retryCount < maxRetries, compared column-to-column) remains.
   */
  async findEventsForRetry(limit: number = 50): Promise<OutboxEvent[]> {
    const now = new Date();
    const events = await this.prisma.outboxEvent.findMany({
      where: {
        status: OutboxStatus.FAILED,
        retryCount: {
          // Prisma field reference: compares against the row's own maxRetries.
          lt: this.prisma.outboxEvent.fields.maxRetries,
        },
        nextRetryAt: {
          lte: now,
        },
      },
      orderBy: {
        nextRetryAt: 'asc',
      },
      take: limit,
    });
    return events.map((e) => this.mapToOutboxEvent(e));
  }

  /**
   * Mark an event as published to Kafka. It stays SENT until the consumer's
   * acknowledgment moves it to CONFIRMED (or the timeout sweep resets it).
   */
  async markAsSent(id: bigint): Promise<void> {
    await this.prisma.outboxEvent.update({
      where: { id },
      data: {
        status: OutboxStatus.SENT,
        publishedAt: new Date(),
      },
    });
    this.logger.debug(`[OUTBOX] Marked event ${id} as SENT (awaiting consumer confirmation)`);
  }

  /**
   * Confirm event(s) by aggregateId on consumer acknowledgment.
   *
   * NOTE(review): this matches every SENT row with the given aggregateId,
   * so distinct event types of the same aggregate are confirmed together —
   * verify this is intended, or additionally match on eventType.
   *
   * @returns true if at least one SENT row was confirmed.
   */
  async markAsConfirmed(eventId: string): Promise<boolean> {
    const result = await this.prisma.outboxEvent.updateMany({
      where: {
        aggregateId: eventId,
        status: OutboxStatus.SENT,
      },
      data: {
        status: OutboxStatus.CONFIRMED,
      },
    });
    if (result.count > 0) {
      this.logger.log(`[OUTBOX] ✓ Event ${eventId} confirmed by consumer`);
      return true;
    }
    this.logger.warn(`[OUTBOX] Event ${eventId} not found or not in SENT status`);
    return false;
  }

  /** Confirm a single event by its outbox primary key. */
  async markAsConfirmedById(id: bigint): Promise<void> {
    await this.prisma.outboxEvent.update({
      where: { id },
      data: {
        status: OutboxStatus.CONFIRMED,
      },
    });
    this.logger.log(`[OUTBOX] ✓ Event ${id} confirmed by consumer`);
  }

  /**
   * SENT events whose confirmation has not arrived within the timeout and
   * that still have re-delivery budget (hard cap of 5 attempts).
   */
  async findSentEventsTimedOut(timeoutMinutes: number = 5, limit: number = 50): Promise<OutboxEvent[]> {
    const cutoffTime = new Date();
    cutoffTime.setMinutes(cutoffTime.getMinutes() - timeoutMinutes);
    const events = await this.prisma.outboxEvent.findMany({
      where: {
        status: OutboxStatus.SENT,
        publishedAt: {
          lt: cutoffTime,
        },
        retryCount: {
          lt: 5, // re-delivery cap for unconfirmed events
        },
      },
      orderBy: {
        publishedAt: 'asc',
      },
      take: limit,
    });
    return events.map((e) => this.mapToOutboxEvent(e));
  }

  /**
   * Reset a timed-out SENT event back to PENDING so the publisher retries it.
   *
   * FIX: done as a single conditional update instead of the previous
   * find-then-update. The status guard means a concurrent consumer ack
   * (SENT → CONFIRMED) can no longer be clobbered back to PENDING, and the
   * atomic increment cannot lose a retryCount update.
   */
  async resetSentToPending(id: bigint): Promise<void> {
    const result = await this.prisma.outboxEvent.updateMany({
      where: { id, status: OutboxStatus.SENT },
      data: {
        status: OutboxStatus.PENDING,
        retryCount: { increment: 1 },
        lastError: 'Consumer confirmation timeout',
      },
    });
    if (result.count > 0) {
      this.logger.warn(`[OUTBOX] Event ${id} reset to PENDING for retry (confirmation timeout)`);
    }
  }

  /**
   * Mark an event as fully delivered in one step.
   * @deprecated Use markAsSent + markAsConfirmed (consumer-ack flow) instead.
   */
  async markAsPublished(id: bigint): Promise<void> {
    await this.prisma.outboxEvent.update({
      where: { id },
      data: {
        status: OutboxStatus.CONFIRMED,
        publishedAt: new Date(),
      },
    });
    this.logger.debug(`[OUTBOX] Marked event ${id} as published (confirmed)`);
  }

  /**
   * Record a publish failure with exponential backoff (1, 2, 4, 8, 16 min).
   * The row is FAILED either way (FIX: the previous ternary had identical
   * branches); a null nextRetryAt marks the retry budget as exhausted, which
   * excludes the row from findEventsForRetry permanently.
   */
  async markAsFailed(id: bigint, error: string): Promise<void> {
    const event = await this.prisma.outboxEvent.findUnique({
      where: { id },
    });
    if (!event) return;
    const newRetryCount = event.retryCount + 1;
    const isFinalFailure = newRetryCount >= event.maxRetries;
    // Exponential backoff: 1min, 2min, 4min, 8min, 16min ...
    const delayMinutes = Math.pow(2, newRetryCount - 1);
    const nextRetryAt = new Date();
    nextRetryAt.setMinutes(nextRetryAt.getMinutes() + delayMinutes);
    await this.prisma.outboxEvent.update({
      where: { id },
      data: {
        status: OutboxStatus.FAILED,
        retryCount: newRetryCount,
        lastError: error,
        nextRetryAt: isFinalFailure ? null : nextRetryAt,
      },
    });
    if (isFinalFailure) {
      this.logger.error(`[OUTBOX] Event ${id} permanently failed after ${newRetryCount} retries: ${error}`);
    } else {
      this.logger.warn(`[OUTBOX] Event ${id} failed, retry ${newRetryCount}/${event.maxRetries}, next retry at ${nextRetryAt}`);
    }
  }

  /**
   * Bulk-confirm events by id.
   * @deprecated Use markAsConfirmedById instead.
   */
  async markManyAsPublished(ids: bigint[]): Promise<void> {
    if (ids.length === 0) return;
    await this.prisma.outboxEvent.updateMany({
      where: {
        id: { in: ids },
      },
      data: {
        status: OutboxStatus.CONFIRMED,
        publishedAt: new Date(),
      },
    });
    this.logger.debug(`[OUTBOX] Marked ${ids.length} events as confirmed`);
  }

  /**
   * Delete CONFIRMED events older than the retention window.
   * NOTE(review): permanently FAILED rows are never purged here — confirm
   * they are handled elsewhere (dead-letter review) or add a policy.
   */
  async cleanupOldEvents(retentionDays: number = 7): Promise<number> {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - retentionDays);
    const result = await this.prisma.outboxEvent.deleteMany({
      where: {
        status: OutboxStatus.CONFIRMED,
        publishedAt: {
          lt: cutoffDate,
        },
      },
    });
    if (result.count > 0) {
      this.logger.log(`[OUTBOX] Cleaned up ${result.count} old events`);
    }
    return result.count;
  }

  /** Per-status row counts, queried in parallel. */
  async getStats(): Promise<{
    pending: number;
    sent: number;
    confirmed: number;
    failed: number;
  }> {
    const [pending, sent, confirmed, failed] = await Promise.all([
      this.prisma.outboxEvent.count({ where: { status: OutboxStatus.PENDING } }),
      this.prisma.outboxEvent.count({ where: { status: OutboxStatus.SENT } }),
      this.prisma.outboxEvent.count({ where: { status: OutboxStatus.CONFIRMED } }),
      this.prisma.outboxEvent.count({ where: { status: OutboxStatus.FAILED } }),
    ]);
    return { pending, sent, confirmed, failed };
  }

  /** Map a raw Prisma row to the typed OutboxEvent shape. */
  private mapToOutboxEvent(record: any): OutboxEvent {
    return {
      id: record.id,
      eventType: record.eventType,
      topic: record.topic,
      key: record.key,
      payload: record.payload as Record<string, unknown>,
      aggregateId: record.aggregateId,
      aggregateType: record.aggregateType,
      status: record.status as OutboxStatus,
      retryCount: record.retryCount,
      maxRetries: record.maxRetries,
      lastError: record.lastError,
      createdAt: record.createdAt,
      publishedAt: record.publishedAt,
      nextRetryAt: record.nextRetryAt,
    };
  }
}

View File

@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { Prisma } from '@prisma/client';
import { PrismaService, TransactionClient } from './prisma/prisma.service';
import { PlantingOrder } from '../../domain/aggregates/planting-order.aggregate';
@ -7,6 +7,7 @@ import { PoolInjectionBatch } from '../../domain/aggregates/pool-injection-batch
import { PlantingOrderMapper } from './mappers/planting-order.mapper';
import { PlantingPositionMapper } from './mappers/planting-position.mapper';
import { PoolInjectionBatchMapper } from './mappers/pool-injection-batch.mapper';
import { OutboxEventData } from './repositories/outbox.repository';
/**
* -
@ -45,10 +46,70 @@ export class UnitOfWork {
/**
* -
*
* Outbox Pattern
*
*/
export class TransactionalUnitOfWork {
private readonly logger = new Logger(TransactionalUnitOfWork.name);
// Outbox events queued during this unit of work; flushed by commitOutboxEvents().
private pendingOutboxEvents: OutboxEventData[] = [];
// tx is the Prisma transaction client shared with the business-data writes.
constructor(private readonly tx: TransactionClient) {}
/**
* Outbox
*
* @param event
*/
addOutboxEvent(event: OutboxEventData): void {
  // Queue a single event; it is persisted when commitOutboxEvents() runs.
  this.pendingOutboxEvents.push(event);
  this.logger.debug(`[OUTBOX] Adding event to outbox: ${event.eventType} for ${event.aggregateType}:${event.aggregateId}`);
}
/**
* Outbox
*
* @param events
*/
addOutboxEvents(events: OutboxEventData[]): void {
this.logger.debug(`[OUTBOX] Adding ${events.length} events to outbox`);
this.pendingOutboxEvents.push(...events);
}
/**
* Outbox
*
*/
async commitOutboxEvents(): Promise<void> {
if (this.pendingOutboxEvents.length === 0) {
return;
}
this.logger.debug(`[OUTBOX] Committing ${this.pendingOutboxEvents.length} outbox events`);
await this.tx.outboxEvent.createMany({
data: this.pendingOutboxEvents.map((event) => ({
eventType: event.eventType,
topic: event.topic,
key: event.key,
payload: event.payload as Prisma.JsonObject,
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
status: 'PENDING',
})),
});
this.logger.log(`[OUTBOX] ✓ Committed ${this.pendingOutboxEvents.length} outbox events`);
this.pendingOutboxEvents = [];
}
/**
 * Number of outbox events queued but not yet committed in this unit of work.
 */
getPendingOutboxEventCount(): number {
return this.pendingOutboxEvents.length;
}
/**
*
*/

View File

@ -1,5 +1,6 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { KafkaService } from '../../infrastructure';
import { EventAckPublisher } from '../../infrastructure/kafka/event-ack.publisher';
import { TeamStatisticsService } from '../services';
import { UpdateTeamStatisticsCommand } from '../commands';
@ -11,6 +12,11 @@ interface PlantingCreatedEvent {
provinceCode: string;
cityCode: string;
};
_outbox?: {
id: string;
aggregateId: string;
eventType: string;
};
}
/**
@ -24,6 +30,7 @@ export class PlantingCreatedHandler implements OnModuleInit {
constructor(
private readonly kafkaService: KafkaService,
private readonly teamStatisticsService: TeamStatisticsService,
private readonly eventAckPublisher: EventAckPublisher,
) {}
async onModuleInit() {
@ -42,6 +49,10 @@ export class PlantingCreatedHandler implements OnModuleInit {
return;
}
// B方案提取 outbox 信息用于发送确认
const outboxInfo = event._outbox;
const eventId = outboxInfo?.aggregateId || 'unknown';
try {
const command = new UpdateTeamStatisticsCommand(
BigInt(event.data.userId),
@ -54,11 +65,22 @@ export class PlantingCreatedHandler implements OnModuleInit {
this.logger.log(
`Updated team statistics for user ${event.data.userId}, count: ${event.data.treeCount}`,
);
// B方案发送处理成功确认
if (outboxInfo) {
await this.eventAckPublisher.sendSuccess(eventId, outboxInfo.eventType);
}
} catch (error) {
this.logger.error(
`Failed to update team statistics for user ${event.data.userId}:`,
error,
);
// B方案发送处理失败确认
if (outboxInfo) {
const errorMessage = error instanceof Error ? error.message : String(error);
await this.eventAckPublisher.sendFailure(eventId, outboxInfo.eventType, errorMessage);
}
}
}
}

View File

@ -0,0 +1,98 @@
import { Injectable, Logger, Inject } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
/**
*
*/
/** Acknowledgment sent back to planting-service after consuming an event. */
interface EventAckMessage {
  /** aggregateId of the original event (e.g. orderNo) */
  eventId: string;
  /** Original event type */
  eventType: string;
  /** Name of the consuming service */
  consumerService: string;
  /** Whether processing succeeded */
  success: boolean;
  /** Error message when processing failed */
  error?: string;
  /** Confirmation timestamp (ISO-8601) */
  confirmedAt: string;
}
/**
*
*
* B方案核心组件
* planting.events.ack topic
*/
@Injectable()
export class EventAckPublisher {
  private readonly logger = new Logger(EventAckPublisher.name);
  // Identifies this consumer in acknowledgment messages.
  private readonly serviceName = 'referral-service';

  constructor(
    @Inject('KAFKA_SERVICE')
    private readonly kafkaClient: ClientKafka,
  ) {}

  /**
   * Send a success acknowledgment for a processed event.
   * Ack delivery failures are logged and swallowed: the producer's timeout
   * sweep re-delivers the event, so handlers must be idempotent anyway.
   */
  async sendSuccess(eventId: string, eventType: string): Promise<void> {
    const ackMessage: EventAckMessage = {
      eventId,
      eventType,
      consumerService: this.serviceName,
      success: true,
      confirmedAt: new Date().toISOString(),
    };
    try {
      // FIX: await broker ack — a bare emit() is fire-and-forget and its
      // failures never reached this catch block.
      await this.emitAck(eventId, ackMessage);
      this.logger.log(`[ACK] ✓ Sent success confirmation for event ${eventId} (${eventType})`);
    } catch (error) {
      this.logger.error(`[ACK] Failed to send confirmation for event ${eventId}:`, error);
    }
  }

  /**
   * Send a failure acknowledgment so the producer can log the consumer-side
   * error; the event stays SENT and is re-delivered after the timeout.
   */
  async sendFailure(eventId: string, eventType: string, errorMessage: string): Promise<void> {
    const ackMessage: EventAckMessage = {
      eventId,
      eventType,
      consumerService: this.serviceName,
      success: false,
      error: errorMessage,
      confirmedAt: new Date().toISOString(),
    };
    try {
      await this.emitAck(eventId, ackMessage);
      this.logger.warn(`[ACK] ✗ Sent failure confirmation for event ${eventId}: ${errorMessage}`);
    } catch (error) {
      this.logger.error(`[ACK] Failed to send failure confirmation for event ${eventId}:`, error);
    }
  }

  /**
   * Extract the outbox metadata the producer embedded under `_outbox`.
   * Returns null (and warns) for messages that did not come via the outbox.
   */
  extractOutboxInfo(message: Record<string, unknown>): { id: string; aggregateId: string; eventType: string } | null {
    const outbox = message._outbox as { id: string; aggregateId: string; eventType: string } | undefined;
    if (!outbox) {
      this.logger.warn('[ACK] Message does not contain _outbox metadata');
      return null;
    }
    return outbox;
  }

  /**
   * Publish one ack record and wait for the broker acknowledgment.
   * ClientKafka.emit() returns an Observable; subscribing bridges delivery
   * errors into the callers' try/catch.
   */
  private emitAck(key: string, ack: EventAckMessage): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      this.kafkaClient
        .emit('planting.events.ack', { key, value: JSON.stringify(ack) })
        .subscribe({
          error: (err: unknown) => reject(err),
          complete: () => resolve(),
        });
    });
  }
}

View File

@ -0,0 +1,117 @@
-- Initial schema for reward-service: two aggregate roots (ledger entries +
-- per-user summaries), a rights configuration table, settlement records, and
-- an append-only domain-event table.
--
-- NOTE(review): index names like "idx_status" / "idx_created" are global per
-- schema in PostgreSQL; these generic names risk collisions with indexes
-- created by other services' migrations in the same database — consider
-- table-prefixed names.
-- NOTE(review): timestamp precision is mixed (TIMESTAMP(6) for some
-- created_at/occurred_at columns, TIMESTAMP(3) elsewhere) — confirm this is
-- intentional.

-- CreateTable: reward_ledger_entries (聚合根1 - 行为表, append-only)
CREATE TABLE "reward_ledger_entries" (
"entry_id" BIGSERIAL NOT NULL,
"user_id" BIGINT NOT NULL,
"source_order_id" BIGINT NOT NULL,
"source_user_id" BIGINT NOT NULL,
"right_type" VARCHAR(50) NOT NULL,
"usdt_amount" DECIMAL(20,8) NOT NULL,
"hashpower_amount" DECIMAL(20,8) NOT NULL DEFAULT 0,
"reward_status" VARCHAR(20) NOT NULL DEFAULT 'PENDING',
"created_at" TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"expire_at" TIMESTAMP(3),
"claimed_at" TIMESTAMP(3),
"settled_at" TIMESTAMP(3),
"expired_at" TIMESTAMP(3),
"memo" VARCHAR(500),
CONSTRAINT "reward_ledger_entries_pkey" PRIMARY KEY ("entry_id")
);
-- CreateTable: reward_summaries (聚合根2 - 状态表)
CREATE TABLE "reward_summaries" (
"summary_id" BIGSERIAL NOT NULL,
"user_id" BIGINT NOT NULL,
"pending_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0,
"pending_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0,
"pending_expire_at" TIMESTAMP(3),
"settleable_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0,
"settleable_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0,
"settled_total_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0,
"settled_total_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0,
"expired_total_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0,
"expired_total_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0,
"last_update_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "reward_summaries_pkey" PRIMARY KEY ("summary_id")
);
-- CreateTable: right_definitions (配置表)
CREATE TABLE "right_definitions" (
"definition_id" BIGSERIAL NOT NULL,
"right_type" VARCHAR(50) NOT NULL,
"usdt_per_tree" DECIMAL(20,8) NOT NULL,
"hashpower_percent" DECIMAL(5,2) NOT NULL DEFAULT 0,
"payable_to" VARCHAR(50) NOT NULL,
"rule_description" TEXT,
"is_enabled" BOOLEAN NOT NULL DEFAULT true,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "right_definitions_pkey" PRIMARY KEY ("definition_id")
);
-- CreateTable: settlement_records (行为表)
CREATE TABLE "settlement_records" (
"settlement_id" BIGSERIAL NOT NULL,
"user_id" BIGINT NOT NULL,
"usdt_amount" DECIMAL(20,8) NOT NULL,
"hashpower_amount" DECIMAL(20,8) NOT NULL,
"settle_currency" VARCHAR(10) NOT NULL,
"received_amount" DECIMAL(20,8) NOT NULL,
"swap_tx_hash" VARCHAR(100),
"swap_rate" DECIMAL(20,8),
"status" VARCHAR(20) NOT NULL DEFAULT 'PENDING',
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"completed_at" TIMESTAMP(3),
"reward_entry_ids" BIGINT[],
CONSTRAINT "settlement_records_pkey" PRIMARY KEY ("settlement_id")
);
-- CreateTable: reward_events (行为表, append-only)
CREATE TABLE "reward_events" (
"event_id" BIGSERIAL NOT NULL,
"event_type" VARCHAR(50) NOT NULL,
"aggregate_id" VARCHAR(100) NOT NULL,
"aggregate_type" VARCHAR(50) NOT NULL,
"event_data" JSONB NOT NULL,
"user_id" BIGINT,
"occurred_at" TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"version" INTEGER NOT NULL DEFAULT 1,
CONSTRAINT "reward_events_pkey" PRIMARY KEY ("event_id")
);
-- CreateIndex: reward_ledger_entries indexes
CREATE INDEX "idx_user_status" ON "reward_ledger_entries"("user_id", "reward_status");
CREATE INDEX "idx_user_created" ON "reward_ledger_entries"("user_id", "created_at" DESC);
CREATE INDEX "idx_source_order" ON "reward_ledger_entries"("source_order_id");
CREATE INDEX "idx_source_user" ON "reward_ledger_entries"("source_user_id");
CREATE INDEX "idx_right_type" ON "reward_ledger_entries"("right_type");
CREATE INDEX "idx_status" ON "reward_ledger_entries"("reward_status");
CREATE INDEX "idx_expire" ON "reward_ledger_entries"("expire_at");
CREATE INDEX "idx_created" ON "reward_ledger_entries"("created_at");
-- CreateIndex: reward_summaries indexes
CREATE UNIQUE INDEX "reward_summaries_user_id_key" ON "reward_summaries"("user_id");
CREATE INDEX "idx_summary_user" ON "reward_summaries"("user_id");
CREATE INDEX "idx_settleable_desc" ON "reward_summaries"("settleable_usdt" DESC);
CREATE INDEX "idx_pending_expire" ON "reward_summaries"("pending_expire_at");
-- CreateIndex: right_definitions indexes
CREATE UNIQUE INDEX "right_definitions_right_type_key" ON "right_definitions"("right_type");
CREATE INDEX "idx_def_right_type" ON "right_definitions"("right_type");
CREATE INDEX "idx_def_enabled" ON "right_definitions"("is_enabled");
-- CreateIndex: settlement_records indexes
CREATE INDEX "idx_settlement_user" ON "settlement_records"("user_id");
CREATE INDEX "idx_settlement_status" ON "settlement_records"("status");
CREATE INDEX "idx_settlement_created" ON "settlement_records"("created_at");
-- CreateIndex: reward_events indexes
CREATE INDEX "idx_reward_event_aggregate" ON "reward_events"("aggregate_type", "aggregate_id");
CREATE INDEX "idx_reward_event_type" ON "reward_events"("event_type");
CREATE INDEX "idx_reward_event_user" ON "reward_events"("user_id");
CREATE INDEX "idx_reward_event_occurred" ON "reward_events"("occurred_at");

View File

@ -0,0 +1,98 @@
import { Injectable, Logger, Inject } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
/**
 * Acknowledgment message sent by a consumer back to the producer
 * (planting-service) after it has processed an outbox-originated event.
 */
interface EventAckMessage {
/** aggregateId of the original event (e.g. the orderNo) */
eventId: string;
/** Type of the original event */
eventType: string;
/** Name of the consuming service that processed the event */
consumerService: string;
/** Whether processing succeeded */
success: boolean;
/** Error description (present only when processing failed) */
error?: string;
/** ISO-8601 timestamp of when this acknowledgment was produced */
confirmedAt: string;
}
/**
 * Event acknowledgment publisher (consumer side of the Outbox Pattern).
 *
 * After this service finishes processing an event that originated from
 * planting-service's outbox, it publishes the outcome to the
 * `planting.events.ack` topic so the producer can mark the outbox entry
 * CONFIRMED, or schedule a retry on failure.
 */
@Injectable()
export class EventAckPublisher {
  private readonly logger = new Logger(EventAckPublisher.name);

  /** Identifies this consumer in outgoing ack messages. */
  private readonly serviceName = 'reward-service';

  constructor(
    @Inject('KAFKA_SERVICE')
    private readonly kafkaClient: ClientKafka,
  ) {}

  /**
   * Send a success acknowledgment for a processed event.
   *
   * @param eventId   aggregateId of the original event (e.g. orderNo)
   * @param eventType type of the original event
   */
  async sendSuccess(eventId: string, eventType: string): Promise<void> {
    const ackMessage = this.buildAck(eventId, eventType, true);
    try {
      await this.emitAck(eventId, ackMessage);
      this.logger.log(`[ACK] ✓ Sent success confirmation for event ${eventId} (${eventType})`);
    } catch (error) {
      // Best-effort: a lost ack only delays confirmation — the producer's
      // timeout check re-delivers the original event, so we log and move on.
      this.logger.error(`[ACK] Failed to send confirmation for event ${eventId}:`, error);
    }
  }

  /**
   * Send a failure acknowledgment for an event whose processing threw.
   *
   * @param eventId      aggregateId of the original event
   * @param eventType    type of the original event
   * @param errorMessage description of the processing failure
   */
  async sendFailure(eventId: string, eventType: string, errorMessage: string): Promise<void> {
    const ackMessage = this.buildAck(eventId, eventType, false, errorMessage);
    try {
      await this.emitAck(eventId, ackMessage);
      this.logger.warn(`[ACK] ✗ Sent failure confirmation for event ${eventId}: ${errorMessage}`);
    } catch (error) {
      this.logger.error(`[ACK] Failed to send failure confirmation for event ${eventId}:`, error);
    }
  }

  /**
   * Extract the outbox metadata that the producer attaches to each message.
   * Returns null (and logs a warning) when the `_outbox` field is absent.
   */
  extractOutboxInfo(message: Record<string, unknown>): { id: string; aggregateId: string; eventType: string } | null {
    const outbox = message._outbox as { id: string; aggregateId: string; eventType: string } | undefined;
    if (!outbox) {
      this.logger.warn('[ACK] Message does not contain _outbox metadata');
      return null;
    }
    return outbox;
  }

  /** Build the ack payload shared by the success and failure paths. */
  private buildAck(eventId: string, eventType: string, success: boolean, error?: string): EventAckMessage {
    return {
      eventId,
      eventType,
      consumerService: this.serviceName,
      success,
      // Include the error field only for failures, matching the wire format.
      ...(error !== undefined ? { error } : {}),
      confirmedAt: new Date().toISOString(),
    };
  }

  /**
   * Emit the ack and wait for the underlying observable to settle.
   *
   * Fix: the original wrapped `emit()` in try/catch without subscribing to
   * the returned RxJS Observable, so asynchronous publish failures were
   * never observed (the catch block could only see synchronous throws) and
   * the async methods resolved before the ack was actually dispatched.
   */
  private emitAck(eventId: string, ackMessage: EventAckMessage): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      this.kafkaClient
        .emit('planting.events.ack', {
          key: eventId,
          value: JSON.stringify(ackMessage),
        })
        .subscribe({ complete: () => resolve(), error: (err: unknown) => reject(err) });
    });
  }
}

View File

@ -1,14 +1,31 @@
import { Controller, Logger } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { RewardApplicationService } from '../../application/services/reward-application.service';
import { EventAckPublisher } from './event-ack.publisher';
interface PlantingOrderPaidEvent {
orderId: string;
userId: string;
treeCount: number;
provinceCode: string;
cityCode: string;
paidAt: string;
eventName?: string;
data?: {
orderId: string;
userId: string;
treeCount: number;
provinceCode: string;
cityCode: string;
paidAt: string;
};
// 兼容旧格式
orderId?: string;
userId?: string;
treeCount?: number;
provinceCode?: string;
cityCode?: string;
paidAt?: string;
// B方案outbox 元数据
_outbox?: {
id: string;
aggregateId: string;
eventType: string;
};
}
@Controller()
@ -17,6 +34,7 @@ export class EventConsumerController {
constructor(
private readonly rewardService: RewardApplicationService,
private readonly eventAckPublisher: EventAckPublisher,
) {}
/**
@ -26,22 +44,48 @@ export class EventConsumerController {
async handlePlantingOrderPaid(@Payload() message: PlantingOrderPaidEvent) {
this.logger.log(`Received planting.order.paid event: ${JSON.stringify(message)}`);
// 解析消息数据(支持新旧格式)
const eventData = message.data || {
orderId: message.orderId!,
userId: message.userId!,
treeCount: message.treeCount!,
provinceCode: message.provinceCode!,
cityCode: message.cityCode!,
paidAt: message.paidAt!,
};
// B方案提取 outbox 信息用于发送确认
const outboxInfo = message._outbox;
const eventId = outboxInfo?.aggregateId || eventData.orderId;
try {
// 1. 计算并分配奖励
await this.rewardService.distributeRewards({
sourceOrderId: BigInt(message.orderId),
sourceUserId: BigInt(message.userId),
treeCount: message.treeCount,
provinceCode: message.provinceCode,
cityCode: message.cityCode,
sourceOrderId: BigInt(eventData.orderId),
sourceUserId: BigInt(eventData.userId),
treeCount: eventData.treeCount,
provinceCode: eventData.provinceCode,
cityCode: eventData.cityCode,
});
// 2. 检查该用户是否有待领取奖励需要转为可结算
await this.rewardService.claimPendingRewardsForUser(BigInt(message.userId));
await this.rewardService.claimPendingRewardsForUser(BigInt(eventData.userId));
this.logger.log(`Successfully processed planting.order.paid for order ${message.orderId}`);
this.logger.log(`Successfully processed planting.order.paid for order ${eventData.orderId}`);
// B方案发送处理成功确认
if (outboxInfo) {
await this.eventAckPublisher.sendSuccess(eventId, outboxInfo.eventType);
}
} catch (error) {
this.logger.error(`Error processing planting.order.paid:`, error);
// B方案发送处理失败确认
if (outboxInfo) {
const errorMessage = error instanceof Error ? error.message : String(error);
await this.eventAckPublisher.sendFailure(eventId, outboxInfo.eventType, errorMessage);
}
throw error;
}
}