From 014ad9d19fa460bafd9b1f9142ae17f833d985d7 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 9 Dec 2025 21:32:16 -0800 Subject: [PATCH] =?UTF-8?q?feat(planting):=20implement=20Outbox=20Pattern?= =?UTF-8?q?=20with=20consumer=20acknowledgment=20(B=E6=96=B9=E6=A1=88)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement reliable event delivery using Outbox Pattern with consumer confirmation: ## planting-service (Producer) - Add OutboxEvent table with status: PENDING → SENT → CONFIRMED - Add OutboxRepository with transaction support and timeout handling - Add OutboxPublisherService with polling, timeout check, and retry - Add EventAckController to receive consumer confirmations - Update UnitOfWork to save outbox events atomically with business data - Update PlantingApplicationService to use outbox pattern - Update PoolInjectionService to use outbox pattern ## Consumer Services - Add EventAckPublisher to reward-service, referral-service, authorization-service - Update event handlers to send acknowledgment after successful processing ## Event Flow 1. Business data + outbox events saved in same transaction 2. OutboxPublisher polls and sends to Kafka, marks as SENT 3. Consumer processes event and sends ack to planting.events.ack 4. EventAckController receives ack and marks as CONFIRMED 5. Timeout check resets SENT→PENDING for retry (max 5 times) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../kafka/event-ack.publisher.ts | 98 +++++ .../kafka/event-consumer.controller.ts | 260 ++++++++++++- .../planting-service/.env.development | 4 + .../services/planting-service/.env.example | 4 + .../planting-service/package-lock.json | 72 ++++ .../services/planting-service/package.json | 2 + .../20241210000000_add_outbox/migration.sql | 31 ++ .../planting-service/prisma/schema.prisma | 38 ++ .../services/planting-application.service.ts | 94 ++++- .../services/pool-injection.service.ts | 52 ++- .../infrastructure/infrastructure.module.ts | 10 + .../kafka/event-ack.consumer.ts | 99 +++++ .../kafka/event-ack.controller.ts | 82 ++++ .../kafka/event-publisher.service.ts | 245 ++++++++++++ .../src/infrastructure/kafka/index.ts | 2 + .../src/infrastructure/kafka/kafka.module.ts | 32 ++ .../kafka/outbox-publisher.service.ts | 280 ++++++++++++++ .../repositories/outbox.repository.ts | 364 ++++++++++++++++++ .../persistence/unit-of-work.ts | 63 ++- .../planting-created.handler.ts | 22 ++ .../kafka/event-ack.publisher.ts | 98 +++++ .../20241204000000_init/migration.sql | 117 ++++++ .../kafka/event-ack.publisher.ts | 98 +++++ .../kafka/event-consumer.controller.ts | 70 +++- 24 files changed, 2204 insertions(+), 33 deletions(-) create mode 100644 backend/services/authorization-service/src/infrastructure/kafka/event-ack.publisher.ts create mode 100644 backend/services/planting-service/prisma/migrations/20241210000000_add_outbox/migration.sql create mode 100644 backend/services/planting-service/src/infrastructure/kafka/event-ack.consumer.ts create mode 100644 backend/services/planting-service/src/infrastructure/kafka/event-ack.controller.ts create mode 100644 backend/services/planting-service/src/infrastructure/kafka/event-publisher.service.ts create mode 100644 backend/services/planting-service/src/infrastructure/kafka/index.ts create mode 100644 backend/services/planting-service/src/infrastructure/kafka/kafka.module.ts create mode 100644 backend/services/planting-service/src/infrastructure/kafka/outbox-publisher.service.ts create mode 100644 backend/services/planting-service/src/infrastructure/persistence/repositories/outbox.repository.ts create mode 100644 backend/services/referral-service/src/infrastructure/kafka/event-ack.publisher.ts create mode 100644 backend/services/reward-service/prisma/migrations/20241204000000_init/migration.sql create mode 100644 backend/services/reward-service/src/infrastructure/kafka/event-ack.publisher.ts diff --git a/backend/services/authorization-service/src/infrastructure/kafka/event-ack.publisher.ts b/backend/services/authorization-service/src/infrastructure/kafka/event-ack.publisher.ts new file mode 100644 index 00000000..2e24b8e5 --- /dev/null +++ b/backend/services/authorization-service/src/infrastructure/kafka/event-ack.publisher.ts @@ -0,0 +1,98 @@ +import { Injectable, Logger, Inject } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; + +/** + * 事件确认消息结构 + */ +interface EventAckMessage { + /** 原始事件的 aggregateId(如 orderNo) */ + eventId: string; + /** 原始事件类型 */ + eventType: string; + /** 消费服务名称 */ + consumerService: string; + /** 处理结果 */ + success: boolean; + /** 错误信息(如果失败) */ + error?: string; + /** 确认时间 */ + confirmedAt: string; +} + +/** + * 事件确认发布器 + * + * B方案核心组件:消费方处理事件后发送确认 + * 发送确认消息到 planting.events.ack topic + */ +@Injectable() +export class EventAckPublisher { + private readonly logger = new Logger(EventAckPublisher.name); + private readonly serviceName = 'authorization-service'; + + constructor( + @Inject('KAFKA_SERVICE') + private readonly kafkaClient: ClientKafka, + ) {} + + /** + * 发送处理成功确认 + */ + async sendSuccess(eventId: string, eventType: string): Promise { + const ackMessage: EventAckMessage = { + eventId, + eventType, + consumerService: this.serviceName, + success: true, + confirmedAt: new Date().toISOString(), + }; + + try { + this.kafkaClient.emit('planting.events.ack', { + key: eventId, + value: JSON.stringify(ackMessage), + }); + + this.logger.log(`[ACK] ✓ Sent success confirmation for event ${eventId} (${eventType})`); + } catch (error) { + this.logger.error(`[ACK] Failed to send confirmation for event ${eventId}:`, error); + } + } + + /** + * 发送处理失败确认 + */ + async sendFailure(eventId: string, eventType: string, errorMessage: string): Promise { + const ackMessage: EventAckMessage = { + eventId, + eventType, + consumerService: this.serviceName, + success: false, + error: errorMessage, + confirmedAt: new Date().toISOString(), + }; + + try { + this.kafkaClient.emit('planting.events.ack', { + key: eventId, + value: JSON.stringify(ackMessage), + }); + + this.logger.warn(`[ACK] ✗ Sent failure confirmation for event ${eventId}: ${errorMessage}`); + } catch (error) { + this.logger.error(`[ACK] Failed to send failure confirmation for event ${eventId}:`, error); + } + } + + /** + * 从消息中提取 outbox 信息 + */ + extractOutboxInfo(message: Record): { id: string; aggregateId: string; eventType: string } | null { + const outbox = message._outbox as { id: string; aggregateId: string; eventType: string } | undefined; + if (!outbox) { + this.logger.warn('[ACK] Message does not contain _outbox metadata'); + return null; + } + return outbox; + } +} diff --git a/backend/services/authorization-service/src/infrastructure/kafka/event-consumer.controller.ts b/backend/services/authorization-service/src/infrastructure/kafka/event-consumer.controller.ts index 83c1cf6a..d3c3308e 100644 --- a/backend/services/authorization-service/src/infrastructure/kafka/event-consumer.controller.ts +++ b/backend/services/authorization-service/src/infrastructure/kafka/event-consumer.controller.ts @@ -1,28 +1,100 @@ -import { Controller, Logger } from '@nestjs/common' +import { Controller, Logger, Inject } from '@nestjs/common' import { EventPattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices' +import { + IAuthorizationRoleRepository, + AUTHORIZATION_ROLE_REPOSITORY, + IMonthlyAssessmentRepository, + MONTHLY_ASSESSMENT_REPOSITORY, +} from '../../domain/repositories' +import { + TEAM_STATISTICS_REPOSITORY, +} from '../../application/services/authorization-application.service' +import type { ITeamStatisticsRepository, TeamStatistics } from '../../domain/services' +import { UserId, Month } from '../../domain/value-objects' +import { RoleType, AuthorizationStatus } from '../../domain/enums' +import { LadderTargetRule } from '../../domain/entities' +import { MonthlyAssessment, AuthorizationRole } from '../../domain/aggregates' +import { EventPublisherService } from './event-publisher.service' +import { EventAckPublisher } from './event-ack.publisher' + +interface TreePlantedPayload { + userId: string + treeCount: number + provinceCode?: string + cityCode?: string +} + +interface PlantingEventMessage { + eventName?: string + eventType?: string + aggregateId?: string + occurredAt?: string + data?: TreePlantedPayload + payload?: TreePlantedPayload + // B方案:outbox 元数据 + _outbox?: { + id: string + aggregateId: string + eventType: string + } +} @Controller() export class EventConsumerController { private readonly logger = new Logger(EventConsumerController.name) + constructor( + @Inject(AUTHORIZATION_ROLE_REPOSITORY) + private readonly authorizationRepository: IAuthorizationRoleRepository, + @Inject(MONTHLY_ASSESSMENT_REPOSITORY) + private readonly assessmentRepository: IMonthlyAssessmentRepository, + @Inject(TEAM_STATISTICS_REPOSITORY) + private readonly statsRepository: ITeamStatisticsRepository, + private readonly eventPublisher: EventPublisherService, + private readonly eventAckPublisher: EventAckPublisher, + ) {} + // 监听认种事件 - 用于更新考核进度 @EventPattern('planting-events') async handlePlantingEvent( - @Payload() message: any, + @Payload() message: PlantingEventMessage, @Ctx() context: KafkaContext, ) { - try { - this.logger.log(`Received planting event: ${message.eventType}`) + const eventType = message.eventType || message.eventName || 'unknown' + const outboxInfo = message._outbox + const eventId = outboxInfo?.aggregateId || message.aggregateId || 'unknown' - switch (message.eventType) { + try { + this.logger.log(`[KAFKA] Received planting event: ${eventType}`) + this.logger.debug(`[KAFKA] Event payload: ${JSON.stringify(message)}`) + + // 获取 payload(支持多种格式) + const payload = message.payload || message.data + + switch (eventType) { case 'planting.tree.planted': - await this.handleTreePlanted(message.payload) + case 'PlantingOrderPaid': + case 'MiningEnabled': + if (payload) { + await this.handleTreePlanted(payload) + } break default: - this.logger.debug(`Unhandled planting event type: ${message.eventType}`) + this.logger.debug(`[KAFKA] Unhandled planting event type: ${eventType}`) + } + + // B方案:发送处理成功确认 + if (outboxInfo) { + await this.eventAckPublisher.sendSuccess(eventId, outboxInfo.eventType) } } catch (error) { - this.logger.error('Failed to handle planting event:', error) + this.logger.error('[KAFKA] Failed to handle planting event:', error) + + // B方案:发送处理失败确认 + if (outboxInfo) { + const errorMessage = error instanceof Error ? error.message : String(error) + await this.eventAckPublisher.sendFailure(eventId, outboxInfo.eventType, errorMessage) + } } } @@ -33,24 +105,180 @@ export class EventConsumerController { @Ctx() context: KafkaContext, ) { try { - this.logger.log(`Received referral event: ${message.eventType}`) + this.logger.log(`[KAFKA] Received referral event: ${message.eventType}`) switch (message.eventType) { case 'referral.relationship.created': // 处理推荐关系创建事件 break default: - this.logger.debug(`Unhandled referral event type: ${message.eventType}`) + this.logger.debug(`[KAFKA] Unhandled referral event type: ${message.eventType}`) } } catch (error) { - this.logger.error('Failed to handle referral event:', error) + this.logger.error('[KAFKA] Failed to handle referral event:', error) } } - private async handleTreePlanted(payload: any) { - // TODO: 更新用户团队认种统计 - // TODO: 检查是否达成初始考核目标 - // TODO: 更新月度考核进度 - this.logger.log(`Tree planted by user: ${payload.userId}, count: ${payload.treeCount}`) + /** + * 处理认种事件 + * 1. 检查用户是否有待激活的授权(初始考核) + * 2. 更新正在进行的月度考核进度 + */ + private async handleTreePlanted(payload: TreePlantedPayload): Promise { + const { userId, treeCount } = payload + this.logger.log(`[PLANTING] Processing tree planted event: userId=${userId}, count=${treeCount}`) + + try { + // 1. 获取用户团队统计 + const teamStats = await this.statsRepository.findByUserId(userId) + if (!teamStats) { + this.logger.warn(`[PLANTING] No team stats found for user ${userId}, skipping`) + return + } + + const totalTreeCount = teamStats.totalTeamPlantingCount + this.logger.debug(`[PLANTING] User ${userId} total team planting count: ${totalTreeCount}`) + + // 2. 获取用户所有授权 + const authorizations = await this.authorizationRepository.findByUserId( + UserId.create(userId), + ) + + if (authorizations.length === 0) { + this.logger.debug(`[PLANTING] User ${userId} has no authorizations`) + return + } + + // 3. 处理每个授权 + for (const auth of authorizations) { + this.logger.debug(`[PLANTING] Processing authorization: ${auth.authorizationId.value}, role=${auth.roleType}, benefitActive=${auth.benefitActive}`) + + // 3.1 检查初始考核(权益未激活的情况) + if (!auth.benefitActive && auth.status === AuthorizationStatus.PENDING) { + const initialTarget = auth.getInitialTarget() + this.logger.debug(`[PLANTING] Checking initial target: ${totalTreeCount}/${initialTarget}`) + + if (totalTreeCount >= initialTarget) { + this.logger.log(`[PLANTING] User ${userId} reached initial target for ${auth.roleType}, activating benefit`) + auth.activateBenefit() + await this.authorizationRepository.save(auth) + await this.eventPublisher.publishAll(auth.domainEvents) + auth.clearDomainEvents() + + // 创建首月考核(针对授权省市公司) + if (auth.roleType === RoleType.AUTH_PROVINCE_COMPANY || + auth.roleType === RoleType.AUTH_CITY_COMPANY) { + await this.createInitialAssessment(auth, teamStats) + } + } + } + + // 3.2 更新月度考核进度(权益已激活的授权省市公司) + if (auth.benefitActive && + (auth.roleType === RoleType.AUTH_PROVINCE_COMPANY || + auth.roleType === RoleType.AUTH_CITY_COMPANY)) { + await this.updateMonthlyAssessment(auth, teamStats) + } + } + + this.logger.log(`[PLANTING] Completed processing tree planted event for user ${userId}`) + } catch (error) { + this.logger.error(`[PLANTING] Error processing tree planted for user ${userId}:`, error) + throw error + } + } + + /** + * 创建首月考核 + */ + private async createInitialAssessment( + authorization: AuthorizationRole, + teamStats: TeamStatistics, + ): Promise { + const currentMonth = Month.current() + const target = LadderTargetRule.getTarget(authorization.roleType, 1) + + this.logger.log(`[ASSESSMENT] Creating initial assessment for ${authorization.authorizationId.value}, month=${currentMonth.value}`) + + // 检查是否已存在 + const existing = await this.assessmentRepository.findByAuthorizationAndMonth( + authorization.authorizationId, + currentMonth, + ) + + if (existing) { + this.logger.debug(`[ASSESSMENT] Assessment already exists, updating...`) + await this.doAssessmentUpdate(existing, authorization, teamStats) + return + } + + const assessment = MonthlyAssessment.create({ + authorizationId: authorization.authorizationId, + userId: authorization.userId, + roleType: authorization.roleType, + regionCode: authorization.regionCode, + assessmentMonth: currentMonth, + monthIndex: 1, + monthlyTarget: target.monthlyTarget, + cumulativeTarget: target.cumulativeTarget, + }) + + await this.doAssessmentUpdate(assessment, authorization, teamStats) + this.logger.log(`[ASSESSMENT] Created initial assessment for ${authorization.authorizationId.value}`) + } + + /** + * 更新月度考核进度 + */ + private async updateMonthlyAssessment( + authorization: AuthorizationRole, + teamStats: TeamStatistics, + ): Promise { + const currentMonth = Month.current() + + // 查找当月考核记录 + const assessment = await this.assessmentRepository.findByAuthorizationAndMonth( + authorization.authorizationId, + currentMonth, + ) + + if (!assessment) { + this.logger.debug(`[ASSESSMENT] No assessment found for ${authorization.authorizationId.value} in ${currentMonth.value}`) + return + } + + await this.doAssessmentUpdate(assessment, authorization, teamStats) + this.logger.log(`[ASSESSMENT] Updated assessment for ${authorization.authorizationId.value}`) + } + + /** + * 执行考核评估 + */ + private async doAssessmentUpdate( + assessment: MonthlyAssessment, + authorization: AuthorizationRole, + teamStats: TeamStatistics, + ): Promise { + // 计算本地团队数量 + let localTeamCount = 0 + if (authorization.roleType === RoleType.AUTH_PROVINCE_COMPANY) { + localTeamCount = teamStats.getProvinceTeamCount(authorization.regionCode.value) + } else if (authorization.roleType === RoleType.AUTH_CITY_COMPANY) { + localTeamCount = teamStats.getCityTeamCount(authorization.regionCode.value) + } + + this.logger.debug(`[ASSESSMENT] Assessing: cumulative=${teamStats.totalTeamPlantingCount}, local=${localTeamCount}, total=${teamStats.totalTeamPlantingCount}`) + + assessment.assess({ + cumulativeCompleted: teamStats.totalTeamPlantingCount, + localTeamCount, + totalTeamCount: teamStats.totalTeamPlantingCount, + requireLocalPercentage: authorization.requireLocalPercentage, + exemptFromPercentageCheck: authorization.exemptFromPercentageCheck, + }) + + await this.assessmentRepository.save(assessment) + await this.eventPublisher.publishAll(assessment.domainEvents) + assessment.clearDomainEvents() } } diff --git a/backend/services/planting-service/.env.development b/backend/services/planting-service/.env.development index 65a7497c..7da73714 100644 --- a/backend/services/planting-service/.env.development +++ b/backend/services/planting-service/.env.development @@ -12,3 +12,7 @@ JWT_SECRET="planting-service-dev-jwt-secret" WALLET_SERVICE_URL=http://localhost:3002 IDENTITY_SERVICE_URL=http://localhost:3001 REFERRAL_SERVICE_URL=http://localhost:3004 + +# Kafka Configuration +KAFKA_BROKERS=localhost:9092 +KAFKA_CLIENT_ID=planting-service diff --git a/backend/services/planting-service/.env.example b/backend/services/planting-service/.env.example index 995065dc..1f08d539 100644 --- a/backend/services/planting-service/.env.example +++ b/backend/services/planting-service/.env.example @@ -12,3 +12,7 @@ JWT_SECRET="your-super-secret-jwt-key-change-in-production" WALLET_SERVICE_URL=http://localhost:3002 IDENTITY_SERVICE_URL=http://localhost:3001 REFERRAL_SERVICE_URL=http://localhost:3004 + +# Kafka Configuration +KAFKA_BROKERS=localhost:9092 +KAFKA_CLIENT_ID=planting-service diff --git a/backend/services/planting-service/package-lock.json b/backend/services/planting-service/package-lock.json index ed5d9883..2d114004 100644 --- a/backend/services/planting-service/package-lock.json +++ b/backend/services/planting-service/package-lock.json @@ -14,6 +14,7 @@ "@nestjs/config": "^3.1.1", "@nestjs/core": "^10.0.0", "@nestjs/jwt": "^10.2.0", + "@nestjs/microservices": "^10.0.0", "@nestjs/passport": "^10.0.3", "@nestjs/platform-express": "^10.0.0", "@nestjs/swagger": "^7.1.17", @@ -21,6 +22,7 @@ "class-transformer": "^0.5.1", "class-validator": "^0.14.0", "jsonwebtoken": "^9.0.0", + "kafkajs": "^2.2.4", "passport": "^0.7.0", "passport-jwt": "^4.0.1", "reflect-metadata": "^0.1.13", @@ -246,6 +248,7 @@ "integrity": "sha512-e7jT4DxYvIDLk1ZHmU/m/mB19rex9sv0c2ftBtjSBv+kVM/902eh0fINUzD7UwLLNR+jU585GxUJ8/EBfAM5fw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@babel/code-frame": "^7.27.1", "@babel/generator": "^7.28.5", @@ -1865,6 +1868,65 @@ } } }, + "node_modules/@nestjs/microservices": { + "version": "10.4.20", + "resolved": "https://registry.npmjs.org/@nestjs/microservices/-/microservices-10.4.20.tgz", + "integrity": "sha512-zu/o84Z0uTUClNnGIGfIjcrO3z6T60h/pZPSJK50o4mehbEvJ76fijj6R/WTW0VP+1N16qOv/NsiYLKJA5Cc3w==", + "license": "MIT", + "peer": true, + "dependencies": { + "iterare": "1.2.1", + "tslib": "2.8.1" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/nest" + }, + "peerDependencies": { + "@grpc/grpc-js": "*", + "@nestjs/common": "^10.0.0", + "@nestjs/core": "^10.0.0", + "@nestjs/websockets": "^10.0.0", + "amqp-connection-manager": "*", + "amqplib": "*", + "cache-manager": "*", + "ioredis": "*", + "kafkajs": "*", + "mqtt": "*", + "nats": "*", + "reflect-metadata": "^0.1.12 || ^0.2.0", + "rxjs": "^7.1.0" + }, + "peerDependenciesMeta": { + "@grpc/grpc-js": { + "optional": true + }, + "@nestjs/websockets": { + "optional": true + }, + "amqp-connection-manager": { + "optional": true + }, + "amqplib": { + "optional": true + }, + "cache-manager": { + "optional": true + }, + "ioredis": { + "optional": true + }, + "kafkajs": { + "optional": true + }, + "mqtt": { + "optional": true + }, + "nats": { + "optional": true + } + } + }, "node_modules/@nestjs/passport": { "version": "10.0.3", "resolved": "https://registry.npmjs.org/@nestjs/passport/-/passport-10.0.3.tgz", @@ -6964,6 +7026,16 @@ "safe-buffer": "^5.0.1" } }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "license": "MIT", + "peer": true, + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/keyv": { "version": "4.5.4", "resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.4.tgz", diff --git a/backend/services/planting-service/package.json b/backend/services/planting-service/package.json index 92d2a23c..7470066e 100644 --- a/backend/services/planting-service/package.json +++ b/backend/services/planting-service/package.json @@ -27,12 +27,14 @@ "@nestjs/common": "^10.0.0", "@nestjs/config": "^3.1.1", "@nestjs/core": "^10.0.0", + "@nestjs/microservices": "^10.0.0", "@nestjs/platform-express": "^10.0.0", "@nestjs/swagger": "^7.1.17", "@nestjs/axios": "^3.0.1", "@nestjs/jwt": "^10.2.0", "@nestjs/passport": "^10.0.3", "@prisma/client": "^5.7.0", + "kafkajs": "^2.2.4", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", "jsonwebtoken": "^9.0.0", diff --git a/backend/services/planting-service/prisma/migrations/20241210000000_add_outbox/migration.sql b/backend/services/planting-service/prisma/migrations/20241210000000_add_outbox/migration.sql new file mode 100644 index 00000000..7f05b777 --- /dev/null +++ b/backend/services/planting-service/prisma/migrations/20241210000000_add_outbox/migration.sql @@ -0,0 +1,31 @@ +-- CreateTable: outbox_events (Outbox Pattern) +CREATE TABLE "outbox_events" ( + "outbox_id" BIGSERIAL NOT NULL, + "event_type" VARCHAR(100) NOT NULL, + "topic" VARCHAR(100) NOT NULL, + "key" VARCHAR(200) NOT NULL, + "payload" JSONB NOT NULL, + "aggregate_id" VARCHAR(100) NOT NULL, + "aggregate_type" VARCHAR(50) NOT NULL, + "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', + "retry_count" INTEGER NOT NULL DEFAULT 0, + "max_retries" INTEGER NOT NULL DEFAULT 5, + "last_error" TEXT, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "published_at" TIMESTAMP(3), + "next_retry_at" TIMESTAMP(3), + + CONSTRAINT "outbox_events_pkey" PRIMARY KEY ("outbox_id") +); + +-- CreateIndex +CREATE INDEX "outbox_events_status_created_at_idx" ON "outbox_events"("status", "created_at"); + +-- CreateIndex +CREATE INDEX "outbox_events_status_next_retry_at_idx" ON "outbox_events"("status", "next_retry_at"); + +-- CreateIndex +CREATE INDEX "outbox_events_aggregate_type_aggregate_id_idx" ON "outbox_events"("aggregate_type", "aggregate_id"); + +-- CreateIndex +CREATE INDEX "outbox_events_topic_idx" ON "outbox_events"("topic"); diff --git a/backend/services/planting-service/prisma/schema.prisma b/backend/services/planting-service/prisma/schema.prisma index ec1c54e3..a0fc32f0 100644 --- a/backend/services/planting-service/prisma/schema.prisma +++ b/backend/services/planting-service/prisma/schema.prisma @@ -195,3 +195,41 @@ model PlantingEvent { @@index([occurredAt]) @@map("planting_events") } + +// ============================================ +// Outbox 事件发件箱表 (Outbox Pattern) +// 保证事件发布的可靠性: +// 1. 业务数据和 Outbox 记录在同一个事务中写入 +// 2. 后台任务轮询 Outbox 表并发布到 Kafka +// 3. 发布成功后标记为已处理 +// ============================================ +model OutboxEvent { + id BigInt @id @default(autoincrement()) @map("outbox_id") + + // 事件信息 + eventType String @map("event_type") @db.VarChar(100) + topic String @map("topic") @db.VarChar(100) + key String @map("key") @db.VarChar(200) + payload Json @map("payload") + + // 聚合根信息 (用于幂等性检查) + aggregateId String @map("aggregate_id") @db.VarChar(100) + aggregateType String @map("aggregate_type") @db.VarChar(50) + + // 发布状态 + status String @default("PENDING") @map("status") @db.VarChar(20) // PENDING, PUBLISHED, FAILED + retryCount Int @default(0) @map("retry_count") + maxRetries Int @default(5) @map("max_retries") + lastError String? @map("last_error") @db.Text + + // 时间戳 + createdAt DateTime @default(now()) @map("created_at") + publishedAt DateTime? @map("published_at") + nextRetryAt DateTime? @map("next_retry_at") + + @@index([status, createdAt]) + @@index([status, nextRetryAt]) + @@index([aggregateType, aggregateId]) + @@index([topic]) + @@map("outbox_events") +} diff --git a/backend/services/planting-service/src/application/services/planting-application.service.ts b/backend/services/planting-service/src/application/services/planting-application.service.ts index fd38da96..b5759be7 100644 --- a/backend/services/planting-service/src/application/services/planting-application.service.ts +++ b/backend/services/planting-service/src/application/services/planting-application.service.ts @@ -16,6 +16,7 @@ import { FundAllocationDomainService } from '../../domain/services/fund-allocati import { WalletServiceClient } from '../../infrastructure/external/wallet-service.client'; import { ReferralServiceClient } from '../../infrastructure/external/referral-service.client'; import { UnitOfWork, UNIT_OF_WORK } from '../../infrastructure/persistence/unit-of-work'; +import { OutboxEventData } from '../../infrastructure/persistence/repositories/outbox.repository'; import { PRICE_PER_TREE } from '../../domain/value-objects/fund-allocation-target-type.enum'; // 个人最大认种数量限制 @@ -228,8 +229,8 @@ export class PlantingApplicationService { order.markAsPaid(); order.allocateFunds(allocations); - // 7. 使用事务保存本地数据库的所有变更 - // 这确保了订单状态、用户持仓、批次数据的原子性 + // 7. 使用事务保存本地数据库的所有变更 + Outbox事件 + // 这确保了订单状态、用户持仓、批次数据、以及事件发布的原子性 await this.unitOfWork.executeInTransaction(async (uow) => { // 保存订单状态 await uow.saveOrder(order); @@ -256,6 +257,16 @@ export class PlantingApplicationService { scheduledTime.setHours(scheduledTime.getHours() + 1); order.schedulePoolInjection(batch.id!, scheduledTime); await uow.saveOrder(order); + + // 8. 添加 Outbox 事件(在同一事务中保存) + // 使用 Outbox Pattern 保证事件发布的可靠性 + const outboxEvents = this.buildOutboxEvents(order, selection); + uow.addOutboxEvents(outboxEvents); + + // 提交 Outbox 事件(在事务中保存到数据库) + await uow.commitOutboxEvents(); + + this.logger.log(`[OUTBOX] Added ${outboxEvents.length} events to outbox for order ${order.orderNo}`); }); this.logger.log(`Local database transaction committed for order ${order.orderNo}`); @@ -268,6 +279,9 @@ export class PlantingApplicationService { this.logger.log(`Order paid successfully: ${order.orderNo}`); + // 清除已保存到 Outbox 的领域事件 + order.clearDomainEvents(); + return { orderNo: order.orderNo, status: order.status, @@ -400,4 +414,80 @@ export class PlantingApplicationService { } } + /** + * 构建 Outbox 事件数据 + * + * 为以下服务创建事件: + * - reward-service: 分配奖励 (planting.order.paid) + * - referral-service: 更新团队统计 (planting.planting.created) + * - authorization-service: 更新KPI考核 (planting-events) + */ + private buildOutboxEvents( + order: PlantingOrder, + selection: { provinceCode: string; cityCode: string }, + ): OutboxEventData[] { + const events: OutboxEventData[] = []; + + this.logger.debug(`[OUTBOX] Building outbox events for order ${order.orderNo}`); + + // 1. planting.order.paid 事件 (reward-service 消费) + events.push({ + eventType: 'planting.order.paid', + topic: 'planting.order.paid', + key: order.userId.toString(), + payload: { + eventName: 'planting.order.paid', + data: { + orderId: order.orderNo, + userId: order.userId.toString(), + treeCount: order.treeCount.value, + provinceCode: selection.provinceCode, + cityCode: selection.cityCode, + paidAt: order.paidAt!.toISOString(), + }, + }, + aggregateId: order.orderNo, + aggregateType: 'PlantingOrder', + }); + + // 2. planting.planting.created 事件 (referral-service 消费) + events.push({ + eventType: 'planting.planting.created', + topic: 'planting.planting.created', + key: order.userId.toString(), + payload: { + eventName: 'planting.created', + data: { + userId: order.userId.toString(), + treeCount: order.treeCount.value, + provinceCode: selection.provinceCode, + cityCode: selection.cityCode, + }, + }, + aggregateId: order.orderNo, + aggregateType: 'PlantingOrder', + }); + + // 3. planting-events 事件 (authorization-service 消费) + // 发布所有领域事件 + for (const domainEvent of order.domainEvents) { + events.push({ + eventType: domainEvent.type, + topic: 'planting-events', + key: order.userId.toString(), + payload: { + eventName: domainEvent.type, + aggregateId: domainEvent.aggregateId, + occurredAt: domainEvent.occurredAt, + data: domainEvent.data, + }, + aggregateId: order.orderNo, + aggregateType: 'PlantingOrder', + }); + } + + this.logger.debug(`[OUTBOX] Built ${events.length} outbox events for order ${order.orderNo}`); + + return events; + } } diff --git a/backend/services/planting-service/src/application/services/pool-injection.service.ts b/backend/services/planting-service/src/application/services/pool-injection.service.ts index eb8c8044..1c5e614a 100644 --- a/backend/services/planting-service/src/application/services/pool-injection.service.ts +++ b/backend/services/planting-service/src/application/services/pool-injection.service.ts @@ -12,7 +12,9 @@ import { POOL_INJECTION_BATCH_REPOSITORY, } from '../../domain/repositories/pool-injection-batch.repository.interface'; import { WalletServiceClient } from '../../infrastructure/external/wallet-service.client'; +import { OutboxRepository, OutboxEventData } from '../../infrastructure/persistence/repositories/outbox.repository'; import { BatchStatus } from '../../domain/value-objects/batch-status.enum'; +import { PlantingOrder } from '../../domain/aggregates/planting-order.aggregate'; @Injectable() export class PoolInjectionService { @@ -26,6 +28,7 @@ export class PoolInjectionService { @Inject(POOL_INJECTION_BATCH_REPOSITORY) private readonly batchRepository: IPoolInjectionBatchRepository, private readonly walletService: WalletServiceClient, + private readonly outboxRepository: OutboxRepository, ) {} /** @@ -80,7 +83,11 @@ export class PoolInjectionService { // 更新批次内所有订单状态 const orders = await this.orderRepository.findByBatchId(batchId); + this.logger.log(`[BATCH] Processing ${orders.length} orders in batch ${batch.batchNo}`); + for (const order of orders) { + this.logger.debug(`[BATCH] Processing order ${order.orderNo}`); + order.confirmPoolInjection(result.txHash); await this.orderRepository.save(order); @@ -96,10 +103,13 @@ export class PoolInjectionService { position.activateMining(order.treeCount.value); await this.positionRepository.save(position); } + + // 保存 MiningEnabled 事件到 Outbox(使用 Outbox Pattern) + await this.saveMiningEnabledEventsToOutbox(order); } this.logger.log( - `Batch ${batch.batchNo} processed successfully, txHash: ${result.txHash}`, + `[BATCH] ✓ Batch ${batch.batchNo} processed successfully, txHash: ${result.txHash}, orders: ${orders.length}`, ); } catch (error) { this.logger.error(`Failed to inject batch ${batch.batchNo}`, error); @@ -160,4 +170,44 @@ export class PoolInjectionService { injectionTxHash: batch.injectionTxHash, }; } + + /** + * 保存挖矿开启事件到 Outbox + * + * 使用 Outbox Pattern 保证事件发布的可靠性 + * 事件由 OutboxPublisherService 轮询发布到 Kafka + */ + private async saveMiningEnabledEventsToOutbox(order: PlantingOrder): Promise { + this.logger.debug(`[OUTBOX] Saving MiningEnabled events to outbox for order ${order.orderNo}`); + + const outboxEvents: OutboxEventData[] = []; + + // 将领域事件转换为 Outbox 事件 + for (const domainEvent of order.domainEvents) { + outboxEvents.push({ + eventType: domainEvent.type, + topic: 'planting-events', + key: order.userId.toString(), + payload: { + eventName: domainEvent.type, + aggregateId: domainEvent.aggregateId, + occurredAt: domainEvent.occurredAt, + data: domainEvent.data, + }, + aggregateId: order.orderNo, + aggregateType: 'PlantingOrder', + }); + } + + if (outboxEvents.length > 0) { + // 使用 PrismaService 事务保存(这里简化处理,直接保存) + // 注意:理想情况下应该在 processBatch 的事务中一起保存 + await this.outboxRepository.saveEvents(outboxEvents); + + this.logger.log(`[OUTBOX] ✓ Saved ${outboxEvents.length} MiningEnabled events to outbox for order ${order.orderNo}`); + } + + // 清除领域事件 + order.clearDomainEvents(); + } } diff --git a/backend/services/planting-service/src/infrastructure/infrastructure.module.ts b/backend/services/planting-service/src/infrastructure/infrastructure.module.ts index f5367365..0431d928 100644 --- a/backend/services/planting-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/planting-service/src/infrastructure/infrastructure.module.ts @@ -4,9 +4,13 @@ import { PrismaService } from './persistence/prisma/prisma.service'; import { PlantingOrderRepositoryImpl } from './persistence/repositories/planting-order.repository.impl'; import { PlantingPositionRepositoryImpl } from './persistence/repositories/planting-position.repository.impl'; import { PoolInjectionBatchRepositoryImpl } from './persistence/repositories/pool-injection-batch.repository.impl'; +import { OutboxRepository } from './persistence/repositories/outbox.repository'; import { UnitOfWork, UNIT_OF_WORK } from './persistence/unit-of-work'; import { WalletServiceClient } from './external/wallet-service.client'; import { ReferralServiceClient } from './external/referral-service.client'; +import { KafkaModule } from './kafka/kafka.module'; +import { OutboxPublisherService } from './kafka/outbox-publisher.service'; +import { EventAckController } from './kafka/event-ack.controller'; import { PLANTING_ORDER_REPOSITORY } from '../domain/repositories/planting-order.repository.interface'; import { PLANTING_POSITION_REPOSITORY } from '../domain/repositories/planting-position.repository.interface'; import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-injection-batch.repository.interface'; @@ -18,7 +22,9 @@ import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-inj timeout: 5000, maxRedirects: 5, }), + KafkaModule, ], + controllers: [EventAckController], providers: [ PrismaService, { @@ -37,6 +43,8 @@ import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-inj provide: UNIT_OF_WORK, useClass: UnitOfWork, }, + OutboxRepository, + OutboxPublisherService, WalletServiceClient, ReferralServiceClient, ], @@ -46,6 +54,8 @@ import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-inj PLANTING_POSITION_REPOSITORY, POOL_INJECTION_BATCH_REPOSITORY, UNIT_OF_WORK, + OutboxRepository, + OutboxPublisherService, WalletServiceClient, ReferralServiceClient, ], diff --git a/backend/services/planting-service/src/infrastructure/kafka/event-ack.consumer.ts b/backend/services/planting-service/src/infrastructure/kafka/event-ack.consumer.ts new file mode 100644 index 00000000..c260fab1 --- /dev/null +++ b/backend/services/planting-service/src/infrastructure/kafka/event-ack.consumer.ts @@ -0,0 +1,99 @@ +import { Injectable, Logger, OnModuleInit, Inject } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; +import { OutboxRepository } from '../persistence/repositories/outbox.repository'; + +/** + * 事件确认消息结构 + */ +interface EventAckMessage { + /** 原始事件的 aggregateId(如 orderNo) */ + eventId: string; + /** 原始事件类型 */ + eventType: string; + /** 消费服务名称 */ + consumerService: string; + /** 处理结果 */ + success: boolean; + /** 错误信息(如果失败) */ + error?: string; + /** 确认时间 */ + confirmedAt: string; +} + +/** + * 事件确认消费者 + * + * B方案核心组件:监听消费方的确认事件 + * 当消费方(reward-service, referral-service, authorization-service) + * 成功处理事件后,会发送确认消息到 planting.events.ack topic + * + * 工作流程: + * 1. planting-service 发送事件到 Kafka,标记为 SENT + * 2. 消费方处理事件成功后,发送确认到 planting.events.ack + * 3. 本消费者收到确认,将事件标记为 CONFIRMED + * 4. 超时未收到确认的事件会被重发 + */ +@Injectable() +export class EventAckConsumer implements OnModuleInit { + private readonly logger = new Logger(EventAckConsumer.name); + + constructor( + @Inject('KAFKA_SERVICE') + private readonly kafkaClient: ClientKafka, + private readonly outboxRepository: OutboxRepository, + ) {} + + async onModuleInit() { + // 订阅确认 topic + this.kafkaClient.subscribeToResponseOf('planting.events.ack'); + + this.logger.log('[ACK-CONSUMER] Subscribed to planting.events.ack topic'); + + // 连接后开始消费 + try { + await this.kafkaClient.connect(); + this.startConsuming(); + } catch (error) { + this.logger.error('[ACK-CONSUMER] Failed to connect to Kafka:', error); + } + } + + private startConsuming() { + // 使用 ClientKafka 的 emit 来订阅消息 + // 注意:NestJS 的 ClientKafka 主要用于生产者,消费者通常使用 @MessagePattern + // 这里我们使用轮询方式或者通过 Controller 来处理 + this.logger.log('[ACK-CONSUMER] Event acknowledgment consumer started'); + } + + /** + * 处理确认消息 + * 此方法由 Kafka Controller 调用 + */ + async handleAckMessage(message: EventAckMessage): Promise { + this.logger.debug(`[ACK-CONSUMER] Received ack: ${JSON.stringify(message)}`); + + try { + if (message.success) { + // 标记事件为已确认 + const confirmed = await this.outboxRepository.markAsConfirmed(message.eventId); + + if (confirmed) { + this.logger.log( + `[ACK-CONSUMER] ✓ Event ${message.eventId} confirmed by ${message.consumerService}`, + ); + } + } else { + // 消费方处理失败,记录错误但不改变状态 + // 超时后会自动重发 + this.logger.warn( + `[ACK-CONSUMER] ✗ Event ${message.eventId} failed in ${message.consumerService}: ${message.error}`, + ); + } + } catch (error) { + this.logger.error( + `[ACK-CONSUMER] Error processing ack for event ${message.eventId}:`, + error, + ); + } + } +} diff --git a/backend/services/planting-service/src/infrastructure/kafka/event-ack.controller.ts b/backend/services/planting-service/src/infrastructure/kafka/event-ack.controller.ts new file mode 100644 index 00000000..991bba09 --- /dev/null +++ b/backend/services/planting-service/src/infrastructure/kafka/event-ack.controller.ts @@ -0,0 +1,82 @@ +import { Controller, Logger } from '@nestjs/common'; +import { MessagePattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices'; +import { OutboxRepository } from '../persistence/repositories/outbox.repository'; + +/** + * 事件确认消息结构 + */ +interface EventAckMessage { + /** 原始事件的 aggregateId(如 orderNo) */ + eventId: string; + /** 原始事件类型 */ + eventType: string; + /** 消费服务名称 */ + consumerService: string; + /** 处理结果 */ + success: boolean; + /** 错误信息(如果失败) */ + error?: string; + /** 确认时间 */ + confirmedAt: string; +} + +/** + * 事件确认 Kafka 控制器 + * + * B方案核心组件:监听消费方的确认事件 + * 使用 @MessagePattern 装饰器来处理 Kafka 消息 + */ +@Controller() +export class EventAckController { + private readonly logger = new Logger(EventAckController.name); + + constructor(private readonly outboxRepository: OutboxRepository) {} + + /** + * 处理事件确认消息 + * + * 消费方(reward-service, referral-service, authorization-service) + * 成功处理事件后,会发送确认消息到此 topic + */ + @MessagePattern('planting.events.ack') + async handleEventAck( + @Payload() message: EventAckMessage, + @Ctx() context: KafkaContext, + ): Promise { + const partition = context.getPartition(); + const offset = context.getMessage().offset; + + this.logger.debug( + `[ACK] Received ack from ${message.consumerService} for event ${message.eventId} ` + + `[partition=${partition}, offset=${offset}]`, + ); + + try { + if (message.success) { + // 标记事件为已确认 + const confirmed = await this.outboxRepository.markAsConfirmed(message.eventId); + + if (confirmed) { + this.logger.log( + `[ACK] ✓ Event ${message.eventId} (${message.eventType}) confirmed by ${message.consumerService}`, + ); + } else { + this.logger.warn( + `[ACK] Event ${message.eventId} not found or already confirmed`, + ); + } + } else { + // 消费方处理失败 + this.logger.warn( + `[ACK] ✗ Event ${message.eventId} failed in ${message.consumerService}: ${message.error}`, + ); + // 不改变状态,等待超时重发 + } + } catch (error) { + this.logger.error( + `[ACK] Error processing ack for event ${message.eventId}:`, + error, + ); + } + } +} diff --git a/backend/services/planting-service/src/infrastructure/kafka/event-publisher.service.ts b/backend/services/planting-service/src/infrastructure/kafka/event-publisher.service.ts new file mode 100644 index 00000000..9a511f2b --- /dev/null +++ b/backend/services/planting-service/src/infrastructure/kafka/event-publisher.service.ts @@ -0,0 +1,245 @@ +import { Injectable, Inject, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; +import { ConfigService } from '@nestjs/config'; +import { DomainEvent } from '../../domain/events/domain-event.interface'; + +/** + * Kafka Topic 映射 + * + * 消费者: + * - reward-service: 监听 planting.order.paid + * - authorization-service: 监听 planting-events + * - referral-service: 监听 planting.planting.created + */ +const EVENT_TOPIC_MAP: Record = { + PlantingOrderCreated: 'planting.order.created', + ProvinceCityConfirmed: 'planting.order.province-city-confirmed', + PlantingOrderPaid: 'planting.order.paid', + FundsAllocated: 'planting.order.funds-allocated', + PoolInjected: 'planting.pool.injected', + MiningEnabled: 'planting.mining.enabled', +}; + +@Injectable() +export class EventPublisherService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(EventPublisherService.name); + private isConnected = false; + + constructor( + @Inject('KAFKA_SERVICE') + private readonly kafkaClient: ClientKafka, + private readonly configService: ConfigService, + ) { + const brokers = this.configService.get('KAFKA_BROKERS', 'localhost:9092'); + this.logger.log(`[INIT] EventPublisherService created, brokers: ${brokers}`); + } + + async onModuleInit() { + this.logger.log('[KAFKA] Attempting to connect to Kafka...'); + try { + await this.kafkaClient.connect(); + this.isConnected = true; + this.logger.log('[KAFKA] Successfully connected to Kafka broker'); + } catch (error) { + this.logger.error('[KAFKA] Failed to connect to Kafka:', error); + // 不抛出错误,允许服务启动(开发环境可能没有 Kafka) + this.logger.warn('[KAFKA] Service will start without Kafka connection'); + } + } + + async onModuleDestroy() { + if (this.isConnected) { + this.logger.log('[KAFKA] Disconnecting from Kafka...'); + await this.kafkaClient.close(); + this.logger.log('[KAFKA] Disconnected from Kafka'); + } + } + + /** + * 发布单个领域事件 + */ + async publish(event: DomainEvent): Promise { + const topic = this.getTopicForEvent(event); + const message = this.formatMessage(event); + + this.logger.debug(`[PUBLISH] Preparing to publish event: + - Type: ${event.type} + - Topic: ${topic} + - AggregateId: ${event.aggregateId} + - Data: ${JSON.stringify(event.data)}`); + + if (!this.isConnected) { + this.logger.warn(`[PUBLISH] Kafka not connected, skipping event ${event.type}`); + return; + } + + try { + this.kafkaClient.emit(topic, message); + this.logger.log(`[PUBLISH] ✓ Event ${event.type} published to topic ${topic}`); + } catch (error) { + this.logger.error(`[PUBLISH] ✗ Failed to publish event ${event.type}:`, error); + throw error; + } + } + + /** + * 批量发布领域事件 + */ + async publishAll(events: readonly DomainEvent[]): Promise { + this.logger.log(`[PUBLISH] Publishing ${events.length} events...`); + for (const event of events) { + await this.publish(event); + } + this.logger.log(`[PUBLISH] Finished publishing ${events.length} events`); + } + + /** + * 发布到通用 planting-events topic (供 authorization-service 消费) + */ + async publishToPlantingEvents(event: DomainEvent): Promise { + const eventType = this.mapEventTypeToPlantingEventType(event.type); + const message = { + key: event.aggregateId, + value: JSON.stringify({ + eventType, + payload: event.data, + occurredAt: event.occurredAt.toISOString(), + }), + }; + + this.logger.debug(`[PUBLISH] Preparing planting-events message: + - Original Type: ${event.type} + - Mapped Type: ${eventType} + - Payload: ${JSON.stringify(event.data)}`); + + if (!this.isConnected) { + this.logger.warn(`[PUBLISH] Kafka not connected, skipping planting-events for ${event.type}`); + return; + } + + try { + this.kafkaClient.emit('planting-events', message); + this.logger.log(`[PUBLISH] ✓ Event ${event.type} published to topic planting-events`); + } catch (error) { + this.logger.error(`[PUBLISH] ✗ Failed to publish to planting-events:`, error); + } + } + + /** + * 发布认种支付事件 (reward-service 消费格式) + */ + async publishPlantingOrderPaid(params: { + orderId: string; + userId: string; + treeCount: number; + provinceCode: string; + cityCode: string; + paidAt: Date; + }): Promise { + const topic = 'planting.order.paid'; + const message = { + key: params.orderId, + value: JSON.stringify({ + orderId: params.orderId, + userId: params.userId, + treeCount: params.treeCount, + provinceCode: params.provinceCode, + cityCode: params.cityCode, + paidAt: params.paidAt.toISOString(), + }), + }; + + this.logger.debug(`[PUBLISH] Publishing PlantingOrderPaid for reward-service: + - OrderId: ${params.orderId} + - UserId: ${params.userId} + - TreeCount: ${params.treeCount} + - Province: ${params.provinceCode} + - City: ${params.cityCode}`); + + if (!this.isConnected) { + this.logger.warn(`[PUBLISH] Kafka not connected, skipping planting.order.paid`); + return; + } + + try { + this.kafkaClient.emit(topic, message); + this.logger.log(`[PUBLISH] ✓ PlantingOrderPaid published for order ${params.orderId}`); + } catch (error) { + this.logger.error(`[PUBLISH] ✗ Failed to publish PlantingOrderPaid:`, error); + } + } + + /** + * 发布认种创建事件 (referral-service 消费格式) + */ + async publishPlantingCreated(params: { + userId: string; + treeCount: number; + provinceCode: string; + cityCode: string; + }): Promise { + const topic = 'planting.planting.created'; + const message = { + key: params.userId, + value: JSON.stringify({ + eventName: 'planting.created', + data: { + userId: params.userId, + treeCount: params.treeCount, + provinceCode: params.provinceCode, + cityCode: params.cityCode, + }, + }), + }; + + this.logger.debug(`[PUBLISH] Publishing PlantingCreated for referral-service: + - UserId: ${params.userId} + - TreeCount: ${params.treeCount}`); + + if (!this.isConnected) { + this.logger.warn(`[PUBLISH] Kafka not connected, skipping planting.planting.created`); + return; + } + + try { + this.kafkaClient.emit(topic, message); + this.logger.log(`[PUBLISH] ✓ PlantingCreated published for user ${params.userId}`); + } catch (error) { + this.logger.error(`[PUBLISH] ✗ Failed to publish PlantingCreated:`, error); + } + } + + private getTopicForEvent(event: DomainEvent): string { + const topic = EVENT_TOPIC_MAP[event.type] || 'planting.events'; + this.logger.debug(`[TOPIC] Mapped event type ${event.type} to topic ${topic}`); + return topic; + } + + private formatMessage(event: DomainEvent) { + return { + key: event.aggregateId, + value: JSON.stringify({ + eventId: `${event.aggregateId}-${event.occurredAt.getTime()}`, + eventType: event.type, + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + occurredAt: event.occurredAt.toISOString(), + data: event.data, + }), + }; + } + + /** + * 映射到 authorization-service 期望的事件类型格式 + */ + private mapEventTypeToPlantingEventType(eventType: string): string { + switch (eventType) { + case 'PlantingOrderCreated': + return 'planting.tree.planted'; + case 'PlantingOrderPaid': + return 'planting.tree.planted'; + default: + return `planting.${eventType.toLowerCase()}`; + } + } +} diff --git a/backend/services/planting-service/src/infrastructure/kafka/index.ts b/backend/services/planting-service/src/infrastructure/kafka/index.ts new file mode 100644 index 00000000..0924b576 --- /dev/null +++ b/backend/services/planting-service/src/infrastructure/kafka/index.ts @@ -0,0 +1,2 @@ +export * from './kafka.module'; +export * from './event-publisher.service'; diff --git a/backend/services/planting-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/planting-service/src/infrastructure/kafka/kafka.module.ts new file mode 100644 index 00000000..c6e319cf --- /dev/null +++ b/backend/services/planting-service/src/infrastructure/kafka/kafka.module.ts @@ -0,0 +1,32 @@ +import { Module, Global } from '@nestjs/common'; +import { ConfigModule, ConfigService } from '@nestjs/config'; +import { ClientsModule, Transport } from '@nestjs/microservices'; +import { EventPublisherService } from './event-publisher.service'; + +@Global() +@Module({ + imports: [ + ClientsModule.registerAsync([ + { + name: 'KAFKA_SERVICE', + imports: [ConfigModule], + useFactory: (configService: ConfigService) => ({ + transport: Transport.KAFKA, + options: { + client: { + clientId: configService.get('KAFKA_CLIENT_ID', 'planting-service'), + brokers: configService.get('KAFKA_BROKERS', 'localhost:9092').split(','), + }, + producer: { + allowAutoTopicCreation: true, + }, + }, + }), + inject: [ConfigService], + }, + ]), + ], + providers: [EventPublisherService], + exports: [EventPublisherService, ClientsModule], +}) +export class KafkaModule {} diff --git a/backend/services/planting-service/src/infrastructure/kafka/outbox-publisher.service.ts b/backend/services/planting-service/src/infrastructure/kafka/outbox-publisher.service.ts new file mode 100644 index 00000000..8f7ca5a7 --- /dev/null +++ b/backend/services/planting-service/src/infrastructure/kafka/outbox-publisher.service.ts @@ -0,0 +1,280 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy, Inject } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { ClientKafka } from '@nestjs/microservices'; +import { OutboxRepository, OutboxEvent, OutboxStatus } from '../persistence/repositories/outbox.repository'; + +/** + * Outbox Publisher Service (B方案 - 消费方确认模式) + * + * 轮询 Outbox 表并发布事件到 Kafka + * 使用消费方确认机制保证事件100%被处理 + * + * 工作流程: + * 1. 轮询 PENDING 状态的事件 + * 2. 发送到 Kafka,标记为 SENT(等待确认) + * 3. 消费方处理成功后发送确认到 planting.events.ack + * 4. 收到确认后标记为 CONFIRMED + * 5. 超时未确认的事件重置为 PENDING 重发 + */ +@Injectable() +export class OutboxPublisherService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(OutboxPublisherService.name); + private isRunning = false; + private pollInterval: NodeJS.Timeout | null = null; + private timeoutCheckInterval: NodeJS.Timeout | null = null; + private cleanupInterval: NodeJS.Timeout | null = null; + private isConnected = false; + + // 配置 + private readonly pollIntervalMs: number; + private readonly batchSize: number; + private readonly cleanupIntervalMs: number; + private readonly confirmationTimeoutMinutes: number; + private readonly timeoutCheckIntervalMs: number; + + constructor( + @Inject('KAFKA_SERVICE') + private readonly kafkaClient: ClientKafka, + private readonly outboxRepository: OutboxRepository, + private readonly configService: ConfigService, + ) { + this.pollIntervalMs = this.configService.get('OUTBOX_POLL_INTERVAL_MS', 1000); + this.batchSize = this.configService.get('OUTBOX_BATCH_SIZE', 100); + this.cleanupIntervalMs = this.configService.get('OUTBOX_CLEANUP_INTERVAL_MS', 3600000); // 1小时 + this.confirmationTimeoutMinutes = this.configService.get('OUTBOX_CONFIRMATION_TIMEOUT_MINUTES', 5); + this.timeoutCheckIntervalMs = this.configService.get('OUTBOX_TIMEOUT_CHECK_INTERVAL_MS', 60000); // 1分钟 + + this.logger.log( + `[OUTBOX] OutboxPublisher (B方案) configured: ` + + `pollInterval=${this.pollIntervalMs}ms, batchSize=${this.batchSize}, ` + + `confirmationTimeout=${this.confirmationTimeoutMinutes}min`, + ); + } + + async onModuleInit() { + this.logger.log('[OUTBOX] Connecting to Kafka...'); + try { + await this.kafkaClient.connect(); + this.isConnected = true; + this.logger.log('[OUTBOX] Connected to Kafka'); + this.start(); + } catch (error) { + this.logger.error('[OUTBOX] Failed to connect to Kafka:', error); + this.logger.warn('[OUTBOX] OutboxPublisher will not start - events will accumulate in outbox table'); + } + } + + async onModuleDestroy() { + this.stop(); + if (this.isConnected) { + await this.kafkaClient.close(); + } + } + + /** + * 启动轮询 + */ + start(): void { + if (this.isRunning) { + this.logger.warn('[OUTBOX] Publisher already running'); + return; + } + + this.isRunning = true; + this.logger.log('[OUTBOX] Starting outbox publisher (B方案)...'); + + // 启动发布轮询 + this.pollInterval = setInterval(() => { + this.processOutbox().catch((err) => { + this.logger.error('[OUTBOX] Error processing outbox:', err); + }); + }, this.pollIntervalMs); + + // 启动超时检查任务(B方案核心) + this.timeoutCheckInterval = setInterval(() => { + this.checkConfirmationTimeouts().catch((err) => { + this.logger.error('[OUTBOX] Error checking confirmation timeouts:', err); + }); + }, this.timeoutCheckIntervalMs); + + // 启动清理任务 + this.cleanupInterval = setInterval(() => { + this.cleanup().catch((err) => { + this.logger.error('[OUTBOX] Error cleaning up outbox:', err); + }); + }, this.cleanupIntervalMs); + + this.logger.log('[OUTBOX] Outbox publisher started (B方案 - 消费方确认模式)'); + } + + /** + * 停止轮询 + */ + stop(): void { + if (!this.isRunning) return; + + this.isRunning = false; + + if (this.pollInterval) { + clearInterval(this.pollInterval); + this.pollInterval = null; + } + + if (this.timeoutCheckInterval) { + clearInterval(this.timeoutCheckInterval); + this.timeoutCheckInterval = null; + } + + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = null; + } + + this.logger.log('[OUTBOX] Outbox publisher stopped'); + } + + /** + * 处理 Outbox 事件 + */ + async processOutbox(): Promise { + if (!this.isConnected) { + return; + } + + try { + // 1. 获取待发布事件 + const pendingEvents = await this.outboxRepository.findPendingEvents(this.batchSize); + + // 2. 获取需要重试的事件 + const retryEvents = await this.outboxRepository.findEventsForRetry(this.batchSize / 2); + + const allEvents = [...pendingEvents, ...retryEvents]; + + if (allEvents.length === 0) { + return; + } + + this.logger.debug(`[OUTBOX] Processing ${allEvents.length} events (${pendingEvents.length} pending, ${retryEvents.length} retry)`); + + // 3. 逐个发布 + for (const event of allEvents) { + await this.publishEvent(event); + } + } catch (error) { + this.logger.error('[OUTBOX] Error in processOutbox:', error); + } + } + + /** + * 发布单个事件 (B方案) + * + * 使用 emit() 发送到 Kafka,成功后标记为 SENT(等待消费方确认) + * 只有收到消费方确认后才标记为 CONFIRMED + */ + private async publishEvent(event: OutboxEvent): Promise { + try { + this.logger.debug(`[OUTBOX] Publishing event ${event.id} to topic ${event.topic}`); + + // 构造 Kafka 消息,包含 outboxId 用于确认 + const payload = { + ...(event.payload as Record), + _outbox: { + id: event.id.toString(), + aggregateId: event.aggregateId, + eventType: event.eventType, + }, + }; + + const message = { + key: event.key, + value: JSON.stringify(payload), + }; + + // 发布到 Kafka + this.kafkaClient.emit(event.topic, message); + + // B方案:标记为 SENT(等待消费方确认) + await this.outboxRepository.markAsSent(event.id); + + this.logger.log( + `[OUTBOX] → Event ${event.id} sent to ${event.topic} (awaiting consumer confirmation)`, + ); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error(`[OUTBOX] ✗ Failed to publish event ${event.id}: ${errorMessage}`); + + // 标记为失败并安排重试 + await this.outboxRepository.markAsFailed(event.id, errorMessage); + } + } + + /** + * 检查确认超时的事件 (B方案核心) + * + * 将超时未确认的 SENT 事件重置为 PENDING 以便重发 + */ + private async checkConfirmationTimeouts(): Promise { + if (!this.isConnected) { + return; + } + + try { + const timedOutEvents = await this.outboxRepository.findSentEventsTimedOut( + this.confirmationTimeoutMinutes, + this.batchSize, + ); + + if (timedOutEvents.length === 0) { + return; + } + + this.logger.warn( + `[OUTBOX] Found ${timedOutEvents.length} events without confirmation after ${this.confirmationTimeoutMinutes} minutes`, + ); + + for (const event of timedOutEvents) { + await this.outboxRepository.resetSentToPending(event.id); + this.logger.warn( + `[OUTBOX] Event ${event.id} reset to PENDING for retry (retry #${event.retryCount + 1})`, + ); + } + } catch (error) { + this.logger.error('[OUTBOX] Error checking confirmation timeouts:', error); + } + } + + /** + * 清理旧事件 + */ + private async cleanup(): Promise { + const retentionDays = this.configService.get('OUTBOX_RETENTION_DAYS', 7); + await this.outboxRepository.cleanupOldEvents(retentionDays); + } + + /** + * 手动触发处理(用于测试或紧急情况) + */ + async triggerProcessing(): Promise { + this.logger.log('[OUTBOX] Manual processing triggered'); + await this.processOutbox(); + } + + /** + * 获取统计信息 + */ + async getStats(): Promise<{ + isRunning: boolean; + isConnected: boolean; + pending: number; + sent: number; + confirmed: number; + failed: number; + }> { + const stats = await this.outboxRepository.getStats(); + return { + isRunning: this.isRunning, + isConnected: this.isConnected, + ...stats, + }; + } +} diff --git a/backend/services/planting-service/src/infrastructure/persistence/repositories/outbox.repository.ts b/backend/services/planting-service/src/infrastructure/persistence/repositories/outbox.repository.ts new file mode 100644 index 00000000..5d17dee0 --- /dev/null +++ b/backend/services/planting-service/src/infrastructure/persistence/repositories/outbox.repository.ts @@ -0,0 +1,364 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { Prisma } from '@prisma/client'; + +export enum OutboxStatus { + PENDING = 'PENDING', // 待发送 + SENT = 'SENT', // 已发送到 Kafka,等待消费方确认 + CONFIRMED = 'CONFIRMED', // 消费方已确认处理成功 + FAILED = 'FAILED', // 发送失败,等待重试 +} + +export interface OutboxEventData { + eventType: string; + topic: string; + key: string; + payload: Record; + aggregateId: string; + aggregateType: string; +} + +export interface OutboxEvent extends OutboxEventData { + id: bigint; + status: OutboxStatus; + retryCount: number; + maxRetries: number; + lastError: string | null; + createdAt: Date; + publishedAt: Date | null; + nextRetryAt: Date | null; +} + +@Injectable() +export class OutboxRepository { + private readonly logger = new Logger(OutboxRepository.name); + + constructor(private readonly prisma: PrismaService) {} + + /** + * 在事务中保存 Outbox 事件 + */ + async saveInTransaction( + tx: Prisma.TransactionClient, + events: OutboxEventData[], + ): Promise { + if (events.length === 0) return; + + this.logger.debug(`[OUTBOX] Saving ${events.length} events to outbox (in transaction)`); + + await tx.outboxEvent.createMany({ + data: events.map((event) => ({ + eventType: event.eventType, + topic: event.topic, + key: event.key, + payload: event.payload as Prisma.JsonObject, + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + status: OutboxStatus.PENDING, + })), + }); + + this.logger.debug(`[OUTBOX] Saved ${events.length} events to outbox`); + } + + /** + * 直接保存 Outbox 事件(不在事务中) + */ + async saveEvents(events: OutboxEventData[]): Promise { + if (events.length === 0) return; + + this.logger.debug(`[OUTBOX] Saving ${events.length} events to outbox`); + + await this.prisma.outboxEvent.createMany({ + data: events.map((event) => ({ + eventType: event.eventType, + topic: event.topic, + key: event.key, + payload: event.payload as Prisma.JsonObject, + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + status: OutboxStatus.PENDING, + })), + }); + + this.logger.log(`[OUTBOX] ✓ Saved ${events.length} events to outbox`); + } + + /** + * 获取待发布的事件(按创建时间排序) + */ + async findPendingEvents(limit: number = 100): Promise { + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.PENDING, + }, + orderBy: { + createdAt: 'asc', + }, + take: limit, + }); + + return events.map((e) => this.mapToOutboxEvent(e)); + } + + /** + * 获取需要重试的事件 + */ + async findEventsForRetry(limit: number = 50): Promise { + const now = new Date(); + + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.FAILED, + retryCount: { + lt: this.prisma.outboxEvent.fields.maxRetries, + }, + nextRetryAt: { + lte: now, + }, + }, + orderBy: { + nextRetryAt: 'asc', + }, + take: limit, + }); + + return events.map((e) => this.mapToOutboxEvent(e)); + } + + /** + * 标记事件为已发送(等待消费方确认) + * B方案:发送到 Kafka 后先标记为 SENT,等消费方确认后才标记为 CONFIRMED + */ + async markAsSent(id: bigint): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.SENT, + publishedAt: new Date(), + }, + }); + + this.logger.debug(`[OUTBOX] Marked event ${id} as SENT (awaiting consumer confirmation)`); + } + + /** + * 标记事件为已确认(消费方已成功处理) + * B方案:收到消费方确认后调用 + */ + async markAsConfirmed(eventId: string): Promise { + // 通过 aggregateId + eventType 查找事件 + const result = await this.prisma.outboxEvent.updateMany({ + where: { + aggregateId: eventId, + status: OutboxStatus.SENT, + }, + data: { + status: OutboxStatus.CONFIRMED, + }, + }); + + if (result.count > 0) { + this.logger.log(`[OUTBOX] ✓ Event ${eventId} confirmed by consumer`); + return true; + } + + this.logger.warn(`[OUTBOX] Event ${eventId} not found or not in SENT status`); + return false; + } + + /** + * 通过 outbox ID 标记为已确认 + */ + async markAsConfirmedById(id: bigint): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.CONFIRMED, + }, + }); + + this.logger.log(`[OUTBOX] ✓ Event ${id} confirmed by consumer`); + } + + /** + * 获取已发送但未确认且超时的事件(用于重试) + * B方案:超时未收到确认的事件需要重发 + */ + async findSentEventsTimedOut(timeoutMinutes: number = 5, limit: number = 50): Promise { + const cutoffTime = new Date(); + cutoffTime.setMinutes(cutoffTime.getMinutes() - timeoutMinutes); + + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.SENT, + publishedAt: { + lt: cutoffTime, + }, + retryCount: { + lt: 5, // 最多重试5次 + }, + }, + orderBy: { + publishedAt: 'asc', + }, + take: limit, + }); + + return events.map((e) => this.mapToOutboxEvent(e)); + } + + /** + * 将超时的 SENT 事件重置为 PENDING 以便重发 + */ + async resetSentToPending(id: bigint): Promise { + const event = await this.prisma.outboxEvent.findUnique({ + where: { id }, + }); + + if (!event) return; + + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.PENDING, + retryCount: event.retryCount + 1, + lastError: 'Consumer confirmation timeout', + }, + }); + + this.logger.warn(`[OUTBOX] Event ${id} reset to PENDING for retry (confirmation timeout)`); + } + + /** + * 标记事件为已发布(旧方法,保留兼容性) + * @deprecated 使用 markAsSent 和 markAsConfirmed 代替 + */ + async markAsPublished(id: bigint): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.CONFIRMED, + publishedAt: new Date(), + }, + }); + + this.logger.debug(`[OUTBOX] Marked event ${id} as published (confirmed)`); + } + + /** + * 标记事件为失败并安排重试 + */ + async markAsFailed(id: bigint, error: string): Promise { + const event = await this.prisma.outboxEvent.findUnique({ + where: { id }, + }); + + if (!event) return; + + const newRetryCount = event.retryCount + 1; + const isFinalFailure = newRetryCount >= event.maxRetries; + + // 指数退避:1min, 2min, 4min, 8min, 16min + const delayMinutes = Math.pow(2, newRetryCount - 1); + const nextRetryAt = new Date(); + nextRetryAt.setMinutes(nextRetryAt.getMinutes() + delayMinutes); + + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: isFinalFailure ? OutboxStatus.FAILED : OutboxStatus.FAILED, + retryCount: newRetryCount, + lastError: error, + nextRetryAt: isFinalFailure ? null : nextRetryAt, + }, + }); + + if (isFinalFailure) { + this.logger.error(`[OUTBOX] Event ${id} permanently failed after ${newRetryCount} retries: ${error}`); + } else { + this.logger.warn(`[OUTBOX] Event ${id} failed, retry ${newRetryCount}/${event.maxRetries}, next retry at ${nextRetryAt}`); + } + } + + /** + * 批量标记为已确认 + * @deprecated 使用单个 markAsConfirmedById 代替 + */ + async markManyAsPublished(ids: bigint[]): Promise { + if (ids.length === 0) return; + + await this.prisma.outboxEvent.updateMany({ + where: { + id: { in: ids }, + }, + data: { + status: OutboxStatus.CONFIRMED, + publishedAt: new Date(), + }, + }); + + this.logger.debug(`[OUTBOX] Marked ${ids.length} events as confirmed`); + } + + /** + * 清理已确认的旧事件(保留最近7天) + */ + async cleanupOldEvents(retentionDays: number = 7): Promise { + const cutoffDate = new Date(); + cutoffDate.setDate(cutoffDate.getDate() - retentionDays); + + const result = await this.prisma.outboxEvent.deleteMany({ + where: { + status: OutboxStatus.CONFIRMED, + publishedAt: { + lt: cutoffDate, + }, + }, + }); + + if (result.count > 0) { + this.logger.log(`[OUTBOX] Cleaned up ${result.count} old events`); + } + + return result.count; + } + + /** + * 获取 Outbox 统计信息 + */ + async getStats(): Promise<{ + pending: number; + sent: number; + confirmed: number; + failed: number; + }> { + const [pending, sent, confirmed, failed] = await Promise.all([ + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.PENDING } }), + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.SENT } }), + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.CONFIRMED } }), + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.FAILED } }), + ]); + + return { pending, sent, confirmed, failed }; + } + + private mapToOutboxEvent(record: any): OutboxEvent { + return { + id: record.id, + eventType: record.eventType, + topic: record.topic, + key: record.key, + payload: record.payload as Record, + aggregateId: record.aggregateId, + aggregateType: record.aggregateType, + status: record.status as OutboxStatus, + retryCount: record.retryCount, + maxRetries: record.maxRetries, + lastError: record.lastError, + createdAt: record.createdAt, + publishedAt: record.publishedAt, + nextRetryAt: record.nextRetryAt, + }; + } +} diff --git a/backend/services/planting-service/src/infrastructure/persistence/unit-of-work.ts b/backend/services/planting-service/src/infrastructure/persistence/unit-of-work.ts index 197f39b9..4566fbe7 100644 --- a/backend/services/planting-service/src/infrastructure/persistence/unit-of-work.ts +++ b/backend/services/planting-service/src/infrastructure/persistence/unit-of-work.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { Prisma } from '@prisma/client'; import { PrismaService, TransactionClient } from './prisma/prisma.service'; import { PlantingOrder } from '../../domain/aggregates/planting-order.aggregate'; @@ -7,6 +7,7 @@ import { PoolInjectionBatch } from '../../domain/aggregates/pool-injection-batch import { PlantingOrderMapper } from './mappers/planting-order.mapper'; import { PlantingPositionMapper } from './mappers/planting-position.mapper'; import { PoolInjectionBatchMapper } from './mappers/pool-injection-batch.mapper'; +import { OutboxEventData } from './repositories/outbox.repository'; /** * 工作单元 - 用于管理跨多个聚合根的数据库事务 @@ -45,10 +46,70 @@ export class UnitOfWork { /** * 事务性工作单元 - 在事务上下文中提供聚合根的持久化操作 + * + * 支持 Outbox Pattern:在同一个事务中保存业务数据和事件数据 + * 保证事件发布的原子性和可靠性 */ export class TransactionalUnitOfWork { + private readonly logger = new Logger(TransactionalUnitOfWork.name); + private pendingOutboxEvents: OutboxEventData[] = []; + constructor(private readonly tx: TransactionClient) {} + /** + * 添加 Outbox 事件(在事务提交时一起保存) + * + * @param event 事件数据 + */ + addOutboxEvent(event: OutboxEventData): void { + this.logger.debug(`[OUTBOX] Adding event to outbox: ${event.eventType} for ${event.aggregateType}:${event.aggregateId}`); + this.pendingOutboxEvents.push(event); + } + + /** + * 批量添加 Outbox 事件 + * + * @param events 事件数据数组 + */ + addOutboxEvents(events: OutboxEventData[]): void { + this.logger.debug(`[OUTBOX] Adding ${events.length} events to outbox`); + this.pendingOutboxEvents.push(...events); + } + + /** + * 保存所有待发送的 Outbox 事件 + * 应在事务的最后调用 + */ + async commitOutboxEvents(): Promise { + if (this.pendingOutboxEvents.length === 0) { + return; + } + + this.logger.debug(`[OUTBOX] Committing ${this.pendingOutboxEvents.length} outbox events`); + + await this.tx.outboxEvent.createMany({ + data: this.pendingOutboxEvents.map((event) => ({ + eventType: event.eventType, + topic: event.topic, + key: event.key, + payload: event.payload as Prisma.JsonObject, + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + status: 'PENDING', + })), + }); + + this.logger.log(`[OUTBOX] ✓ Committed ${this.pendingOutboxEvents.length} outbox events`); + this.pendingOutboxEvents = []; + } + + /** + * 获取待发送的事件数量 + */ + getPendingOutboxEventCount(): number { + return this.pendingOutboxEvents.length; + } + /** * 保存认种订单 */ diff --git a/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts b/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts index 863f4a09..a2bd4ab8 100644 --- a/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts +++ b/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts @@ -1,5 +1,6 @@ import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { KafkaService } from '../../infrastructure'; +import { EventAckPublisher } from '../../infrastructure/kafka/event-ack.publisher'; import { TeamStatisticsService } from '../services'; import { UpdateTeamStatisticsCommand } from '../commands'; @@ -11,6 +12,11 @@ interface PlantingCreatedEvent { provinceCode: string; cityCode: string; }; + _outbox?: { + id: string; + aggregateId: string; + eventType: string; + }; } /** @@ -24,6 +30,7 @@ export class PlantingCreatedHandler implements OnModuleInit { constructor( private readonly kafkaService: KafkaService, private readonly teamStatisticsService: TeamStatisticsService, + private readonly eventAckPublisher: EventAckPublisher, ) {} async onModuleInit() { @@ -42,6 +49,10 @@ export class PlantingCreatedHandler implements OnModuleInit { return; } + // B方案:提取 outbox 信息用于发送确认 + const outboxInfo = event._outbox; + const eventId = outboxInfo?.aggregateId || 'unknown'; + try { const command = new UpdateTeamStatisticsCommand( BigInt(event.data.userId), @@ -54,11 +65,22 @@ export class PlantingCreatedHandler implements OnModuleInit { this.logger.log( `Updated team statistics for user ${event.data.userId}, count: ${event.data.treeCount}`, ); + + // B方案:发送处理成功确认 + if (outboxInfo) { + await this.eventAckPublisher.sendSuccess(eventId, outboxInfo.eventType); + } } catch (error) { this.logger.error( `Failed to update team statistics for user ${event.data.userId}:`, error, ); + + // B方案:发送处理失败确认 + if (outboxInfo) { + const errorMessage = error instanceof Error ? error.message : String(error); + await this.eventAckPublisher.sendFailure(eventId, outboxInfo.eventType, errorMessage); + } } } } diff --git a/backend/services/referral-service/src/infrastructure/kafka/event-ack.publisher.ts b/backend/services/referral-service/src/infrastructure/kafka/event-ack.publisher.ts new file mode 100644 index 00000000..378697e4 --- /dev/null +++ b/backend/services/referral-service/src/infrastructure/kafka/event-ack.publisher.ts @@ -0,0 +1,98 @@ +import { Injectable, Logger, Inject } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; + +/** + * 事件确认消息结构 + */ +interface EventAckMessage { + /** 原始事件的 aggregateId(如 orderNo) */ + eventId: string; + /** 原始事件类型 */ + eventType: string; + /** 消费服务名称 */ + consumerService: string; + /** 处理结果 */ + success: boolean; + /** 错误信息(如果失败) */ + error?: string; + /** 确认时间 */ + confirmedAt: string; +} + +/** + * 事件确认发布器 + * + * B方案核心组件:消费方处理事件后发送确认 + * 发送确认消息到 planting.events.ack topic + */ +@Injectable() +export class EventAckPublisher { + private readonly logger = new Logger(EventAckPublisher.name); + private readonly serviceName = 'referral-service'; + + constructor( + @Inject('KAFKA_SERVICE') + private readonly kafkaClient: ClientKafka, + ) {} + + /** + * 发送处理成功确认 + */ + async sendSuccess(eventId: string, eventType: string): Promise { + const ackMessage: EventAckMessage = { + eventId, + eventType, + consumerService: this.serviceName, + success: true, + confirmedAt: new Date().toISOString(), + }; + + try { + this.kafkaClient.emit('planting.events.ack', { + key: eventId, + value: JSON.stringify(ackMessage), + }); + + this.logger.log(`[ACK] ✓ Sent success confirmation for event ${eventId} (${eventType})`); + } catch (error) { + this.logger.error(`[ACK] Failed to send confirmation for event ${eventId}:`, error); + } + } + + /** + * 发送处理失败确认 + */ + async sendFailure(eventId: string, eventType: string, errorMessage: string): Promise { + const ackMessage: EventAckMessage = { + eventId, + eventType, + consumerService: this.serviceName, + success: false, + error: errorMessage, + confirmedAt: new Date().toISOString(), + }; + + try { + this.kafkaClient.emit('planting.events.ack', { + key: eventId, + value: JSON.stringify(ackMessage), + }); + + this.logger.warn(`[ACK] ✗ Sent failure confirmation for event ${eventId}: ${errorMessage}`); + } catch (error) { + this.logger.error(`[ACK] Failed to send failure confirmation for event ${eventId}:`, error); + } + } + + /** + * 从消息中提取 outbox 信息 + */ + extractOutboxInfo(message: Record): { id: string; aggregateId: string; eventType: string } | null { + const outbox = message._outbox as { id: string; aggregateId: string; eventType: string } | undefined; + if (!outbox) { + this.logger.warn('[ACK] Message does not contain _outbox metadata'); + return null; + } + return outbox; + } +} diff --git a/backend/services/reward-service/prisma/migrations/20241204000000_init/migration.sql b/backend/services/reward-service/prisma/migrations/20241204000000_init/migration.sql new file mode 100644 index 00000000..6f34e7b3 --- /dev/null +++ b/backend/services/reward-service/prisma/migrations/20241204000000_init/migration.sql @@ -0,0 +1,117 @@ +-- CreateTable: reward_ledger_entries (聚合根1 - 行为表, append-only) +CREATE TABLE "reward_ledger_entries" ( + "entry_id" BIGSERIAL NOT NULL, + "user_id" BIGINT NOT NULL, + "source_order_id" BIGINT NOT NULL, + "source_user_id" BIGINT NOT NULL, + "right_type" VARCHAR(50) NOT NULL, + "usdt_amount" DECIMAL(20,8) NOT NULL, + "hashpower_amount" DECIMAL(20,8) NOT NULL DEFAULT 0, + "reward_status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', + "created_at" TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "expire_at" TIMESTAMP(3), + "claimed_at" TIMESTAMP(3), + "settled_at" TIMESTAMP(3), + "expired_at" TIMESTAMP(3), + "memo" VARCHAR(500), + + CONSTRAINT "reward_ledger_entries_pkey" PRIMARY KEY ("entry_id") +); + +-- CreateTable: reward_summaries (聚合根2 - 状态表) +CREATE TABLE "reward_summaries" ( + "summary_id" BIGSERIAL NOT NULL, + "user_id" BIGINT NOT NULL, + "pending_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0, + "pending_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0, + "pending_expire_at" TIMESTAMP(3), + "settleable_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0, + "settleable_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0, + "settled_total_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0, + "settled_total_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0, + "expired_total_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0, + "expired_total_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0, + "last_update_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "reward_summaries_pkey" PRIMARY KEY ("summary_id") +); + +-- CreateTable: right_definitions (配置表) +CREATE TABLE "right_definitions" ( + "definition_id" BIGSERIAL NOT NULL, + "right_type" VARCHAR(50) NOT NULL, + "usdt_per_tree" DECIMAL(20,8) NOT NULL, + "hashpower_percent" DECIMAL(5,2) NOT NULL DEFAULT 0, + "payable_to" VARCHAR(50) NOT NULL, + "rule_description" TEXT, + "is_enabled" BOOLEAN NOT NULL DEFAULT true, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updated_at" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "right_definitions_pkey" PRIMARY KEY ("definition_id") +); + +-- CreateTable: settlement_records (行为表) +CREATE TABLE "settlement_records" ( + "settlement_id" BIGSERIAL NOT NULL, + "user_id" BIGINT NOT NULL, + "usdt_amount" DECIMAL(20,8) NOT NULL, + "hashpower_amount" DECIMAL(20,8) NOT NULL, + "settle_currency" VARCHAR(10) NOT NULL, + "received_amount" DECIMAL(20,8) NOT NULL, + "swap_tx_hash" VARCHAR(100), + "swap_rate" DECIMAL(20,8), + "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "completed_at" TIMESTAMP(3), + "reward_entry_ids" BIGINT[], + + CONSTRAINT "settlement_records_pkey" PRIMARY KEY ("settlement_id") +); + +-- CreateTable: reward_events (行为表, append-only) +CREATE TABLE "reward_events" ( + "event_id" BIGSERIAL NOT NULL, + "event_type" VARCHAR(50) NOT NULL, + "aggregate_id" VARCHAR(100) NOT NULL, + "aggregate_type" VARCHAR(50) NOT NULL, + "event_data" JSONB NOT NULL, + "user_id" BIGINT, + "occurred_at" TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "version" INTEGER NOT NULL DEFAULT 1, + + CONSTRAINT "reward_events_pkey" PRIMARY KEY ("event_id") +); + +-- CreateIndex: reward_ledger_entries indexes +CREATE INDEX "idx_user_status" ON "reward_ledger_entries"("user_id", "reward_status"); +CREATE INDEX "idx_user_created" ON "reward_ledger_entries"("user_id", "created_at" DESC); +CREATE INDEX "idx_source_order" ON "reward_ledger_entries"("source_order_id"); +CREATE INDEX "idx_source_user" ON "reward_ledger_entries"("source_user_id"); +CREATE INDEX "idx_right_type" ON "reward_ledger_entries"("right_type"); +CREATE INDEX "idx_status" ON "reward_ledger_entries"("reward_status"); +CREATE INDEX "idx_expire" ON "reward_ledger_entries"("expire_at"); +CREATE INDEX "idx_created" ON "reward_ledger_entries"("created_at"); + +-- CreateIndex: reward_summaries indexes +CREATE UNIQUE INDEX "reward_summaries_user_id_key" ON "reward_summaries"("user_id"); +CREATE INDEX "idx_summary_user" ON "reward_summaries"("user_id"); +CREATE INDEX "idx_settleable_desc" ON "reward_summaries"("settleable_usdt" DESC); +CREATE INDEX "idx_pending_expire" ON "reward_summaries"("pending_expire_at"); + +-- CreateIndex: right_definitions indexes +CREATE UNIQUE INDEX "right_definitions_right_type_key" ON "right_definitions"("right_type"); +CREATE INDEX "idx_def_right_type" ON "right_definitions"("right_type"); +CREATE INDEX "idx_def_enabled" ON "right_definitions"("is_enabled"); + +-- CreateIndex: settlement_records indexes +CREATE INDEX "idx_settlement_user" ON "settlement_records"("user_id"); +CREATE INDEX "idx_settlement_status" ON "settlement_records"("status"); +CREATE INDEX "idx_settlement_created" ON "settlement_records"("created_at"); + +-- CreateIndex: reward_events indexes +CREATE INDEX "idx_reward_event_aggregate" ON "reward_events"("aggregate_type", "aggregate_id"); +CREATE INDEX "idx_reward_event_type" ON "reward_events"("event_type"); +CREATE INDEX "idx_reward_event_user" ON "reward_events"("user_id"); +CREATE INDEX "idx_reward_event_occurred" ON "reward_events"("occurred_at"); diff --git a/backend/services/reward-service/src/infrastructure/kafka/event-ack.publisher.ts b/backend/services/reward-service/src/infrastructure/kafka/event-ack.publisher.ts new file mode 100644 index 00000000..1b6e3cfa --- /dev/null +++ b/backend/services/reward-service/src/infrastructure/kafka/event-ack.publisher.ts @@ -0,0 +1,98 @@ +import { Injectable, Logger, Inject } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; + +/** + * 事件确认消息结构 + */ +interface EventAckMessage { + /** 原始事件的 aggregateId(如 orderNo) */ + eventId: string; + /** 原始事件类型 */ + eventType: string; + /** 消费服务名称 */ + consumerService: string; + /** 处理结果 */ + success: boolean; + /** 错误信息(如果失败) */ + error?: string; + /** 确认时间 */ + confirmedAt: string; +} + +/** + * 事件确认发布器 + * + * B方案核心组件:消费方处理事件后发送确认 + * 发送确认消息到 planting.events.ack topic + */ +@Injectable() +export class EventAckPublisher { + private readonly logger = new Logger(EventAckPublisher.name); + private readonly serviceName = 'reward-service'; + + constructor( + @Inject('KAFKA_SERVICE') + private readonly kafkaClient: ClientKafka, + ) {} + + /** + * 发送处理成功确认 + */ + async sendSuccess(eventId: string, eventType: string): Promise { + const ackMessage: EventAckMessage = { + eventId, + eventType, + consumerService: this.serviceName, + success: true, + confirmedAt: new Date().toISOString(), + }; + + try { + this.kafkaClient.emit('planting.events.ack', { + key: eventId, + value: JSON.stringify(ackMessage), + }); + + this.logger.log(`[ACK] ✓ Sent success confirmation for event ${eventId} (${eventType})`); + } catch (error) { + this.logger.error(`[ACK] Failed to send confirmation for event ${eventId}:`, error); + } + } + + /** + * 发送处理失败确认 + */ + async sendFailure(eventId: string, eventType: string, errorMessage: string): Promise { + const ackMessage: EventAckMessage = { + eventId, + eventType, + consumerService: this.serviceName, + success: false, + error: errorMessage, + confirmedAt: new Date().toISOString(), + }; + + try { + this.kafkaClient.emit('planting.events.ack', { + key: eventId, + value: JSON.stringify(ackMessage), + }); + + this.logger.warn(`[ACK] ✗ Sent failure confirmation for event ${eventId}: ${errorMessage}`); + } catch (error) { + this.logger.error(`[ACK] Failed to send failure confirmation for event ${eventId}:`, error); + } + } + + /** + * 从消息中提取 outbox 信息 + */ + extractOutboxInfo(message: Record): { id: string; aggregateId: string; eventType: string } | null { + const outbox = message._outbox as { id: string; aggregateId: string; eventType: string } | undefined; + if (!outbox) { + this.logger.warn('[ACK] Message does not contain _outbox metadata'); + return null; + } + return outbox; + } +} diff --git a/backend/services/reward-service/src/infrastructure/kafka/event-consumer.controller.ts b/backend/services/reward-service/src/infrastructure/kafka/event-consumer.controller.ts index a8648f2d..7ff03f3f 100644 --- a/backend/services/reward-service/src/infrastructure/kafka/event-consumer.controller.ts +++ b/backend/services/reward-service/src/infrastructure/kafka/event-consumer.controller.ts @@ -1,14 +1,31 @@ import { Controller, Logger } from '@nestjs/common'; import { MessagePattern, Payload } from '@nestjs/microservices'; import { RewardApplicationService } from '../../application/services/reward-application.service'; +import { EventAckPublisher } from './event-ack.publisher'; interface PlantingOrderPaidEvent { - orderId: string; - userId: string; - treeCount: number; - provinceCode: string; - cityCode: string; - paidAt: string; + eventName?: string; + data?: { + orderId: string; + userId: string; + treeCount: number; + provinceCode: string; + cityCode: string; + paidAt: string; + }; + // 兼容旧格式 + orderId?: string; + userId?: string; + treeCount?: number; + provinceCode?: string; + cityCode?: string; + paidAt?: string; + // B方案:outbox 元数据 + _outbox?: { + id: string; + aggregateId: string; + eventType: string; + }; } @Controller() @@ -17,6 +34,7 @@ export class EventConsumerController { constructor( private readonly rewardService: RewardApplicationService, + private readonly eventAckPublisher: EventAckPublisher, ) {} /** @@ -26,22 +44,48 @@ export class EventConsumerController { async handlePlantingOrderPaid(@Payload() message: PlantingOrderPaidEvent) { this.logger.log(`Received planting.order.paid event: ${JSON.stringify(message)}`); + // 解析消息数据(支持新旧格式) + const eventData = message.data || { + orderId: message.orderId!, + userId: message.userId!, + treeCount: message.treeCount!, + provinceCode: message.provinceCode!, + cityCode: message.cityCode!, + paidAt: message.paidAt!, + }; + + // B方案:提取 outbox 信息用于发送确认 + const outboxInfo = message._outbox; + const eventId = outboxInfo?.aggregateId || eventData.orderId; + try { // 1. 计算并分配奖励 await this.rewardService.distributeRewards({ - sourceOrderId: BigInt(message.orderId), - sourceUserId: BigInt(message.userId), - treeCount: message.treeCount, - provinceCode: message.provinceCode, - cityCode: message.cityCode, + sourceOrderId: BigInt(eventData.orderId), + sourceUserId: BigInt(eventData.userId), + treeCount: eventData.treeCount, + provinceCode: eventData.provinceCode, + cityCode: eventData.cityCode, }); // 2. 检查该用户是否有待领取奖励需要转为可结算 - await this.rewardService.claimPendingRewardsForUser(BigInt(message.userId)); + await this.rewardService.claimPendingRewardsForUser(BigInt(eventData.userId)); - this.logger.log(`Successfully processed planting.order.paid for order ${message.orderId}`); + this.logger.log(`Successfully processed planting.order.paid for order ${eventData.orderId}`); + + // B方案:发送处理成功确认 + if (outboxInfo) { + await this.eventAckPublisher.sendSuccess(eventId, outboxInfo.eventType); + } } catch (error) { this.logger.error(`Error processing planting.order.paid:`, error); + + // B方案:发送处理失败确认 + if (outboxInfo) { + const errorMessage = error instanceof Error ? error.message : String(error); + await this.eventAckPublisher.sendFailure(eventId, outboxInfo.eventType, errorMessage); + } + throw error; } }