diff --git a/backend/services/authorization-service/src/infrastructure/kafka/event-ack.publisher.ts b/backend/services/authorization-service/src/infrastructure/kafka/event-ack.publisher.ts new file mode 100644 index 00000000..2e24b8e5 --- /dev/null +++ b/backend/services/authorization-service/src/infrastructure/kafka/event-ack.publisher.ts @@ -0,0 +1,98 @@ +import { Injectable, Logger, Inject } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; + +/** + * 事件确认消息结构 + */ +interface EventAckMessage { + /** 原始事件的 aggregateId(如 orderNo) */ + eventId: string; + /** 原始事件类型 */ + eventType: string; + /** 消费服务名称 */ + consumerService: string; + /** 处理结果 */ + success: boolean; + /** 错误信息(如果失败) */ + error?: string; + /** 确认时间 */ + confirmedAt: string; +} + +/** + * 事件确认发布器 + * + * B方案核心组件:消费方处理事件后发送确认 + * 发送确认消息到 planting.events.ack topic + */ +@Injectable() +export class EventAckPublisher { + private readonly logger = new Logger(EventAckPublisher.name); + private readonly serviceName = 'authorization-service'; + + constructor( + @Inject('KAFKA_SERVICE') + private readonly kafkaClient: ClientKafka, + ) {} + + /** + * 发送处理成功确认 + */ + async sendSuccess(eventId: string, eventType: string): Promise { + const ackMessage: EventAckMessage = { + eventId, + eventType, + consumerService: this.serviceName, + success: true, + confirmedAt: new Date().toISOString(), + }; + + try { + this.kafkaClient.emit('planting.events.ack', { + key: eventId, + value: JSON.stringify(ackMessage), + }); + + this.logger.log(`[ACK] ✓ Sent success confirmation for event ${eventId} (${eventType})`); + } catch (error) { + this.logger.error(`[ACK] Failed to send confirmation for event ${eventId}:`, error); + } + } + + /** + * 发送处理失败确认 + */ + async sendFailure(eventId: string, eventType: string, errorMessage: string): Promise { + const ackMessage: EventAckMessage = { + eventId, + eventType, + consumerService: this.serviceName, + success: false, + error: errorMessage, + confirmedAt: new Date().toISOString(), + }; + + try { + this.kafkaClient.emit('planting.events.ack', { + key: eventId, + value: JSON.stringify(ackMessage), + }); + + this.logger.warn(`[ACK] ✗ Sent failure confirmation for event ${eventId}: ${errorMessage}`); + } catch (error) { + this.logger.error(`[ACK] Failed to send failure confirmation for event ${eventId}:`, error); + } + } + + /** + * 从消息中提取 outbox 信息 + */ + extractOutboxInfo(message: Record): { id: string; aggregateId: string; eventType: string } | null { + const outbox = message._outbox as { id: string; aggregateId: string; eventType: string } | undefined; + if (!outbox) { + this.logger.warn('[ACK] Message does not contain _outbox metadata'); + return null; + } + return outbox; + } +} diff --git a/backend/services/authorization-service/src/infrastructure/kafka/event-consumer.controller.ts b/backend/services/authorization-service/src/infrastructure/kafka/event-consumer.controller.ts index 83c1cf6a..d3c3308e 100644 --- a/backend/services/authorization-service/src/infrastructure/kafka/event-consumer.controller.ts +++ b/backend/services/authorization-service/src/infrastructure/kafka/event-consumer.controller.ts @@ -1,28 +1,100 @@ -import { Controller, Logger } from '@nestjs/common' +import { Controller, Logger, Inject } from '@nestjs/common' import { EventPattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices' +import { + IAuthorizationRoleRepository, + AUTHORIZATION_ROLE_REPOSITORY, + IMonthlyAssessmentRepository, + MONTHLY_ASSESSMENT_REPOSITORY, +} from '../../domain/repositories' +import { + TEAM_STATISTICS_REPOSITORY, +} from '../../application/services/authorization-application.service' +import type { ITeamStatisticsRepository, TeamStatistics } from '../../domain/services' +import { UserId, Month } from '../../domain/value-objects' +import { RoleType, AuthorizationStatus } from '../../domain/enums' +import { LadderTargetRule } from '../../domain/entities' +import { MonthlyAssessment, AuthorizationRole } from '../../domain/aggregates' +import { EventPublisherService } from './event-publisher.service' +import { EventAckPublisher } from './event-ack.publisher' + +interface TreePlantedPayload { + userId: string + treeCount: number + provinceCode?: string + cityCode?: string +} + +interface PlantingEventMessage { + eventName?: string + eventType?: string + aggregateId?: string + occurredAt?: string + data?: TreePlantedPayload + payload?: TreePlantedPayload + // B方案:outbox 元数据 + _outbox?: { + id: string + aggregateId: string + eventType: string + } +} @Controller() export class EventConsumerController { private readonly logger = new Logger(EventConsumerController.name) + constructor( + @Inject(AUTHORIZATION_ROLE_REPOSITORY) + private readonly authorizationRepository: IAuthorizationRoleRepository, + @Inject(MONTHLY_ASSESSMENT_REPOSITORY) + private readonly assessmentRepository: IMonthlyAssessmentRepository, + @Inject(TEAM_STATISTICS_REPOSITORY) + private readonly statsRepository: ITeamStatisticsRepository, + private readonly eventPublisher: EventPublisherService, + private readonly eventAckPublisher: EventAckPublisher, + ) {} + // 监听认种事件 - 用于更新考核进度 @EventPattern('planting-events') async handlePlantingEvent( - @Payload() message: any, + @Payload() message: PlantingEventMessage, @Ctx() context: KafkaContext, ) { - try { - this.logger.log(`Received planting event: ${message.eventType}`) + const eventType = message.eventType || message.eventName || 'unknown' + const outboxInfo = message._outbox + const eventId = outboxInfo?.aggregateId || message.aggregateId || 'unknown' - switch (message.eventType) { + try { + this.logger.log(`[KAFKA] Received planting event: ${eventType}`) + this.logger.debug(`[KAFKA] Event payload: ${JSON.stringify(message)}`) + + // 获取 payload(支持多种格式) + const payload = message.payload || message.data + + switch (eventType) { case 'planting.tree.planted': - await this.handleTreePlanted(message.payload) + case 'PlantingOrderPaid': + case 'MiningEnabled': + if (payload) { + await this.handleTreePlanted(payload) + } break default: - this.logger.debug(`Unhandled planting event type: ${message.eventType}`) + this.logger.debug(`[KAFKA] Unhandled planting event type: ${eventType}`) + } + + // B方案:发送处理成功确认 + if (outboxInfo) { + await this.eventAckPublisher.sendSuccess(eventId, outboxInfo.eventType) } } catch (error) { - this.logger.error('Failed to handle planting event:', error) + this.logger.error('[KAFKA] Failed to handle planting event:', error) + + // B方案:发送处理失败确认 + if (outboxInfo) { + const errorMessage = error instanceof Error ? error.message : String(error) + await this.eventAckPublisher.sendFailure(eventId, outboxInfo.eventType, errorMessage) + } } } @@ -33,24 +105,180 @@ export class EventConsumerController { @Ctx() context: KafkaContext, ) { try { - this.logger.log(`Received referral event: ${message.eventType}`) + this.logger.log(`[KAFKA] Received referral event: ${message.eventType}`) switch (message.eventType) { case 'referral.relationship.created': // 处理推荐关系创建事件 break default: - this.logger.debug(`Unhandled referral event type: ${message.eventType}`) + this.logger.debug(`[KAFKA] Unhandled referral event type: ${message.eventType}`) } } catch (error) { - this.logger.error('Failed to handle referral event:', error) + this.logger.error('[KAFKA] Failed to handle referral event:', error) } } - private async handleTreePlanted(payload: any) { - // TODO: 更新用户团队认种统计 - // TODO: 检查是否达成初始考核目标 - // TODO: 更新月度考核进度 - this.logger.log(`Tree planted by user: ${payload.userId}, count: ${payload.treeCount}`) + /** + * 处理认种事件 + * 1. 检查用户是否有待激活的授权(初始考核) + * 2. 更新正在进行的月度考核进度 + */ + private async handleTreePlanted(payload: TreePlantedPayload): Promise { + const { userId, treeCount } = payload + this.logger.log(`[PLANTING] Processing tree planted event: userId=${userId}, count=${treeCount}`) + + try { + // 1. 获取用户团队统计 + const teamStats = await this.statsRepository.findByUserId(userId) + if (!teamStats) { + this.logger.warn(`[PLANTING] No team stats found for user ${userId}, skipping`) + return + } + + const totalTreeCount = teamStats.totalTeamPlantingCount + this.logger.debug(`[PLANTING] User ${userId} total team planting count: ${totalTreeCount}`) + + // 2. 获取用户所有授权 + const authorizations = await this.authorizationRepository.findByUserId( + UserId.create(userId), + ) + + if (authorizations.length === 0) { + this.logger.debug(`[PLANTING] User ${userId} has no authorizations`) + return + } + + // 3. 处理每个授权 + for (const auth of authorizations) { + this.logger.debug(`[PLANTING] Processing authorization: ${auth.authorizationId.value}, role=${auth.roleType}, benefitActive=${auth.benefitActive}`) + + // 3.1 检查初始考核(权益未激活的情况) + if (!auth.benefitActive && auth.status === AuthorizationStatus.PENDING) { + const initialTarget = auth.getInitialTarget() + this.logger.debug(`[PLANTING] Checking initial target: ${totalTreeCount}/${initialTarget}`) + + if (totalTreeCount >= initialTarget) { + this.logger.log(`[PLANTING] User ${userId} reached initial target for ${auth.roleType}, activating benefit`) + auth.activateBenefit() + await this.authorizationRepository.save(auth) + await this.eventPublisher.publishAll(auth.domainEvents) + auth.clearDomainEvents() + + // 创建首月考核(针对授权省市公司) + if (auth.roleType === RoleType.AUTH_PROVINCE_COMPANY || + auth.roleType === RoleType.AUTH_CITY_COMPANY) { + await this.createInitialAssessment(auth, teamStats) + } + } + } + + // 3.2 更新月度考核进度(权益已激活的授权省市公司) + if (auth.benefitActive && + (auth.roleType === RoleType.AUTH_PROVINCE_COMPANY || + auth.roleType === RoleType.AUTH_CITY_COMPANY)) { + await this.updateMonthlyAssessment(auth, teamStats) + } + } + + this.logger.log(`[PLANTING] Completed processing tree planted event for user ${userId}`) + } catch (error) { + this.logger.error(`[PLANTING] Error processing tree planted for user ${userId}:`, error) + throw error + } + } + + /** + * 创建首月考核 + */ + private async createInitialAssessment( + authorization: AuthorizationRole, + teamStats: TeamStatistics, + ): Promise { + const currentMonth = Month.current() + const target = LadderTargetRule.getTarget(authorization.roleType, 1) + + this.logger.log(`[ASSESSMENT] Creating initial assessment for ${authorization.authorizationId.value}, month=${currentMonth.value}`) + + // 检查是否已存在 + const existing = await this.assessmentRepository.findByAuthorizationAndMonth( + authorization.authorizationId, + currentMonth, + ) + + if (existing) { + this.logger.debug(`[ASSESSMENT] Assessment already exists, updating...`) + await this.doAssessmentUpdate(existing, authorization, teamStats) + return + } + + const assessment = MonthlyAssessment.create({ + authorizationId: authorization.authorizationId, + userId: authorization.userId, + roleType: authorization.roleType, + regionCode: authorization.regionCode, + assessmentMonth: currentMonth, + monthIndex: 1, + monthlyTarget: target.monthlyTarget, + cumulativeTarget: target.cumulativeTarget, + }) + + await this.doAssessmentUpdate(assessment, authorization, teamStats) + this.logger.log(`[ASSESSMENT] Created initial assessment for ${authorization.authorizationId.value}`) + } + + /** + * 更新月度考核进度 + */ + private async updateMonthlyAssessment( + authorization: AuthorizationRole, + teamStats: TeamStatistics, + ): Promise { + const currentMonth = Month.current() + + // 查找当月考核记录 + const assessment = await this.assessmentRepository.findByAuthorizationAndMonth( + authorization.authorizationId, + currentMonth, + ) + + if (!assessment) { + this.logger.debug(`[ASSESSMENT] No assessment found for ${authorization.authorizationId.value} in ${currentMonth.value}`) + return + } + + await this.doAssessmentUpdate(assessment, authorization, teamStats) + this.logger.log(`[ASSESSMENT] Updated assessment for ${authorization.authorizationId.value}`) + } + + /** + * 执行考核评估 + */ + private async doAssessmentUpdate( + assessment: MonthlyAssessment, + authorization: AuthorizationRole, + teamStats: TeamStatistics, + ): Promise { + // 计算本地团队数量 + let localTeamCount = 0 + if (authorization.roleType === RoleType.AUTH_PROVINCE_COMPANY) { + localTeamCount = teamStats.getProvinceTeamCount(authorization.regionCode.value) + } else if (authorization.roleType === RoleType.AUTH_CITY_COMPANY) { + localTeamCount = teamStats.getCityTeamCount(authorization.regionCode.value) + } + + this.logger.debug(`[ASSESSMENT] Assessing: cumulative=${teamStats.totalTeamPlantingCount}, local=${localTeamCount}, total=${teamStats.totalTeamPlantingCount}`) + + assessment.assess({ + cumulativeCompleted: teamStats.totalTeamPlantingCount, + localTeamCount, + totalTeamCount: teamStats.totalTeamPlantingCount, + requireLocalPercentage: authorization.requireLocalPercentage, + exemptFromPercentageCheck: authorization.exemptFromPercentageCheck, + }) + + await this.assessmentRepository.save(assessment) + await this.eventPublisher.publishAll(assessment.domainEvents) + assessment.clearDomainEvents() } } diff --git a/backend/services/planting-service/.env.development b/backend/services/planting-service/.env.development index 65a7497c..7da73714 100644 --- a/backend/services/planting-service/.env.development +++ b/backend/services/planting-service/.env.development @@ -12,3 +12,7 @@ JWT_SECRET="planting-service-dev-jwt-secret" WALLET_SERVICE_URL=http://localhost:3002 IDENTITY_SERVICE_URL=http://localhost:3001 REFERRAL_SERVICE_URL=http://localhost:3004 + +# Kafka Configuration +KAFKA_BROKERS=localhost:9092 +KAFKA_CLIENT_ID=planting-service diff --git a/backend/services/planting-service/.env.example b/backend/services/planting-service/.env.example index 995065dc..1f08d539 100644 --- a/backend/services/planting-service/.env.example +++ b/backend/services/planting-service/.env.example @@ -12,3 +12,7 @@ JWT_SECRET="your-super-secret-jwt-key-change-in-production" WALLET_SERVICE_URL=http://localhost:3002 IDENTITY_SERVICE_URL=http://localhost:3001 REFERRAL_SERVICE_URL=http://localhost:3004 + +# Kafka Configuration +KAFKA_BROKERS=localhost:9092 +KAFKA_CLIENT_ID=planting-service diff --git a/backend/services/planting-service/package-lock.json b/backend/services/planting-service/package-lock.json index ed5d9883..2d114004 100644 --- a/backend/services/planting-service/package-lock.json +++ b/backend/services/planting-service/package-lock.json @@ -14,6 +14,7 @@ "@nestjs/config": "^3.1.1", "@nestjs/core": "^10.0.0", "@nestjs/jwt": "^10.2.0", + "@nestjs/microservices": "^10.0.0", "@nestjs/passport": "^10.0.3", "@nestjs/platform-express": "^10.0.0", "@nestjs/swagger": "^7.1.17", @@ -21,6 +22,7 @@ "class-transformer": "^0.5.1", "class-validator": "^0.14.0", "jsonwebtoken": "^9.0.0", + "kafkajs": "^2.2.4", "passport": "^0.7.0", "passport-jwt": "^4.0.1", "reflect-metadata": "^0.1.13", @@ -246,6 +248,7 @@ "integrity": "sha512-e7jT4DxYvIDLk1ZHmU/m/mB19rex9sv0c2ftBtjSBv+kVM/902eh0fINUzD7UwLLNR+jU585GxUJ8/EBfAM5fw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@babel/code-frame": "^7.27.1", "@babel/generator": "^7.28.5", @@ -1865,6 +1868,65 @@ } } }, + "node_modules/@nestjs/microservices": { + "version": "10.4.20", + "resolved": "https://registry.npmjs.org/@nestjs/microservices/-/microservices-10.4.20.tgz", + "integrity": "sha512-zu/o84Z0uTUClNnGIGfIjcrO3z6T60h/pZPSJK50o4mehbEvJ76fijj6R/WTW0VP+1N16qOv/NsiYLKJA5Cc3w==", + "license": "MIT", + "peer": true, + "dependencies": { + "iterare": "1.2.1", + "tslib": "2.8.1" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/nest" + }, + "peerDependencies": { + "@grpc/grpc-js": "*", + "@nestjs/common": "^10.0.0", + "@nestjs/core": "^10.0.0", + "@nestjs/websockets": "^10.0.0", + "amqp-connection-manager": "*", + "amqplib": "*", + "cache-manager": "*", + "ioredis": "*", + "kafkajs": "*", + "mqtt": "*", + "nats": "*", + "reflect-metadata": "^0.1.12 || ^0.2.0", + "rxjs": "^7.1.0" + }, + "peerDependenciesMeta": { + "@grpc/grpc-js": { + "optional": true + }, + "@nestjs/websockets": { + "optional": true + }, + "amqp-connection-manager": { + "optional": true + }, + "amqplib": { + "optional": true + }, + "cache-manager": { + "optional": true + }, + "ioredis": { + "optional": true + }, + "kafkajs": { + "optional": true + }, + "mqtt": { + "optional": true + }, + "nats": { + "optional": true + } + } + }, "node_modules/@nestjs/passport": { "version": "10.0.3", "resolved": "https://registry.npmjs.org/@nestjs/passport/-/passport-10.0.3.tgz", @@ -6964,6 +7026,16 @@ "safe-buffer": "^5.0.1" } }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "license": "MIT", + "peer": true, + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/keyv": { "version": "4.5.4", "resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.4.tgz", diff --git a/backend/services/planting-service/package.json b/backend/services/planting-service/package.json index 92d2a23c..7470066e 100644 --- a/backend/services/planting-service/package.json +++ b/backend/services/planting-service/package.json @@ -27,12 +27,14 @@ "@nestjs/common": "^10.0.0", "@nestjs/config": "^3.1.1", "@nestjs/core": "^10.0.0", + "@nestjs/microservices": "^10.0.0", "@nestjs/platform-express": "^10.0.0", "@nestjs/swagger": "^7.1.17", "@nestjs/axios": "^3.0.1", "@nestjs/jwt": "^10.2.0", "@nestjs/passport": "^10.0.3", "@prisma/client": "^5.7.0", + "kafkajs": "^2.2.4", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", "jsonwebtoken": "^9.0.0", diff --git a/backend/services/planting-service/prisma/migrations/20241210000000_add_outbox/migration.sql b/backend/services/planting-service/prisma/migrations/20241210000000_add_outbox/migration.sql new file mode 100644 index 00000000..7f05b777 --- /dev/null +++ b/backend/services/planting-service/prisma/migrations/20241210000000_add_outbox/migration.sql @@ -0,0 +1,31 @@ +-- CreateTable: outbox_events (Outbox Pattern) +CREATE TABLE "outbox_events" ( + "outbox_id" BIGSERIAL NOT NULL, + "event_type" VARCHAR(100) NOT NULL, + "topic" VARCHAR(100) NOT NULL, + "key" VARCHAR(200) NOT NULL, + "payload" JSONB NOT NULL, + "aggregate_id" VARCHAR(100) NOT NULL, + "aggregate_type" VARCHAR(50) NOT NULL, + "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', + "retry_count" INTEGER NOT NULL DEFAULT 0, + "max_retries" INTEGER NOT NULL DEFAULT 5, + "last_error" TEXT, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "published_at" TIMESTAMP(3), + "next_retry_at" TIMESTAMP(3), + + CONSTRAINT "outbox_events_pkey" PRIMARY KEY ("outbox_id") +); + +-- CreateIndex +CREATE INDEX "outbox_events_status_created_at_idx" ON "outbox_events"("status", "created_at"); + +-- CreateIndex +CREATE INDEX "outbox_events_status_next_retry_at_idx" ON "outbox_events"("status", "next_retry_at"); + +-- CreateIndex +CREATE INDEX "outbox_events_aggregate_type_aggregate_id_idx" ON "outbox_events"("aggregate_type", "aggregate_id"); + +-- CreateIndex +CREATE INDEX "outbox_events_topic_idx" ON "outbox_events"("topic"); diff --git a/backend/services/planting-service/prisma/schema.prisma b/backend/services/planting-service/prisma/schema.prisma index ec1c54e3..a0fc32f0 100644 --- a/backend/services/planting-service/prisma/schema.prisma +++ b/backend/services/planting-service/prisma/schema.prisma @@ -195,3 +195,41 @@ model PlantingEvent { @@index([occurredAt]) @@map("planting_events") } + +// ============================================ +// Outbox 事件发件箱表 (Outbox Pattern) +// 保证事件发布的可靠性: +// 1. 业务数据和 Outbox 记录在同一个事务中写入 +// 2. 后台任务轮询 Outbox 表并发布到 Kafka +// 3. 发布成功后标记为已处理 +// ============================================ +model OutboxEvent { + id BigInt @id @default(autoincrement()) @map("outbox_id") + + // 事件信息 + eventType String @map("event_type") @db.VarChar(100) + topic String @map("topic") @db.VarChar(100) + key String @map("key") @db.VarChar(200) + payload Json @map("payload") + + // 聚合根信息 (用于幂等性检查) + aggregateId String @map("aggregate_id") @db.VarChar(100) + aggregateType String @map("aggregate_type") @db.VarChar(50) + + // 发布状态 + status String @default("PENDING") @map("status") @db.VarChar(20) // PENDING, PUBLISHED, FAILED + retryCount Int @default(0) @map("retry_count") + maxRetries Int @default(5) @map("max_retries") + lastError String? @map("last_error") @db.Text + + // 时间戳 + createdAt DateTime @default(now()) @map("created_at") + publishedAt DateTime? @map("published_at") + nextRetryAt DateTime? @map("next_retry_at") + + @@index([status, createdAt]) + @@index([status, nextRetryAt]) + @@index([aggregateType, aggregateId]) + @@index([topic]) + @@map("outbox_events") +} diff --git a/backend/services/planting-service/src/application/services/planting-application.service.ts b/backend/services/planting-service/src/application/services/planting-application.service.ts index fd38da96..b5759be7 100644 --- a/backend/services/planting-service/src/application/services/planting-application.service.ts +++ b/backend/services/planting-service/src/application/services/planting-application.service.ts @@ -16,6 +16,7 @@ import { FundAllocationDomainService } from '../../domain/services/fund-allocati import { WalletServiceClient } from '../../infrastructure/external/wallet-service.client'; import { ReferralServiceClient } from '../../infrastructure/external/referral-service.client'; import { UnitOfWork, UNIT_OF_WORK } from '../../infrastructure/persistence/unit-of-work'; +import { OutboxEventData } from '../../infrastructure/persistence/repositories/outbox.repository'; import { PRICE_PER_TREE } from '../../domain/value-objects/fund-allocation-target-type.enum'; // 个人最大认种数量限制 @@ -228,8 +229,8 @@ export class PlantingApplicationService { order.markAsPaid(); order.allocateFunds(allocations); - // 7. 使用事务保存本地数据库的所有变更 - // 这确保了订单状态、用户持仓、批次数据的原子性 + // 7. 使用事务保存本地数据库的所有变更 + Outbox事件 + // 这确保了订单状态、用户持仓、批次数据、以及事件发布的原子性 await this.unitOfWork.executeInTransaction(async (uow) => { // 保存订单状态 await uow.saveOrder(order); @@ -256,6 +257,16 @@ export class PlantingApplicationService { scheduledTime.setHours(scheduledTime.getHours() + 1); order.schedulePoolInjection(batch.id!, scheduledTime); await uow.saveOrder(order); + + // 8. 添加 Outbox 事件(在同一事务中保存) + // 使用 Outbox Pattern 保证事件发布的可靠性 + const outboxEvents = this.buildOutboxEvents(order, selection); + uow.addOutboxEvents(outboxEvents); + + // 提交 Outbox 事件(在事务中保存到数据库) + await uow.commitOutboxEvents(); + + this.logger.log(`[OUTBOX] Added ${outboxEvents.length} events to outbox for order ${order.orderNo}`); }); this.logger.log(`Local database transaction committed for order ${order.orderNo}`); @@ -268,6 +279,9 @@ export class PlantingApplicationService { this.logger.log(`Order paid successfully: ${order.orderNo}`); + // 清除已保存到 Outbox 的领域事件 + order.clearDomainEvents(); + return { orderNo: order.orderNo, status: order.status, @@ -400,4 +414,80 @@ export class PlantingApplicationService { } } + /** + * 构建 Outbox 事件数据 + * + * 为以下服务创建事件: + * - reward-service: 分配奖励 (planting.order.paid) + * - referral-service: 更新团队统计 (planting.planting.created) + * - authorization-service: 更新KPI考核 (planting-events) + */ + private buildOutboxEvents( + order: PlantingOrder, + selection: { provinceCode: string; cityCode: string }, + ): OutboxEventData[] { + const events: OutboxEventData[] = []; + + this.logger.debug(`[OUTBOX] Building outbox events for order ${order.orderNo}`); + + // 1. planting.order.paid 事件 (reward-service 消费) + events.push({ + eventType: 'planting.order.paid', + topic: 'planting.order.paid', + key: order.userId.toString(), + payload: { + eventName: 'planting.order.paid', + data: { + orderId: order.orderNo, + userId: order.userId.toString(), + treeCount: order.treeCount.value, + provinceCode: selection.provinceCode, + cityCode: selection.cityCode, + paidAt: order.paidAt!.toISOString(), + }, + }, + aggregateId: order.orderNo, + aggregateType: 'PlantingOrder', + }); + + // 2. planting.planting.created 事件 (referral-service 消费) + events.push({ + eventType: 'planting.planting.created', + topic: 'planting.planting.created', + key: order.userId.toString(), + payload: { + eventName: 'planting.created', + data: { + userId: order.userId.toString(), + treeCount: order.treeCount.value, + provinceCode: selection.provinceCode, + cityCode: selection.cityCode, + }, + }, + aggregateId: order.orderNo, + aggregateType: 'PlantingOrder', + }); + + // 3. planting-events 事件 (authorization-service 消费) + // 发布所有领域事件 + for (const domainEvent of order.domainEvents) { + events.push({ + eventType: domainEvent.type, + topic: 'planting-events', + key: order.userId.toString(), + payload: { + eventName: domainEvent.type, + aggregateId: domainEvent.aggregateId, + occurredAt: domainEvent.occurredAt, + data: domainEvent.data, + }, + aggregateId: order.orderNo, + aggregateType: 'PlantingOrder', + }); + } + + this.logger.debug(`[OUTBOX] Built ${events.length} outbox events for order ${order.orderNo}`); + + return events; + } } diff --git a/backend/services/planting-service/src/application/services/pool-injection.service.ts b/backend/services/planting-service/src/application/services/pool-injection.service.ts index eb8c8044..1c5e614a 100644 --- a/backend/services/planting-service/src/application/services/pool-injection.service.ts +++ b/backend/services/planting-service/src/application/services/pool-injection.service.ts @@ -12,7 +12,9 @@ import { POOL_INJECTION_BATCH_REPOSITORY, } from '../../domain/repositories/pool-injection-batch.repository.interface'; import { WalletServiceClient } from '../../infrastructure/external/wallet-service.client'; +import { OutboxRepository, OutboxEventData } from '../../infrastructure/persistence/repositories/outbox.repository'; import { BatchStatus } from '../../domain/value-objects/batch-status.enum'; +import { PlantingOrder } from '../../domain/aggregates/planting-order.aggregate'; @Injectable() export class PoolInjectionService { @@ -26,6 +28,7 @@ export class PoolInjectionService { @Inject(POOL_INJECTION_BATCH_REPOSITORY) private readonly batchRepository: IPoolInjectionBatchRepository, private readonly walletService: WalletServiceClient, + private readonly outboxRepository: OutboxRepository, ) {} /** @@ -80,7 +83,11 @@ export class PoolInjectionService { // 更新批次内所有订单状态 const orders = await this.orderRepository.findByBatchId(batchId); + this.logger.log(`[BATCH] Processing ${orders.length} orders in batch ${batch.batchNo}`); + for (const order of orders) { + this.logger.debug(`[BATCH] Processing order ${order.orderNo}`); + order.confirmPoolInjection(result.txHash); await this.orderRepository.save(order); @@ -96,10 +103,13 @@ export class PoolInjectionService { position.activateMining(order.treeCount.value); await this.positionRepository.save(position); } + + // 保存 MiningEnabled 事件到 Outbox(使用 Outbox Pattern) + await this.saveMiningEnabledEventsToOutbox(order); } this.logger.log( - `Batch ${batch.batchNo} processed successfully, txHash: ${result.txHash}`, + `[BATCH] ✓ Batch ${batch.batchNo} processed successfully, txHash: ${result.txHash}, orders: ${orders.length}`, ); } catch (error) { this.logger.error(`Failed to inject batch ${batch.batchNo}`, error); @@ -160,4 +170,44 @@ export class PoolInjectionService { injectionTxHash: batch.injectionTxHash, }; } + + /** + * 保存挖矿开启事件到 Outbox + * + * 使用 Outbox Pattern 保证事件发布的可靠性 + * 事件由 OutboxPublisherService 轮询发布到 Kafka + */ + private async saveMiningEnabledEventsToOutbox(order: PlantingOrder): Promise { + this.logger.debug(`[OUTBOX] Saving MiningEnabled events to outbox for order ${order.orderNo}`); + + const outboxEvents: OutboxEventData[] = []; + + // 将领域事件转换为 Outbox 事件 + for (const domainEvent of order.domainEvents) { + outboxEvents.push({ + eventType: domainEvent.type, + topic: 'planting-events', + key: order.userId.toString(), + payload: { + eventName: domainEvent.type, + aggregateId: domainEvent.aggregateId, + occurredAt: domainEvent.occurredAt, + data: domainEvent.data, + }, + aggregateId: order.orderNo, + aggregateType: 'PlantingOrder', + }); + } + + if (outboxEvents.length > 0) { + // 使用 PrismaService 事务保存(这里简化处理,直接保存) + // 注意:理想情况下应该在 processBatch 的事务中一起保存 + await this.outboxRepository.saveEvents(outboxEvents); + + this.logger.log(`[OUTBOX] ✓ Saved ${outboxEvents.length} MiningEnabled events to outbox for order ${order.orderNo}`); + } + + // 清除领域事件 + order.clearDomainEvents(); + } } diff --git a/backend/services/planting-service/src/infrastructure/infrastructure.module.ts b/backend/services/planting-service/src/infrastructure/infrastructure.module.ts index f5367365..0431d928 100644 --- a/backend/services/planting-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/planting-service/src/infrastructure/infrastructure.module.ts @@ -4,9 +4,13 @@ import { PrismaService } from './persistence/prisma/prisma.service'; import { PlantingOrderRepositoryImpl } from './persistence/repositories/planting-order.repository.impl'; import { PlantingPositionRepositoryImpl } from './persistence/repositories/planting-position.repository.impl'; import { PoolInjectionBatchRepositoryImpl } from './persistence/repositories/pool-injection-batch.repository.impl'; +import { OutboxRepository } from './persistence/repositories/outbox.repository'; import { UnitOfWork, UNIT_OF_WORK } from './persistence/unit-of-work'; import { WalletServiceClient } from './external/wallet-service.client'; import { ReferralServiceClient } from './external/referral-service.client'; +import { KafkaModule } from './kafka/kafka.module'; +import { OutboxPublisherService } from './kafka/outbox-publisher.service'; +import { EventAckController } from './kafka/event-ack.controller'; import { PLANTING_ORDER_REPOSITORY } from '../domain/repositories/planting-order.repository.interface'; import { PLANTING_POSITION_REPOSITORY } from '../domain/repositories/planting-position.repository.interface'; import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-injection-batch.repository.interface'; @@ -18,7 +22,9 @@ import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-inj timeout: 5000, maxRedirects: 5, }), + KafkaModule, ], + controllers: [EventAckController], providers: [ PrismaService, { @@ -37,6 +43,8 @@ import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-inj provide: UNIT_OF_WORK, useClass: UnitOfWork, }, + OutboxRepository, + OutboxPublisherService, WalletServiceClient, ReferralServiceClient, ], @@ -46,6 +54,8 @@ import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-inj PLANTING_POSITION_REPOSITORY, POOL_INJECTION_BATCH_REPOSITORY, UNIT_OF_WORK, + OutboxRepository, + OutboxPublisherService, WalletServiceClient, ReferralServiceClient, ], diff --git a/backend/services/planting-service/src/infrastructure/kafka/event-ack.consumer.ts b/backend/services/planting-service/src/infrastructure/kafka/event-ack.consumer.ts new file mode 100644 index 00000000..c260fab1 --- /dev/null +++ b/backend/services/planting-service/src/infrastructure/kafka/event-ack.consumer.ts @@ -0,0 +1,99 @@ +import { Injectable, Logger, OnModuleInit, Inject } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; +import { OutboxRepository } from '../persistence/repositories/outbox.repository'; + +/** + * 事件确认消息结构 + */ +interface EventAckMessage { + /** 原始事件的 aggregateId(如 orderNo) */ + eventId: string; + /** 原始事件类型 */ + eventType: string; + /** 消费服务名称 */ + consumerService: string; + /** 处理结果 */ + success: boolean; + /** 错误信息(如果失败) */ + error?: string; + /** 确认时间 */ + confirmedAt: string; +} + +/** + * 事件确认消费者 + * + * B方案核心组件:监听消费方的确认事件 + * 当消费方(reward-service, referral-service, authorization-service) + * 成功处理事件后,会发送确认消息到 planting.events.ack topic + * + * 工作流程: + * 1. planting-service 发送事件到 Kafka,标记为 SENT + * 2. 消费方处理事件成功后,发送确认到 planting.events.ack + * 3. 本消费者收到确认,将事件标记为 CONFIRMED + * 4. 超时未收到确认的事件会被重发 + */ +@Injectable() +export class EventAckConsumer implements OnModuleInit { + private readonly logger = new Logger(EventAckConsumer.name); + + constructor( + @Inject('KAFKA_SERVICE') + private readonly kafkaClient: ClientKafka, + private readonly outboxRepository: OutboxRepository, + ) {} + + async onModuleInit() { + // 订阅确认 topic + this.kafkaClient.subscribeToResponseOf('planting.events.ack'); + + this.logger.log('[ACK-CONSUMER] Subscribed to planting.events.ack topic'); + + // 连接后开始消费 + try { + await this.kafkaClient.connect(); + this.startConsuming(); + } catch (error) { + this.logger.error('[ACK-CONSUMER] Failed to connect to Kafka:', error); + } + } + + private startConsuming() { + // 使用 ClientKafka 的 emit 来订阅消息 + // 注意:NestJS 的 ClientKafka 主要用于生产者,消费者通常使用 @MessagePattern + // 这里我们使用轮询方式或者通过 Controller 来处理 + this.logger.log('[ACK-CONSUMER] Event acknowledgment consumer started'); + } + + /** + * 处理确认消息 + * 此方法由 Kafka Controller 调用 + */ + async handleAckMessage(message: EventAckMessage): Promise { + this.logger.debug(`[ACK-CONSUMER] Received ack: ${JSON.stringify(message)}`); + + try { + if (message.success) { + // 标记事件为已确认 + const confirmed = await this.outboxRepository.markAsConfirmed(message.eventId); + + if (confirmed) { + this.logger.log( + `[ACK-CONSUMER] ✓ Event ${message.eventId} confirmed by ${message.consumerService}`, + ); + } + } else { + // 消费方处理失败,记录错误但不改变状态 + // 超时后会自动重发 + this.logger.warn( + `[ACK-CONSUMER] ✗ Event ${message.eventId} failed in ${message.consumerService}: ${message.error}`, + ); + } + } catch (error) { + this.logger.error( + `[ACK-CONSUMER] Error processing ack for event ${message.eventId}:`, + error, + ); + } + } +} diff --git a/backend/services/planting-service/src/infrastructure/kafka/event-ack.controller.ts b/backend/services/planting-service/src/infrastructure/kafka/event-ack.controller.ts new file mode 100644 index 00000000..991bba09 --- /dev/null +++ b/backend/services/planting-service/src/infrastructure/kafka/event-ack.controller.ts @@ -0,0 +1,82 @@ +import { Controller, Logger } from '@nestjs/common'; +import { MessagePattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices'; +import { OutboxRepository } from '../persistence/repositories/outbox.repository'; + +/** + * 事件确认消息结构 + */ +interface EventAckMessage { + /** 原始事件的 aggregateId(如 orderNo) */ + eventId: string; + /** 原始事件类型 */ + eventType: string; + /** 消费服务名称 */ + consumerService: string; + /** 处理结果 */ + success: boolean; + /** 错误信息(如果失败) */ + error?: string; + /** 确认时间 */ + confirmedAt: string; +} + +/** + * 事件确认 Kafka 控制器 + * + * B方案核心组件:监听消费方的确认事件 + * 使用 @MessagePattern 装饰器来处理 Kafka 消息 + */ +@Controller() +export class EventAckController { + private readonly logger = new Logger(EventAckController.name); + + constructor(private readonly outboxRepository: OutboxRepository) {} + + /** + * 处理事件确认消息 + * + * 消费方(reward-service, referral-service, authorization-service) + * 成功处理事件后,会发送确认消息到此 topic + */ + @MessagePattern('planting.events.ack') + async handleEventAck( + @Payload() message: EventAckMessage, + @Ctx() context: KafkaContext, + ): Promise { + const partition = context.getPartition(); + const offset = context.getMessage().offset; + + this.logger.debug( + `[ACK] Received ack from ${message.consumerService} for event ${message.eventId} ` + + `[partition=${partition}, offset=${offset}]`, + ); + + try { + if (message.success) { + // 标记事件为已确认 + const confirmed = await this.outboxRepository.markAsConfirmed(message.eventId); + + if (confirmed) { + this.logger.log( + `[ACK] ✓ Event ${message.eventId} (${message.eventType}) confirmed by ${message.consumerService}`, + ); + } else { + this.logger.warn( + `[ACK] Event ${message.eventId} not found or already confirmed`, + ); + } + } else { + // 消费方处理失败 + this.logger.warn( + `[ACK] ✗ Event ${message.eventId} failed in ${message.consumerService}: ${message.error}`, + ); + // 不改变状态,等待超时重发 + } + } catch (error) { + this.logger.error( + `[ACK] Error processing ack for event ${message.eventId}:`, + error, + ); + } + } +} diff --git a/backend/services/planting-service/src/infrastructure/kafka/event-publisher.service.ts b/backend/services/planting-service/src/infrastructure/kafka/event-publisher.service.ts new file mode 100644 index 00000000..9a511f2b --- /dev/null +++ b/backend/services/planting-service/src/infrastructure/kafka/event-publisher.service.ts @@ -0,0 +1,245 @@ +import { Injectable, Inject, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; +import { ConfigService } from '@nestjs/config'; +import { DomainEvent } from '../../domain/events/domain-event.interface'; + +/** + * Kafka Topic 映射 + * + * 消费者: + * - reward-service: 监听 planting.order.paid + * - authorization-service: 监听 planting-events + * - referral-service: 监听 planting.planting.created + */ +const EVENT_TOPIC_MAP: Record = { + PlantingOrderCreated: 'planting.order.created', + ProvinceCityConfirmed: 'planting.order.province-city-confirmed', + PlantingOrderPaid: 'planting.order.paid', + FundsAllocated: 'planting.order.funds-allocated', + PoolInjected: 'planting.pool.injected', + MiningEnabled: 'planting.mining.enabled', +}; + +@Injectable() +export class EventPublisherService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(EventPublisherService.name); + private isConnected = false; + + constructor( + @Inject('KAFKA_SERVICE') + private readonly kafkaClient: ClientKafka, + private readonly configService: ConfigService, + ) { + const brokers = this.configService.get('KAFKA_BROKERS', 'localhost:9092'); + this.logger.log(`[INIT] EventPublisherService created, brokers: ${brokers}`); + } + + async onModuleInit() { + this.logger.log('[KAFKA] Attempting to connect to Kafka...'); + try { + await this.kafkaClient.connect(); + this.isConnected = true; + this.logger.log('[KAFKA] Successfully connected to Kafka broker'); + } catch (error) { + this.logger.error('[KAFKA] Failed to connect to Kafka:', error); + // 不抛出错误,允许服务启动(开发环境可能没有 Kafka) + this.logger.warn('[KAFKA] Service will start without Kafka connection'); + } + } + + async onModuleDestroy() { + if (this.isConnected) { + this.logger.log('[KAFKA] Disconnecting from Kafka...'); + await this.kafkaClient.close(); + this.logger.log('[KAFKA] Disconnected from Kafka'); + } + } + + /** + * 发布单个领域事件 + */ + async publish(event: DomainEvent): Promise { + const topic = this.getTopicForEvent(event); + const message = this.formatMessage(event); + + this.logger.debug(`[PUBLISH] Preparing to publish event: + - Type: ${event.type} + - Topic: ${topic} + - AggregateId: ${event.aggregateId} + - Data: ${JSON.stringify(event.data)}`); + + if (!this.isConnected) { + this.logger.warn(`[PUBLISH] Kafka not connected, skipping event ${event.type}`); + return; + } + + try { + this.kafkaClient.emit(topic, message); + this.logger.log(`[PUBLISH] ✓ Event ${event.type} published to topic ${topic}`); + } catch (error) { + this.logger.error(`[PUBLISH] ✗ Failed to publish event ${event.type}:`, error); + throw error; + } + } + + /** + * 批量发布领域事件 + */ + async publishAll(events: readonly DomainEvent[]): Promise { + this.logger.log(`[PUBLISH] Publishing ${events.length} events...`); + for (const event of events) { + await this.publish(event); + } + this.logger.log(`[PUBLISH] Finished publishing ${events.length} events`); + } + + /** + * 发布到通用 planting-events topic (供 authorization-service 消费) + */ + async publishToPlantingEvents(event: DomainEvent): Promise { + const eventType = this.mapEventTypeToPlantingEventType(event.type); + const message = { + key: event.aggregateId, + value: JSON.stringify({ + eventType, + payload: event.data, + occurredAt: event.occurredAt.toISOString(), + }), + }; + + this.logger.debug(`[PUBLISH] Preparing planting-events message: + - Original Type: ${event.type} + - Mapped Type: ${eventType} + - Payload: ${JSON.stringify(event.data)}`); + + if (!this.isConnected) { + this.logger.warn(`[PUBLISH] Kafka not connected, skipping planting-events for ${event.type}`); + return; + } + + try { + this.kafkaClient.emit('planting-events', message); + this.logger.log(`[PUBLISH] ✓ Event ${event.type} published to topic planting-events`); + } catch (error) { + this.logger.error(`[PUBLISH] ✗ Failed to publish to planting-events:`, error); + } + } + + /** + * 发布认种支付事件 (reward-service 消费格式) + */ + async publishPlantingOrderPaid(params: { + orderId: string; + userId: string; + treeCount: number; + provinceCode: string; + cityCode: string; + paidAt: Date; + }): Promise { + const topic = 'planting.order.paid'; + const message = { + key: params.orderId, + value: JSON.stringify({ + orderId: params.orderId, + userId: params.userId, + treeCount: params.treeCount, + provinceCode: params.provinceCode, + cityCode: params.cityCode, + paidAt: params.paidAt.toISOString(), + }), + }; + + this.logger.debug(`[PUBLISH] Publishing PlantingOrderPaid for reward-service: + - OrderId: ${params.orderId} + - UserId: ${params.userId} + - TreeCount: ${params.treeCount} + - Province: ${params.provinceCode} + - City: ${params.cityCode}`); + + if (!this.isConnected) { + this.logger.warn(`[PUBLISH] Kafka not connected, skipping planting.order.paid`); + return; + } + + try { + this.kafkaClient.emit(topic, message); + this.logger.log(`[PUBLISH] ✓ PlantingOrderPaid published for order ${params.orderId}`); + } catch (error) { + this.logger.error(`[PUBLISH] ✗ Failed to publish PlantingOrderPaid:`, error); + } + } + + /** + * 发布认种创建事件 (referral-service 消费格式) + */ + async publishPlantingCreated(params: { + userId: string; + treeCount: number; + provinceCode: string; + cityCode: string; + }): Promise { + const topic = 'planting.planting.created'; + const message = { + key: params.userId, + value: JSON.stringify({ + eventName: 'planting.created', + data: { + userId: params.userId, + treeCount: params.treeCount, + provinceCode: params.provinceCode, + cityCode: params.cityCode, + }, + }), + }; + + this.logger.debug(`[PUBLISH] Publishing PlantingCreated for referral-service: + - UserId: ${params.userId} + - TreeCount: ${params.treeCount}`); + + if (!this.isConnected) { + this.logger.warn(`[PUBLISH] Kafka not connected, skipping planting.planting.created`); + return; + } + + try { + this.kafkaClient.emit(topic, message); + this.logger.log(`[PUBLISH] ✓ PlantingCreated published for user ${params.userId}`); + } catch (error) { + this.logger.error(`[PUBLISH] ✗ Failed to publish PlantingCreated:`, error); + } + } + + private getTopicForEvent(event: DomainEvent): string { + const topic = EVENT_TOPIC_MAP[event.type] || 'planting.events'; + this.logger.debug(`[TOPIC] Mapped event type ${event.type} to topic ${topic}`); + return topic; + } + + private formatMessage(event: DomainEvent) { + return { + key: event.aggregateId, + value: JSON.stringify({ + eventId: `${event.aggregateId}-${event.occurredAt.getTime()}`, + eventType: event.type, + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + occurredAt: event.occurredAt.toISOString(), + data: event.data, + }), + }; + } + + /** + * 映射到 authorization-service 期望的事件类型格式 + */ + private mapEventTypeToPlantingEventType(eventType: string): string { + switch (eventType) { + case 'PlantingOrderCreated': + return 'planting.tree.planted'; + case 'PlantingOrderPaid': + return 'planting.tree.planted'; + default: + return `planting.${eventType.toLowerCase()}`; + } + } +} diff --git a/backend/services/planting-service/src/infrastructure/kafka/index.ts b/backend/services/planting-service/src/infrastructure/kafka/index.ts new file mode 100644 index 00000000..0924b576 --- /dev/null +++ b/backend/services/planting-service/src/infrastructure/kafka/index.ts @@ -0,0 +1,2 @@ +export * from './kafka.module'; +export * from './event-publisher.service'; diff --git a/backend/services/planting-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/planting-service/src/infrastructure/kafka/kafka.module.ts new file mode 100644 index 00000000..c6e319cf --- /dev/null +++ b/backend/services/planting-service/src/infrastructure/kafka/kafka.module.ts @@ -0,0 +1,32 @@ +import { Module, Global } from '@nestjs/common'; +import { ConfigModule, ConfigService } from '@nestjs/config'; +import { ClientsModule, Transport } from '@nestjs/microservices'; +import { EventPublisherService } from './event-publisher.service'; + +@Global() +@Module({ + imports: [ + ClientsModule.registerAsync([ + { + name: 'KAFKA_SERVICE', + imports: [ConfigModule], + useFactory: (configService: ConfigService) => ({ + transport: Transport.KAFKA, + options: { + client: { + clientId: configService.get('KAFKA_CLIENT_ID', 'planting-service'), + brokers: configService.get('KAFKA_BROKERS', 'localhost:9092').split(','), + }, + producer: { + allowAutoTopicCreation: true, + }, + }, + }), + inject: [ConfigService], + }, + ]), + ], + providers: [EventPublisherService], + exports: [EventPublisherService, ClientsModule], +}) +export class KafkaModule {} diff --git a/backend/services/planting-service/src/infrastructure/kafka/outbox-publisher.service.ts b/backend/services/planting-service/src/infrastructure/kafka/outbox-publisher.service.ts new file mode 100644 index 00000000..8f7ca5a7 --- /dev/null +++ b/backend/services/planting-service/src/infrastructure/kafka/outbox-publisher.service.ts @@ -0,0 +1,280 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy, Inject } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { ClientKafka } from '@nestjs/microservices'; +import { OutboxRepository, OutboxEvent, OutboxStatus } from '../persistence/repositories/outbox.repository'; + +/** + * Outbox Publisher Service (B方案 - 消费方确认模式) + * + * 轮询 Outbox 表并发布事件到 Kafka + * 使用消费方确认机制保证事件100%被处理 + * + * 工作流程: + * 1. 轮询 PENDING 状态的事件 + * 2. 发送到 Kafka,标记为 SENT(等待确认) + * 3. 消费方处理成功后发送确认到 planting.events.ack + * 4. 收到确认后标记为 CONFIRMED + * 5. 超时未确认的事件重置为 PENDING 重发 + */ +@Injectable() +export class OutboxPublisherService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(OutboxPublisherService.name); + private isRunning = false; + private pollInterval: NodeJS.Timeout | null = null; + private timeoutCheckInterval: NodeJS.Timeout | null = null; + private cleanupInterval: NodeJS.Timeout | null = null; + private isConnected = false; + + // 配置 + private readonly pollIntervalMs: number; + private readonly batchSize: number; + private readonly cleanupIntervalMs: number; + private readonly confirmationTimeoutMinutes: number; + private readonly timeoutCheckIntervalMs: number; + + constructor( + @Inject('KAFKA_SERVICE') + private readonly kafkaClient: ClientKafka, + private readonly outboxRepository: OutboxRepository, + private readonly configService: ConfigService, + ) { + this.pollIntervalMs = this.configService.get('OUTBOX_POLL_INTERVAL_MS', 1000); + this.batchSize = this.configService.get('OUTBOX_BATCH_SIZE', 100); + this.cleanupIntervalMs = this.configService.get('OUTBOX_CLEANUP_INTERVAL_MS', 3600000); // 1小时 + this.confirmationTimeoutMinutes = this.configService.get('OUTBOX_CONFIRMATION_TIMEOUT_MINUTES', 5); + this.timeoutCheckIntervalMs = this.configService.get('OUTBOX_TIMEOUT_CHECK_INTERVAL_MS', 60000); // 1分钟 + + this.logger.log( + `[OUTBOX] OutboxPublisher (B方案) configured: ` + + `pollInterval=${this.pollIntervalMs}ms, batchSize=${this.batchSize}, ` + + `confirmationTimeout=${this.confirmationTimeoutMinutes}min`, + ); + } + + async onModuleInit() { + this.logger.log('[OUTBOX] Connecting to Kafka...'); + try { + await this.kafkaClient.connect(); + this.isConnected = true; + this.logger.log('[OUTBOX] Connected to Kafka'); + this.start(); + } catch (error) { + this.logger.error('[OUTBOX] Failed to connect to Kafka:', error); + this.logger.warn('[OUTBOX] OutboxPublisher will not start - events will accumulate in outbox table'); + } + } + + async onModuleDestroy() { + this.stop(); + if (this.isConnected) { + await this.kafkaClient.close(); + } + } + + /** + * 启动轮询 + */ + start(): void { + if (this.isRunning) { + this.logger.warn('[OUTBOX] Publisher already running'); + return; + } + + this.isRunning = true; + this.logger.log('[OUTBOX] Starting outbox publisher (B方案)...'); + + // 启动发布轮询 + this.pollInterval = setInterval(() => { + this.processOutbox().catch((err) => { + this.logger.error('[OUTBOX] Error processing outbox:', err); + }); + }, this.pollIntervalMs); + + // 启动超时检查任务(B方案核心) + this.timeoutCheckInterval = setInterval(() => { + this.checkConfirmationTimeouts().catch((err) => { + this.logger.error('[OUTBOX] Error checking confirmation timeouts:', err); + }); + }, this.timeoutCheckIntervalMs); + + // 启动清理任务 + this.cleanupInterval = setInterval(() => { + this.cleanup().catch((err) => { + this.logger.error('[OUTBOX] Error cleaning up outbox:', err); + }); + }, this.cleanupIntervalMs); + + this.logger.log('[OUTBOX] Outbox publisher started (B方案 - 消费方确认模式)'); + } + + /** + * 停止轮询 + */ + stop(): void { + if (!this.isRunning) return; + + this.isRunning = false; + + if (this.pollInterval) { + clearInterval(this.pollInterval); + this.pollInterval = null; + } + + if (this.timeoutCheckInterval) { + clearInterval(this.timeoutCheckInterval); + this.timeoutCheckInterval = null; + } + + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = null; + } + + this.logger.log('[OUTBOX] Outbox publisher stopped'); + } + + /** + * 处理 Outbox 事件 + */ + async processOutbox(): Promise { + if (!this.isConnected) { + return; + } + + try { + // 1. 获取待发布事件 + const pendingEvents = await this.outboxRepository.findPendingEvents(this.batchSize); + + // 2. 获取需要重试的事件 + const retryEvents = await this.outboxRepository.findEventsForRetry(this.batchSize / 2); + + const allEvents = [...pendingEvents, ...retryEvents]; + + if (allEvents.length === 0) { + return; + } + + this.logger.debug(`[OUTBOX] Processing ${allEvents.length} events (${pendingEvents.length} pending, ${retryEvents.length} retry)`); + + // 3. 逐个发布 + for (const event of allEvents) { + await this.publishEvent(event); + } + } catch (error) { + this.logger.error('[OUTBOX] Error in processOutbox:', error); + } + } + + /** + * 发布单个事件 (B方案) + * + * 使用 emit() 发送到 Kafka,成功后标记为 SENT(等待消费方确认) + * 只有收到消费方确认后才标记为 CONFIRMED + */ + private async publishEvent(event: OutboxEvent): Promise { + try { + this.logger.debug(`[OUTBOX] Publishing event ${event.id} to topic ${event.topic}`); + + // 构造 Kafka 消息,包含 outboxId 用于确认 + const payload = { + ...(event.payload as Record), + _outbox: { + id: event.id.toString(), + aggregateId: event.aggregateId, + eventType: event.eventType, + }, + }; + + const message = { + key: event.key, + value: JSON.stringify(payload), + }; + + // 发布到 Kafka + this.kafkaClient.emit(event.topic, message); + + // B方案:标记为 SENT(等待消费方确认) + await this.outboxRepository.markAsSent(event.id); + + this.logger.log( + `[OUTBOX] → Event ${event.id} sent to ${event.topic} (awaiting consumer confirmation)`, + ); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error(`[OUTBOX] ✗ Failed to publish event ${event.id}: ${errorMessage}`); + + // 标记为失败并安排重试 + await this.outboxRepository.markAsFailed(event.id, errorMessage); + } + } + + /** + * 检查确认超时的事件 (B方案核心) + * + * 将超时未确认的 SENT 事件重置为 PENDING 以便重发 + */ + private async checkConfirmationTimeouts(): Promise { + if (!this.isConnected) { + return; + } + + try { + const timedOutEvents = await this.outboxRepository.findSentEventsTimedOut( + this.confirmationTimeoutMinutes, + this.batchSize, + ); + + if (timedOutEvents.length === 0) { + return; + } + + this.logger.warn( + `[OUTBOX] Found ${timedOutEvents.length} events without confirmation after ${this.confirmationTimeoutMinutes} minutes`, + ); + + for (const event of timedOutEvents) { + await this.outboxRepository.resetSentToPending(event.id); + this.logger.warn( + `[OUTBOX] Event ${event.id} reset to PENDING for retry (retry #${event.retryCount + 1})`, + ); + } + } catch (error) { + this.logger.error('[OUTBOX] Error checking confirmation timeouts:', error); + } + } + + /** + * 清理旧事件 + */ + private async cleanup(): Promise { + const retentionDays = this.configService.get('OUTBOX_RETENTION_DAYS', 7); + await this.outboxRepository.cleanupOldEvents(retentionDays); + } + + /** + * 手动触发处理(用于测试或紧急情况) + */ + async triggerProcessing(): Promise { + this.logger.log('[OUTBOX] Manual processing triggered'); + await this.processOutbox(); + } + + /** + * 获取统计信息 + */ + async getStats(): Promise<{ + isRunning: boolean; + isConnected: boolean; + pending: number; + sent: number; + confirmed: number; + failed: number; + }> { + const stats = await this.outboxRepository.getStats(); + return { + isRunning: this.isRunning, + isConnected: this.isConnected, + ...stats, + }; + } +} diff --git a/backend/services/planting-service/src/infrastructure/persistence/repositories/outbox.repository.ts b/backend/services/planting-service/src/infrastructure/persistence/repositories/outbox.repository.ts new file mode 100644 index 00000000..5d17dee0 --- /dev/null +++ b/backend/services/planting-service/src/infrastructure/persistence/repositories/outbox.repository.ts @@ -0,0 +1,364 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { Prisma } from '@prisma/client'; + +export enum OutboxStatus { + PENDING = 'PENDING', // 待发送 + SENT = 'SENT', // 已发送到 Kafka,等待消费方确认 + CONFIRMED = 'CONFIRMED', // 消费方已确认处理成功 + FAILED = 'FAILED', // 发送失败,等待重试 +} + +export interface OutboxEventData { + eventType: string; + topic: string; + key: string; + payload: Record; + aggregateId: string; + aggregateType: string; +} + +export interface OutboxEvent extends OutboxEventData { + id: bigint; + status: OutboxStatus; + retryCount: number; + maxRetries: number; + lastError: string | null; + createdAt: Date; + publishedAt: Date | null; + nextRetryAt: Date | null; +} + +@Injectable() +export class OutboxRepository { + private readonly logger = new Logger(OutboxRepository.name); + + constructor(private readonly prisma: PrismaService) {} + + /** + * 在事务中保存 Outbox 事件 + */ + async saveInTransaction( + tx: Prisma.TransactionClient, + events: OutboxEventData[], + ): Promise { + if (events.length === 0) return; + + this.logger.debug(`[OUTBOX] Saving ${events.length} events to outbox (in transaction)`); + + await tx.outboxEvent.createMany({ + data: events.map((event) => ({ + eventType: event.eventType, + topic: event.topic, + key: event.key, + payload: event.payload as Prisma.JsonObject, + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + status: OutboxStatus.PENDING, + })), + }); + + this.logger.debug(`[OUTBOX] Saved ${events.length} events to outbox`); + } + + /** + * 直接保存 Outbox 事件(不在事务中) + */ + async saveEvents(events: OutboxEventData[]): Promise { + if (events.length === 0) return; + + this.logger.debug(`[OUTBOX] Saving ${events.length} events to outbox`); + + await this.prisma.outboxEvent.createMany({ + data: events.map((event) => ({ + eventType: event.eventType, + topic: event.topic, + key: event.key, + payload: event.payload as Prisma.JsonObject, + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + status: OutboxStatus.PENDING, + })), + }); + + this.logger.log(`[OUTBOX] ✓ Saved ${events.length} events to outbox`); + } + + /** + * 获取待发布的事件(按创建时间排序) + */ + async findPendingEvents(limit: number = 100): Promise { + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.PENDING, + }, + orderBy: { + createdAt: 'asc', + }, + take: limit, + }); + + return events.map((e) => this.mapToOutboxEvent(e)); + } + + /** + * 获取需要重试的事件 + */ + async findEventsForRetry(limit: number = 50): Promise { + const now = new Date(); + + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.FAILED, + retryCount: { + lt: this.prisma.outboxEvent.fields.maxRetries, + }, + nextRetryAt: { + lte: now, + }, + }, + orderBy: { + nextRetryAt: 'asc', + }, + take: limit, + }); + + return events.map((e) => this.mapToOutboxEvent(e)); + } + + /** + * 标记事件为已发送(等待消费方确认) + * B方案:发送到 Kafka 后先标记为 SENT,等消费方确认后才标记为 CONFIRMED + */ + async markAsSent(id: bigint): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.SENT, + publishedAt: new Date(), + }, + }); + + this.logger.debug(`[OUTBOX] Marked event ${id} as SENT (awaiting consumer confirmation)`); + } + + /** + * 标记事件为已确认(消费方已成功处理) + * B方案:收到消费方确认后调用 + */ + async markAsConfirmed(eventId: string): Promise { + // 通过 aggregateId + eventType 查找事件 + const result = await this.prisma.outboxEvent.updateMany({ + where: { + aggregateId: eventId, + status: OutboxStatus.SENT, + }, + data: { + status: OutboxStatus.CONFIRMED, + }, + }); + + if (result.count > 0) { + this.logger.log(`[OUTBOX] ✓ Event ${eventId} confirmed by consumer`); + return true; + } + + this.logger.warn(`[OUTBOX] Event ${eventId} not found or not in SENT status`); + return false; + } + + /** + * 通过 outbox ID 标记为已确认 + */ + async markAsConfirmedById(id: bigint): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.CONFIRMED, + }, + }); + + this.logger.log(`[OUTBOX] ✓ Event ${id} confirmed by consumer`); + } + + /** + * 获取已发送但未确认且超时的事件(用于重试) + * B方案:超时未收到确认的事件需要重发 + */ + async findSentEventsTimedOut(timeoutMinutes: number = 5, limit: number = 50): Promise { + const cutoffTime = new Date(); + cutoffTime.setMinutes(cutoffTime.getMinutes() - timeoutMinutes); + + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.SENT, + publishedAt: { + lt: cutoffTime, + }, + retryCount: { + lt: 5, // 最多重试5次 + }, + }, + orderBy: { + publishedAt: 'asc', + }, + take: limit, + }); + + return events.map((e) => this.mapToOutboxEvent(e)); + } + + /** + * 将超时的 SENT 事件重置为 PENDING 以便重发 + */ + async resetSentToPending(id: bigint): Promise { + const event = await this.prisma.outboxEvent.findUnique({ + where: { id }, + }); + + if (!event) return; + + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.PENDING, + retryCount: event.retryCount + 1, + lastError: 'Consumer confirmation timeout', + }, + }); + + this.logger.warn(`[OUTBOX] Event ${id} reset to PENDING for retry (confirmation timeout)`); + } + + /** + * 标记事件为已发布(旧方法,保留兼容性) + * @deprecated 使用 markAsSent 和 markAsConfirmed 代替 + */ + async markAsPublished(id: bigint): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.CONFIRMED, + publishedAt: new Date(), + }, + }); + + this.logger.debug(`[OUTBOX] Marked event ${id} as published (confirmed)`); + } + + /** + * 标记事件为失败并安排重试 + */ + async markAsFailed(id: bigint, error: string): Promise { + const event = await this.prisma.outboxEvent.findUnique({ + where: { id }, + }); + + if (!event) return; + + const newRetryCount = event.retryCount + 1; + const isFinalFailure = newRetryCount >= event.maxRetries; + + // 指数退避:1min, 2min, 4min, 8min, 16min + const delayMinutes = Math.pow(2, newRetryCount - 1); + const nextRetryAt = new Date(); + nextRetryAt.setMinutes(nextRetryAt.getMinutes() + delayMinutes); + + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: isFinalFailure ? OutboxStatus.FAILED : OutboxStatus.FAILED, + retryCount: newRetryCount, + lastError: error, + nextRetryAt: isFinalFailure ? null : nextRetryAt, + }, + }); + + if (isFinalFailure) { + this.logger.error(`[OUTBOX] Event ${id} permanently failed after ${newRetryCount} retries: ${error}`); + } else { + this.logger.warn(`[OUTBOX] Event ${id} failed, retry ${newRetryCount}/${event.maxRetries}, next retry at ${nextRetryAt}`); + } + } + + /** + * 批量标记为已确认 + * @deprecated 使用单个 markAsConfirmedById 代替 + */ + async markManyAsPublished(ids: bigint[]): Promise { + if (ids.length === 0) return; + + await this.prisma.outboxEvent.updateMany({ + where: { + id: { in: ids }, + }, + data: { + status: OutboxStatus.CONFIRMED, + publishedAt: new Date(), + }, + }); + + this.logger.debug(`[OUTBOX] Marked ${ids.length} events as confirmed`); + } + + /** + * 清理已确认的旧事件(保留最近7天) + */ + async cleanupOldEvents(retentionDays: number = 7): Promise { + const cutoffDate = new Date(); + cutoffDate.setDate(cutoffDate.getDate() - retentionDays); + + const result = await this.prisma.outboxEvent.deleteMany({ + where: { + status: OutboxStatus.CONFIRMED, + publishedAt: { + lt: cutoffDate, + }, + }, + }); + + if (result.count > 0) { + this.logger.log(`[OUTBOX] Cleaned up ${result.count} old events`); + } + + return result.count; + } + + /** + * 获取 Outbox 统计信息 + */ + async getStats(): Promise<{ + pending: number; + sent: number; + confirmed: number; + failed: number; + }> { + const [pending, sent, confirmed, failed] = await Promise.all([ + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.PENDING } }), + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.SENT } }), + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.CONFIRMED } }), + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.FAILED } }), + ]); + + return { pending, sent, confirmed, failed }; + } + + private mapToOutboxEvent(record: any): OutboxEvent { + return { + id: record.id, + eventType: record.eventType, + topic: record.topic, + key: record.key, + payload: record.payload as Record, + aggregateId: record.aggregateId, + aggregateType: record.aggregateType, + status: record.status as OutboxStatus, + retryCount: record.retryCount, + maxRetries: record.maxRetries, + lastError: record.lastError, + createdAt: record.createdAt, + publishedAt: record.publishedAt, + nextRetryAt: record.nextRetryAt, + }; + } +} diff --git a/backend/services/planting-service/src/infrastructure/persistence/unit-of-work.ts b/backend/services/planting-service/src/infrastructure/persistence/unit-of-work.ts index 197f39b9..4566fbe7 100644 --- a/backend/services/planting-service/src/infrastructure/persistence/unit-of-work.ts +++ b/backend/services/planting-service/src/infrastructure/persistence/unit-of-work.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { Prisma } from '@prisma/client'; import { PrismaService, TransactionClient } from './prisma/prisma.service'; import { PlantingOrder } from '../../domain/aggregates/planting-order.aggregate'; @@ -7,6 +7,7 @@ import { PoolInjectionBatch } from '../../domain/aggregates/pool-injection-batch import { PlantingOrderMapper } from './mappers/planting-order.mapper'; import { PlantingPositionMapper } from './mappers/planting-position.mapper'; import { PoolInjectionBatchMapper } from './mappers/pool-injection-batch.mapper'; +import { OutboxEventData } from './repositories/outbox.repository'; /** * 工作单元 - 用于管理跨多个聚合根的数据库事务 @@ -45,10 +46,70 @@ export class UnitOfWork { /** * 事务性工作单元 - 在事务上下文中提供聚合根的持久化操作 + * + * 支持 Outbox Pattern:在同一个事务中保存业务数据和事件数据 + * 保证事件发布的原子性和可靠性 */ export class TransactionalUnitOfWork { + private readonly logger = new Logger(TransactionalUnitOfWork.name); + private pendingOutboxEvents: OutboxEventData[] = []; + constructor(private readonly tx: TransactionClient) {} + /** + * 添加 Outbox 事件(在事务提交时一起保存) + * + * @param event 事件数据 + */ + addOutboxEvent(event: OutboxEventData): void { + this.logger.debug(`[OUTBOX] Adding event to outbox: ${event.eventType} for ${event.aggregateType}:${event.aggregateId}`); + this.pendingOutboxEvents.push(event); + } + + /** + * 批量添加 Outbox 事件 + * + * @param events 事件数据数组 + */ + addOutboxEvents(events: OutboxEventData[]): void { + this.logger.debug(`[OUTBOX] Adding ${events.length} events to outbox`); + this.pendingOutboxEvents.push(...events); + } + + /** + * 保存所有待发送的 Outbox 事件 + * 应在事务的最后调用 + */ + async commitOutboxEvents(): Promise { + if (this.pendingOutboxEvents.length === 0) { + return; + } + + this.logger.debug(`[OUTBOX] Committing ${this.pendingOutboxEvents.length} outbox events`); + + await this.tx.outboxEvent.createMany({ + data: this.pendingOutboxEvents.map((event) => ({ + eventType: event.eventType, + topic: event.topic, + key: event.key, + payload: event.payload as Prisma.JsonObject, + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + status: 'PENDING', + })), + }); + + this.logger.log(`[OUTBOX] ✓ Committed ${this.pendingOutboxEvents.length} outbox events`); + this.pendingOutboxEvents = []; + } + + /** + * 获取待发送的事件数量 + */ + getPendingOutboxEventCount(): number { + return this.pendingOutboxEvents.length; + } + /** * 保存认种订单 */ diff --git a/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts b/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts index 863f4a09..a2bd4ab8 100644 --- a/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts +++ b/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts @@ -1,5 +1,6 @@ import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { KafkaService } from '../../infrastructure'; +import { EventAckPublisher } from '../../infrastructure/kafka/event-ack.publisher'; import { TeamStatisticsService } from '../services'; import { UpdateTeamStatisticsCommand } from '../commands'; @@ -11,6 +12,11 @@ interface PlantingCreatedEvent { provinceCode: string; cityCode: string; }; + _outbox?: { + id: string; + aggregateId: string; + eventType: string; + }; } /** @@ -24,6 +30,7 @@ export class PlantingCreatedHandler implements OnModuleInit { constructor( private readonly kafkaService: KafkaService, private readonly teamStatisticsService: TeamStatisticsService, + private readonly eventAckPublisher: EventAckPublisher, ) {} async onModuleInit() { @@ -42,6 +49,10 @@ export class PlantingCreatedHandler implements OnModuleInit { return; } + // B方案:提取 outbox 信息用于发送确认 + const outboxInfo = event._outbox; + const eventId = outboxInfo?.aggregateId || 'unknown'; + try { const command = new UpdateTeamStatisticsCommand( BigInt(event.data.userId), @@ -54,11 +65,22 @@ export class PlantingCreatedHandler implements OnModuleInit { this.logger.log( `Updated team statistics for user ${event.data.userId}, count: ${event.data.treeCount}`, ); + + // B方案:发送处理成功确认 + if (outboxInfo) { + await this.eventAckPublisher.sendSuccess(eventId, outboxInfo.eventType); + } } catch (error) { this.logger.error( `Failed to update team statistics for user ${event.data.userId}:`, error, ); + + // B方案:发送处理失败确认 + if (outboxInfo) { + const errorMessage = error instanceof Error ? error.message : String(error); + await this.eventAckPublisher.sendFailure(eventId, outboxInfo.eventType, errorMessage); + } } } } diff --git a/backend/services/referral-service/src/infrastructure/kafka/event-ack.publisher.ts b/backend/services/referral-service/src/infrastructure/kafka/event-ack.publisher.ts new file mode 100644 index 00000000..378697e4 --- /dev/null +++ b/backend/services/referral-service/src/infrastructure/kafka/event-ack.publisher.ts @@ -0,0 +1,98 @@ +import { Injectable, Logger, Inject } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; + +/** + * 事件确认消息结构 + */ +interface EventAckMessage { + /** 原始事件的 aggregateId(如 orderNo) */ + eventId: string; + /** 原始事件类型 */ + eventType: string; + /** 消费服务名称 */ + consumerService: string; + /** 处理结果 */ + success: boolean; + /** 错误信息(如果失败) */ + error?: string; + /** 确认时间 */ + confirmedAt: string; +} + +/** + * 事件确认发布器 + * + * B方案核心组件:消费方处理事件后发送确认 + * 发送确认消息到 planting.events.ack topic + */ +@Injectable() +export class EventAckPublisher { + private readonly logger = new Logger(EventAckPublisher.name); + private readonly serviceName = 'referral-service'; + + constructor( + @Inject('KAFKA_SERVICE') + private readonly kafkaClient: ClientKafka, + ) {} + + /** + * 发送处理成功确认 + */ + async sendSuccess(eventId: string, eventType: string): Promise { + const ackMessage: EventAckMessage = { + eventId, + eventType, + consumerService: this.serviceName, + success: true, + confirmedAt: new Date().toISOString(), + }; + + try { + this.kafkaClient.emit('planting.events.ack', { + key: eventId, + value: JSON.stringify(ackMessage), + }); + + this.logger.log(`[ACK] ✓ Sent success confirmation for event ${eventId} (${eventType})`); + } catch (error) { + this.logger.error(`[ACK] Failed to send confirmation for event ${eventId}:`, error); + } + } + + /** + * 发送处理失败确认 + */ + async sendFailure(eventId: string, eventType: string, errorMessage: string): Promise { + const ackMessage: EventAckMessage = { + eventId, + eventType, + consumerService: this.serviceName, + success: false, + error: errorMessage, + confirmedAt: new Date().toISOString(), + }; + + try { + this.kafkaClient.emit('planting.events.ack', { + key: eventId, + value: JSON.stringify(ackMessage), + }); + + this.logger.warn(`[ACK] ✗ Sent failure confirmation for event ${eventId}: ${errorMessage}`); + } catch (error) { + this.logger.error(`[ACK] Failed to send failure confirmation for event ${eventId}:`, error); + } + } + + /** + * 从消息中提取 outbox 信息 + */ + extractOutboxInfo(message: Record): { id: string; aggregateId: string; eventType: string } | null { + const outbox = message._outbox as { id: string; aggregateId: string; eventType: string } | undefined; + if (!outbox) { + this.logger.warn('[ACK] Message does not contain _outbox metadata'); + return null; + } + return outbox; + } +} diff --git a/backend/services/reward-service/prisma/migrations/20241204000000_init/migration.sql b/backend/services/reward-service/prisma/migrations/20241204000000_init/migration.sql new file mode 100644 index 00000000..6f34e7b3 --- /dev/null +++ b/backend/services/reward-service/prisma/migrations/20241204000000_init/migration.sql @@ -0,0 +1,117 @@ +-- CreateTable: reward_ledger_entries (聚合根1 - 行为表, append-only) +CREATE TABLE "reward_ledger_entries" ( + "entry_id" BIGSERIAL NOT NULL, + "user_id" BIGINT NOT NULL, + "source_order_id" BIGINT NOT NULL, + "source_user_id" BIGINT NOT NULL, + "right_type" VARCHAR(50) NOT NULL, + "usdt_amount" DECIMAL(20,8) NOT NULL, + "hashpower_amount" DECIMAL(20,8) NOT NULL DEFAULT 0, + "reward_status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', + "created_at" TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "expire_at" TIMESTAMP(3), + "claimed_at" TIMESTAMP(3), + "settled_at" TIMESTAMP(3), + "expired_at" TIMESTAMP(3), + "memo" VARCHAR(500), + + CONSTRAINT "reward_ledger_entries_pkey" PRIMARY KEY ("entry_id") +); + +-- CreateTable: reward_summaries (聚合根2 - 状态表) +CREATE TABLE "reward_summaries" ( + "summary_id" BIGSERIAL NOT NULL, + "user_id" BIGINT NOT NULL, + "pending_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0, + "pending_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0, + "pending_expire_at" TIMESTAMP(3), + "settleable_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0, + "settleable_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0, + "settled_total_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0, + "settled_total_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0, + "expired_total_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0, + "expired_total_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0, + "last_update_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "reward_summaries_pkey" PRIMARY KEY ("summary_id") +); + +-- CreateTable: right_definitions (配置表) +CREATE TABLE "right_definitions" ( + "definition_id" BIGSERIAL NOT NULL, + "right_type" VARCHAR(50) NOT NULL, + "usdt_per_tree" DECIMAL(20,8) NOT NULL, + "hashpower_percent" DECIMAL(5,2) NOT NULL DEFAULT 0, + "payable_to" VARCHAR(50) NOT NULL, + "rule_description" TEXT, + "is_enabled" BOOLEAN NOT NULL DEFAULT true, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updated_at" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "right_definitions_pkey" PRIMARY KEY ("definition_id") +); + +-- CreateTable: settlement_records (行为表) +CREATE TABLE "settlement_records" ( + "settlement_id" BIGSERIAL NOT NULL, + "user_id" BIGINT NOT NULL, + "usdt_amount" DECIMAL(20,8) NOT NULL, + "hashpower_amount" DECIMAL(20,8) NOT NULL, + "settle_currency" VARCHAR(10) NOT NULL, + "received_amount" DECIMAL(20,8) NOT NULL, + "swap_tx_hash" VARCHAR(100), + "swap_rate" DECIMAL(20,8), + "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "completed_at" TIMESTAMP(3), + "reward_entry_ids" BIGINT[], + + CONSTRAINT "settlement_records_pkey" PRIMARY KEY ("settlement_id") +); + +-- CreateTable: reward_events (行为表, append-only) +CREATE TABLE "reward_events" ( + "event_id" BIGSERIAL NOT NULL, + "event_type" VARCHAR(50) NOT NULL, + "aggregate_id" VARCHAR(100) NOT NULL, + "aggregate_type" VARCHAR(50) NOT NULL, + "event_data" JSONB NOT NULL, + "user_id" BIGINT, + "occurred_at" TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "version" INTEGER NOT NULL DEFAULT 1, + + CONSTRAINT "reward_events_pkey" PRIMARY KEY ("event_id") +); + +-- CreateIndex: reward_ledger_entries indexes +CREATE INDEX "idx_user_status" ON "reward_ledger_entries"("user_id", "reward_status"); +CREATE INDEX "idx_user_created" ON "reward_ledger_entries"("user_id", "created_at" DESC); +CREATE INDEX "idx_source_order" ON "reward_ledger_entries"("source_order_id"); +CREATE INDEX "idx_source_user" ON "reward_ledger_entries"("source_user_id"); +CREATE INDEX "idx_right_type" ON "reward_ledger_entries"("right_type"); +CREATE INDEX "idx_status" ON "reward_ledger_entries"("reward_status"); +CREATE INDEX "idx_expire" ON "reward_ledger_entries"("expire_at"); +CREATE INDEX "idx_created" ON "reward_ledger_entries"("created_at"); + +-- CreateIndex: reward_summaries indexes +CREATE UNIQUE INDEX "reward_summaries_user_id_key" ON "reward_summaries"("user_id"); +CREATE INDEX "idx_summary_user" ON "reward_summaries"("user_id"); +CREATE INDEX "idx_settleable_desc" ON "reward_summaries"("settleable_usdt" DESC); +CREATE INDEX "idx_pending_expire" ON "reward_summaries"("pending_expire_at"); + +-- CreateIndex: right_definitions indexes +CREATE UNIQUE INDEX "right_definitions_right_type_key" ON "right_definitions"("right_type"); +CREATE INDEX "idx_def_right_type" ON "right_definitions"("right_type"); +CREATE INDEX "idx_def_enabled" ON "right_definitions"("is_enabled"); + +-- CreateIndex: settlement_records indexes +CREATE INDEX "idx_settlement_user" ON "settlement_records"("user_id"); +CREATE INDEX "idx_settlement_status" ON "settlement_records"("status"); +CREATE INDEX "idx_settlement_created" ON "settlement_records"("created_at"); + +-- CreateIndex: reward_events indexes +CREATE INDEX "idx_reward_event_aggregate" ON "reward_events"("aggregate_type", "aggregate_id"); +CREATE INDEX "idx_reward_event_type" ON "reward_events"("event_type"); +CREATE INDEX "idx_reward_event_user" ON "reward_events"("user_id"); +CREATE INDEX "idx_reward_event_occurred" ON "reward_events"("occurred_at"); diff --git a/backend/services/reward-service/src/infrastructure/kafka/event-ack.publisher.ts b/backend/services/reward-service/src/infrastructure/kafka/event-ack.publisher.ts new file mode 100644 index 00000000..1b6e3cfa --- /dev/null +++ b/backend/services/reward-service/src/infrastructure/kafka/event-ack.publisher.ts @@ -0,0 +1,98 @@ +import { Injectable, Logger, Inject } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; + +/** + * 事件确认消息结构 + */ +interface EventAckMessage { + /** 原始事件的 aggregateId(如 orderNo) */ + eventId: string; + /** 原始事件类型 */ + eventType: string; + /** 消费服务名称 */ + consumerService: string; + /** 处理结果 */ + success: boolean; + /** 错误信息(如果失败) */ + error?: string; + /** 确认时间 */ + confirmedAt: string; +} + +/** + * 事件确认发布器 + * + * B方案核心组件:消费方处理事件后发送确认 + * 发送确认消息到 planting.events.ack topic + */ +@Injectable() +export class EventAckPublisher { + private readonly logger = new Logger(EventAckPublisher.name); + private readonly serviceName = 'reward-service'; + + constructor( + @Inject('KAFKA_SERVICE') + private readonly kafkaClient: ClientKafka, + ) {} + + /** + * 发送处理成功确认 + */ + async sendSuccess(eventId: string, eventType: string): Promise { + const ackMessage: EventAckMessage = { + eventId, + eventType, + consumerService: this.serviceName, + success: true, + confirmedAt: new Date().toISOString(), + }; + + try { + this.kafkaClient.emit('planting.events.ack', { + key: eventId, + value: JSON.stringify(ackMessage), + }); + + this.logger.log(`[ACK] ✓ Sent success confirmation for event ${eventId} (${eventType})`); + } catch (error) { + this.logger.error(`[ACK] Failed to send confirmation for event ${eventId}:`, error); + } + } + + /** + * 发送处理失败确认 + */ + async sendFailure(eventId: string, eventType: string, errorMessage: string): Promise { + const ackMessage: EventAckMessage = { + eventId, + eventType, + consumerService: this.serviceName, + success: false, + error: errorMessage, + confirmedAt: new Date().toISOString(), + }; + + try { + this.kafkaClient.emit('planting.events.ack', { + key: eventId, + value: JSON.stringify(ackMessage), + }); + + this.logger.warn(`[ACK] ✗ Sent failure confirmation for event ${eventId}: ${errorMessage}`); + } catch (error) { + this.logger.error(`[ACK] Failed to send failure confirmation for event ${eventId}:`, error); + } + } + + /** + * 从消息中提取 outbox 信息 + */ + extractOutboxInfo(message: Record): { id: string; aggregateId: string; eventType: string } | null { + const outbox = message._outbox as { id: string; aggregateId: string; eventType: string } | undefined; + if (!outbox) { + this.logger.warn('[ACK] Message does not contain _outbox metadata'); + return null; + } + return outbox; + } +} diff --git a/backend/services/reward-service/src/infrastructure/kafka/event-consumer.controller.ts b/backend/services/reward-service/src/infrastructure/kafka/event-consumer.controller.ts index a8648f2d..7ff03f3f 100644 --- a/backend/services/reward-service/src/infrastructure/kafka/event-consumer.controller.ts +++ b/backend/services/reward-service/src/infrastructure/kafka/event-consumer.controller.ts @@ -1,14 +1,31 @@ import { Controller, Logger } from '@nestjs/common'; import { MessagePattern, Payload } from '@nestjs/microservices'; import { RewardApplicationService } from '../../application/services/reward-application.service'; +import { EventAckPublisher } from './event-ack.publisher'; interface PlantingOrderPaidEvent { - orderId: string; - userId: string; - treeCount: number; - provinceCode: string; - cityCode: string; - paidAt: string; + eventName?: string; + data?: { + orderId: string; + userId: string; + treeCount: number; + provinceCode: string; + cityCode: string; + paidAt: string; + }; + // 兼容旧格式 + orderId?: string; + userId?: string; + treeCount?: number; + provinceCode?: string; + cityCode?: string; + paidAt?: string; + // B方案:outbox 元数据 + _outbox?: { + id: string; + aggregateId: string; + eventType: string; + }; } @Controller() @@ -17,6 +34,7 @@ export class EventConsumerController { constructor( private readonly rewardService: RewardApplicationService, + private readonly eventAckPublisher: EventAckPublisher, ) {} /** @@ -26,22 +44,48 @@ export class EventConsumerController { async handlePlantingOrderPaid(@Payload() message: PlantingOrderPaidEvent) { this.logger.log(`Received planting.order.paid event: ${JSON.stringify(message)}`); + // 解析消息数据(支持新旧格式) + const eventData = message.data || { + orderId: message.orderId!, + userId: message.userId!, + treeCount: message.treeCount!, + provinceCode: message.provinceCode!, + cityCode: message.cityCode!, + paidAt: message.paidAt!, + }; + + // B方案:提取 outbox 信息用于发送确认 + const outboxInfo = message._outbox; + const eventId = outboxInfo?.aggregateId || eventData.orderId; + try { // 1. 计算并分配奖励 await this.rewardService.distributeRewards({ - sourceOrderId: BigInt(message.orderId), - sourceUserId: BigInt(message.userId), - treeCount: message.treeCount, - provinceCode: message.provinceCode, - cityCode: message.cityCode, + sourceOrderId: BigInt(eventData.orderId), + sourceUserId: BigInt(eventData.userId), + treeCount: eventData.treeCount, + provinceCode: eventData.provinceCode, + cityCode: eventData.cityCode, }); // 2. 检查该用户是否有待领取奖励需要转为可结算 - await this.rewardService.claimPendingRewardsForUser(BigInt(message.userId)); + await this.rewardService.claimPendingRewardsForUser(BigInt(eventData.userId)); - this.logger.log(`Successfully processed planting.order.paid for order ${message.orderId}`); + this.logger.log(`Successfully processed planting.order.paid for order ${eventData.orderId}`); + + // B方案:发送处理成功确认 + if (outboxInfo) { + await this.eventAckPublisher.sendSuccess(eventId, outboxInfo.eventType); + } } catch (error) { this.logger.error(`Error processing planting.order.paid:`, error); + + // B方案:发送处理失败确认 + if (outboxInfo) { + const errorMessage = error instanceof Error ? error.message : String(error); + await this.eventAckPublisher.sendFailure(eventId, outboxInfo.eventType, errorMessage); + } + throw error; } }