diff --git a/backend/services/contribution-service/prisma/migrations/20260112200000_add_adoption_province_city/migration.sql b/backend/services/contribution-service/prisma/migrations/20260112200000_add_adoption_province_city/migration.sql new file mode 100644 index 00000000..c48b0a33 --- /dev/null +++ b/backend/services/contribution-service/prisma/migrations/20260112200000_add_adoption_province_city/migration.sql @@ -0,0 +1,7 @@ +-- AlterTable: 添加认种省市字段到 synced_adoptions 表 +-- 这些字段从 1.0 planting_orders 表的 selected_province/selected_city 同步 +ALTER TABLE "synced_adoptions" ADD COLUMN "selected_province" VARCHAR(10); +ALTER TABLE "synced_adoptions" ADD COLUMN "selected_city" VARCHAR(10); + +-- CreateIndex: 添加省市组合索引 +CREATE INDEX "synced_adoptions_selected_province_selected_city_idx" ON "synced_adoptions"("selected_province", "selected_city"); diff --git a/backend/services/contribution-service/prisma/schema.prisma b/backend/services/contribution-service/prisma/schema.prisma index 3286010a..24fdc700 100644 --- a/backend/services/contribution-service/prisma/schema.prisma +++ b/backend/services/contribution-service/prisma/schema.prisma @@ -43,6 +43,10 @@ model SyncedAdoption { adoptionDate DateTime @map("adoption_date") @db.Date status String? @db.VarChar(30) // 与1.0 planting_orders.status保持一致 + // 认种选择的省市(用于系统账户分配) + selectedProvince String? @map("selected_province") @db.VarChar(10) + selectedCity String? @map("selected_city") @db.VarChar(10) + // 贡献值计算参数(从认种时的配置) contributionPerTree Decimal @map("contribution_per_tree") @db.Decimal(20, 10) @@ -59,6 +63,7 @@ model SyncedAdoption { @@index([accountSequence]) @@index([adoptionDate]) @@index([contributionDistributed]) + @@index([selectedProvince, selectedCity]) @@map("synced_adoptions") } diff --git a/backend/services/contribution-service/src/application/application.module.ts b/backend/services/contribution-service/src/application/application.module.ts index b164a611..6df64743 100644 --- a/backend/services/contribution-service/src/application/application.module.ts +++ b/backend/services/contribution-service/src/application/application.module.ts @@ -10,6 +10,7 @@ import { CDCEventDispatcher } from './event-handlers/cdc-event-dispatcher'; // Services import { ContributionCalculationService } from './services/contribution-calculation.service'; +import { ContributionDistributionPublisherService } from './services/contribution-distribution-publisher.service'; import { SnapshotService } from './services/snapshot.service'; // Queries @@ -34,6 +35,7 @@ import { ContributionScheduler } from './schedulers/contribution.scheduler'; // Services ContributionCalculationService, + ContributionDistributionPublisherService, SnapshotService, // Queries diff --git a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts index 6d797061..8b47b16b 100644 --- a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts @@ -45,11 +45,13 @@ export class AdoptionSyncedHandler { private async handleCreate(data: any, sequenceNum: bigint): Promise { if (!data) return; - // planting_orders表字段: order_id, account_sequence, tree_count, created_at, status + // planting_orders表字段: order_id, account_sequence, tree_count, created_at, status, selected_province, selected_city const orderId = data.order_id || data.id; const accountSequence = data.account_sequence || data.accountSequence; const treeCount = data.tree_count || data.treeCount; const createdAt = data.created_at || data.createdAt || data.paid_at || data.paidAt; + const selectedProvince = data.selected_province || data.selectedProvince || null; + const selectedCity = data.selected_city || data.selectedCity || null; // 第一步:保存同步的认种订单数据 await this.syncedDataRepository.upsertSyncedAdoption({ @@ -58,6 +60,8 @@ export class AdoptionSyncedHandler { treeCount: treeCount, adoptionDate: new Date(createdAt), status: data.status ?? null, + selectedProvince: selectedProvince, + selectedCity: selectedCity, contributionPerTree: new Decimal('1'), // 每棵树1算力 sourceSequenceNum: sequenceNum, }); @@ -99,6 +103,8 @@ export class AdoptionSyncedHandler { const accountSequence = after.account_sequence || after.accountSequence; const treeCount = after.tree_count || after.treeCount; const createdAt = after.created_at || after.createdAt || after.paid_at || after.paidAt; + const selectedProvince = after.selected_province || after.selectedProvince || null; + const selectedCity = after.selected_city || after.selectedCity || null; // 第一步:保存同步的认种订单数据 await this.syncedDataRepository.upsertSyncedAdoption({ @@ -107,6 +113,8 @@ export class AdoptionSyncedHandler { treeCount: treeCount, adoptionDate: new Date(createdAt), status: after.status ?? null, + selectedProvince: selectedProvince, + selectedCity: selectedCity, contributionPerTree: new Decimal('1'), sourceSequenceNum: sequenceNum, }); diff --git a/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts b/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts index b049826f..0d880182 100644 --- a/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts +++ b/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts @@ -104,6 +104,7 @@ export class ContributionScheduler implements OnModuleInit { /** * 每30秒发布 Outbox 中的事件 + * 使用 4 小时最大退避策略处理失败 */ @Cron('*/30 * * * * *') async publishOutboxEvents(): Promise { @@ -119,31 +120,29 @@ export class ContributionScheduler implements OnModuleInit { return; } + const successIds: bigint[] = []; + for (const event of events) { try { - await this.kafkaProducer.emit(`contribution.${event.eventType}`, { - key: event.aggregateId, - value: { - eventId: event.id, - aggregateType: event.aggregateType, - aggregateId: event.aggregateId, - eventType: event.eventType, - payload: event.payload, - createdAt: event.createdAt.toISOString(), - }, + // 使用事件中指定的 topic,而不是拼接 + await this.kafkaProducer.emit(event.topic, { + key: event.key, + value: event.payload, }); + successIds.push(event.id); } catch (error) { - this.logger.error(`Failed to publish event ${event.id}`, error); - // 继续处理下一个事件 - continue; + // 记录失败,使用退避策略重试 + const errorMessage = error instanceof Error ? error.message : String(error); + await this.outboxRepository.markAsFailed(event.id, errorMessage); + this.logger.warn(`Event ${event.id} failed, will retry with backoff: ${errorMessage}`); } } - // 标记为已处理 - const processedIds = events.map((e) => e.id); - await this.outboxRepository.markAsProcessed(processedIds); - - this.logger.debug(`Published ${processedIds.length} outbox events`); + // 标记成功发送的事件为已处理 + if (successIds.length > 0) { + await this.outboxRepository.markAsProcessed(successIds); + this.logger.debug(`Published ${successIds.length} outbox events`); + } } catch (error) { this.logger.error('Failed to publish outbox events', error); } finally { diff --git a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts index 1d2461b7..691a99df 100644 --- a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts +++ b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts @@ -9,6 +9,7 @@ import { OutboxRepository } from '../../infrastructure/persistence/repositories/ import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work'; import { ContributionAccountAggregate, ContributionSourceType } from '../../domain/aggregates/contribution-account.aggregate'; import { SyncedReferral } from '../../domain/repositories/synced-data.repository.interface'; +import { ContributionDistributionPublisherService } from './contribution-distribution-publisher.service'; /** * 算力计算应用服务 @@ -27,6 +28,7 @@ export class ContributionCalculationService { private readonly systemAccountRepository: SystemAccountRepository, private readonly outboxRepository: OutboxRepository, private readonly unitOfWork: UnitOfWork, + private readonly distributionPublisher: ContributionDistributionPublisherService, ) {} /** @@ -77,21 +79,16 @@ export class ContributionCalculationService { await this.updateReferrerUnlockStatus(userReferral.referrerAccountSequence); } - // 发布事件到 Outbox - await this.outboxRepository.save({ - aggregateType: 'ContributionAccount', - aggregateId: adoption.accountSequence, - eventType: 'ContributionCalculated', - payload: { - accountSequence: adoption.accountSequence, - sourceAdoptionId: originalAdoptionId.toString(), - personalContribution: result.personalRecord.amount.value.toString(), - teamLevelCount: result.teamLevelRecords.length, - teamBonusCount: result.teamBonusRecords.length, - unallocatedCount: result.unallocatedContributions.length, - calculatedAt: new Date().toISOString(), - }, - }); + // 发布分配结果到 Kafka(通过 Outbox 模式) + // 使用认种订单选择的省市代码 + const provinceCode = adoption.selectedProvince ?? 'DEFAULT'; + const cityCode = adoption.selectedCity ?? 'DEFAULT'; + await this.distributionPublisher.publishDistributionResult( + adoption, + result, + provinceCode, + cityCode, + ); }); this.logger.log( diff --git a/backend/services/contribution-service/src/application/services/contribution-distribution-publisher.service.ts b/backend/services/contribution-service/src/application/services/contribution-distribution-publisher.service.ts new file mode 100644 index 00000000..4f9d2b09 --- /dev/null +++ b/backend/services/contribution-service/src/application/services/contribution-distribution-publisher.service.ts @@ -0,0 +1,144 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; +import { ContributionDistributionResult } from '../../domain/services/contribution-calculator.service'; +import { SyncedAdoption } from '../../domain/repositories/synced-data.repository.interface'; + +/** + * 贡献值分配结果发布服务 + * 将计算完成的分配结果发布到 Kafka,供 mining-wallet-service 消费 + */ +@Injectable() +export class ContributionDistributionPublisherService { + private readonly logger = new Logger( + ContributionDistributionPublisherService.name, + ); + + constructor(private readonly outboxRepository: OutboxRepository) {} + + /** + * 发布分配结果到 Kafka + */ + async publishDistributionResult( + adoption: SyncedAdoption, + result: ContributionDistributionResult, + provinceCode: string, + cityCode: string, + ): Promise { + const eventId = `dist-${adoption.originalAdoptionId}-${Date.now()}`; + + const payload = { + eventType: 'ContributionDistributionCompleted', + eventId, + timestamp: new Date().toISOString(), + payload: { + // 认种信息 + adoptionId: adoption.originalAdoptionId.toString(), + adopterAccountSequence: adoption.accountSequence, + treeCount: adoption.treeCount, + adoptionDate: adoption.adoptionDate.toISOString(), + + // 用户贡献值分配 + userContributions: this.mapUserContributions(result), + + // 系统账户分配 + systemContributions: this.mapSystemContributions( + result, + provinceCode, + cityCode, + ), + + // 未分配(归总部) + unallocatedToHeadquarters: this.mapUnallocated(result), + }, + }; + + await this.outboxRepository.save({ + eventType: 'ContributionDistributionCompleted', + topic: 'contribution.distribution.completed', + key: adoption.accountSequence, + payload, + aggregateId: adoption.originalAdoptionId.toString(), + aggregateType: 'Adoption', + }); + + this.logger.debug( + `Published distribution result for adoption ${adoption.originalAdoptionId}`, + ); + } + + private mapUserContributions(result: ContributionDistributionResult): any[] { + const contributions: any[] = []; + + // 个人贡献 + if (result.personalRecord) { + contributions.push({ + accountSequence: result.personalRecord.accountSequence, + contributionType: 'PERSONAL', + amount: result.personalRecord.amount.value.toString(), + effectiveDate: result.personalRecord.effectiveDate.toISOString(), + expireDate: result.personalRecord.expireDate.toISOString(), + sourceAdoptionId: result.personalRecord.sourceAdoptionId.toString(), + sourceAccountSequence: result.personalRecord.sourceAccountSequence, + }); + } + + // 团队层级贡献 + for (const record of result.teamLevelRecords) { + contributions.push({ + accountSequence: record.accountSequence, + contributionType: 'TEAM_LEVEL', + amount: record.amount.value.toString(), + levelDepth: record.levelDepth, + effectiveDate: record.effectiveDate.toISOString(), + expireDate: record.expireDate.toISOString(), + sourceAdoptionId: record.sourceAdoptionId.toString(), + sourceAccountSequence: record.sourceAccountSequence, + }); + } + + // 团队奖励贡献 + for (const record of result.teamBonusRecords) { + contributions.push({ + accountSequence: record.accountSequence, + contributionType: 'TEAM_BONUS', + amount: record.amount.value.toString(), + bonusTier: record.bonusTier, + effectiveDate: record.effectiveDate.toISOString(), + expireDate: record.expireDate.toISOString(), + sourceAdoptionId: record.sourceAdoptionId.toString(), + sourceAccountSequence: record.sourceAccountSequence, + }); + } + + return contributions; + } + + private mapSystemContributions( + result: ContributionDistributionResult, + provinceCode: string, + cityCode: string, + ): any[] { + return result.systemContributions.map((sys) => ({ + accountType: sys.accountType, + amount: sys.amount.value.toString(), + provinceCode: + sys.accountType === 'PROVINCE' || sys.accountType === 'CITY' + ? provinceCode + : undefined, + cityCode: sys.accountType === 'CITY' ? cityCode : undefined, + neverExpires: sys.accountType === 'OPERATION', // 运营账户永不过期 + })); + } + + private mapUnallocated(result: ContributionDistributionResult): any[] { + return result.unallocatedContributions.map((u) => ({ + reason: u.reason, + amount: u.amount.value.toString(), + wouldBeAccountSequence: u.wouldBeAccountSequence, + levelDepth: u.levelDepth, + bonusTier: u.type.startsWith('BONUS_TIER_') + ? parseInt(u.type.split('_')[2]) + : undefined, + })); + } +} diff --git a/backend/services/contribution-service/src/domain/repositories/synced-data.repository.interface.ts b/backend/services/contribution-service/src/domain/repositories/synced-data.repository.interface.ts index f429d8db..70850326 100644 --- a/backend/services/contribution-service/src/domain/repositories/synced-data.repository.interface.ts +++ b/backend/services/contribution-service/src/domain/repositories/synced-data.repository.interface.ts @@ -26,6 +26,9 @@ export interface SyncedAdoption { treeCount: number; adoptionDate: Date; status: string | null; + // 认种选择的省市(用于系统账户分配) + selectedProvince: string | null; + selectedCity: string | null; contributionPerTree: Decimal; sourceSequenceNum: bigint; syncedAt: Date; @@ -93,6 +96,8 @@ export interface ISyncedDataRepository { treeCount: number; adoptionDate: Date; status?: string | null; + selectedProvince?: string | null; + selectedCity?: string | null; contributionPerTree: Decimal; sourceSequenceNum: bigint; }): Promise; diff --git a/backend/services/contribution-service/src/infrastructure/persistence/repositories/outbox.repository.ts b/backend/services/contribution-service/src/infrastructure/persistence/repositories/outbox.repository.ts index 6e59601c..47e489fd 100644 --- a/backend/services/contribution-service/src/infrastructure/persistence/repositories/outbox.repository.ts +++ b/backend/services/contribution-service/src/infrastructure/persistence/repositories/outbox.repository.ts @@ -24,12 +24,38 @@ export interface OutboxEvent { */ @Injectable() export class OutboxRepository { + /** + * 退避时间配置(毫秒) + * 第1次: 30s, 第2次: 1min, 第3次: 2min, 第4次: 5min, + * 第5次: 10min, 第6次: 30min, 第7次: 1h, 第8次: 2h, 第9次: 4h, 第10次: 4h + */ + private readonly BACKOFF_INTERVALS = [ + 30_000, // 30 seconds + 60_000, // 1 minute + 120_000, // 2 minutes + 300_000, // 5 minutes + 600_000, // 10 minutes + 1_800_000, // 30 minutes + 3_600_000, // 1 hour + 7_200_000, // 2 hours + 14_400_000, // 4 hours (max) + 14_400_000, // 4 hours (max) + ]; + constructor(private readonly unitOfWork: UnitOfWork) {} private get client(): TransactionClient { return this.unitOfWork.getClient(); } + /** + * 根据事件类型构建默认的 Kafka topic 名称 + * 例如: CONTRIBUTION_DISTRIBUTED -> contribution.distributed + */ + private buildDefaultTopic(eventType: string): string { + return 'contribution.' + eventType.toLowerCase().replace(/_/g, '.'); + } + async save(event: { aggregateType: string; aggregateId: string; @@ -38,7 +64,7 @@ export class OutboxRepository { topic?: string; key?: string; }): Promise { - const topic = event.topic ?? `contribution.${event.eventType.toLowerCase()}`; + const topic = event.topic ?? this.buildDefaultTopic(event.eventType); const key = event.key ?? event.aggregateId; await this.client.outboxEvent.create({ @@ -69,7 +95,7 @@ export class OutboxRepository { aggregateType: e.aggregateType, aggregateId: e.aggregateId, eventType: e.eventType, - topic: e.topic ?? `contribution.${e.eventType.toLowerCase()}`, + topic: e.topic ?? this.buildDefaultTopic(e.eventType), key: e.key ?? e.aggregateId, payload: e.payload, status: 'PENDING', @@ -79,7 +105,10 @@ export class OutboxRepository { async findUnprocessed(limit: number): Promise { const records = await this.client.outboxEvent.findMany({ - where: { status: 'PENDING' }, + where: { + status: 'PENDING', + OR: [{ nextRetryAt: null }, { nextRetryAt: { lte: new Date() } }], + }, orderBy: { createdAt: 'asc' }, take: limit, }); @@ -101,15 +130,20 @@ export class OutboxRepository { const retryCount = event.retryCount + 1; const shouldRetry = retryCount < event.maxRetries; + // 使用预定义的退避时间表 + const backoffIndex = Math.min( + retryCount - 1, + this.BACKOFF_INTERVALS.length - 1, + ); + const delayMs = this.BACKOFF_INTERVALS[backoffIndex]; + await this.client.outboxEvent.update({ where: { id }, data: { status: shouldRetry ? 'PENDING' : 'FAILED', retryCount, lastError: error, - nextRetryAt: shouldRetry - ? new Date(Date.now() + Math.pow(2, retryCount) * 1000) // exponential backoff - : null, + nextRetryAt: shouldRetry ? new Date(Date.now() + delayMs) : null, }, }); } diff --git a/backend/services/contribution-service/src/infrastructure/persistence/repositories/synced-data.repository.ts b/backend/services/contribution-service/src/infrastructure/persistence/repositories/synced-data.repository.ts index 84b38e89..a098cf65 100644 --- a/backend/services/contribution-service/src/infrastructure/persistence/repositories/synced-data.repository.ts +++ b/backend/services/contribution-service/src/infrastructure/persistence/repositories/synced-data.repository.ts @@ -87,6 +87,8 @@ export class SyncedDataRepository implements ISyncedDataRepository { treeCount: number; adoptionDate: Date; status?: string | null; + selectedProvince?: string | null; + selectedCity?: string | null; contributionPerTree: Decimal; sourceSequenceNum: bigint; }): Promise { @@ -98,6 +100,8 @@ export class SyncedDataRepository implements ISyncedDataRepository { treeCount: data.treeCount, adoptionDate: data.adoptionDate, status: data.status ?? null, + selectedProvince: data.selectedProvince ?? null, + selectedCity: data.selectedCity ?? null, contributionPerTree: data.contributionPerTree, sourceSequenceNum: data.sourceSequenceNum, syncedAt: new Date(), @@ -107,6 +111,8 @@ export class SyncedDataRepository implements ISyncedDataRepository { treeCount: data.treeCount, adoptionDate: data.adoptionDate, status: data.status ?? undefined, + selectedProvince: data.selectedProvince ?? undefined, + selectedCity: data.selectedCity ?? undefined, contributionPerTree: data.contributionPerTree, sourceSequenceNum: data.sourceSequenceNum, syncedAt: new Date(), @@ -372,6 +378,8 @@ export class SyncedDataRepository implements ISyncedDataRepository { treeCount: record.treeCount, adoptionDate: record.adoptionDate, status: record.status, + selectedProvince: record.selectedProvince, + selectedCity: record.selectedCity, contributionPerTree: record.contributionPerTree, sourceSequenceNum: record.sourceSequenceNum, syncedAt: record.syncedAt, diff --git a/backend/services/docs/contribution-wallet-architecture.md b/backend/services/docs/contribution-wallet-architecture.md new file mode 100644 index 00000000..d5d7c29b --- /dev/null +++ b/backend/services/docs/contribution-wallet-architecture.md @@ -0,0 +1,409 @@ +# 2.0 贡献值计算与钱包存储方案 + +## 一、系统架构总览 + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ 1.0 系统 (数据源) │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ +│ │ auth-service │ │ planting-svc │ │ referral-svc │ │ +│ │ (用户注册) │ │ (认种订单) │ │ (推荐关系) │ │ +│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ +└─────────┼──────────────────┼──────────────────┼─────────────────────────────┘ + │ │ │ + │ Kafka CDC │ Kafka CDC │ Kafka CDC + ▼ ▼ ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ 2.0 系统 │ +│ │ +│ ┌────────────────────────────────────────────────────────────────────────┐ │ +│ │ contribution-service │ │ +│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │ +│ │ │ CDC Consumer │ │ 贡献值计算引擎 │ │ Outbox 发布器 │ │ │ +│ │ │ (同步数据) │→ │ (分配逻辑) │→ │ (Kafka) │ │ │ +│ │ └─────────────────┘ └─────────────────┘ └────────┬────────┘ │ │ +│ └──────────────────────────────────────────────────────┼─────────────────┘ │ +│ │ │ +│ Kafka: contribution.distribution.completed │ +│ │ │ +│ ▼ │ +│ ┌────────────────────────────────────────────────────────────────────────┐ │ +│ │ mining-wallet-service │ │ +│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │ +│ │ │ Kafka Consumer │ │ 钱包账户管理 │ │ 分类账记录 │ │ │ +│ │ │ (接收分配结果) │→ │ (存储贡献值) │→ │ (交易明细) │ │ │ +│ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ │ +│ └────────────────────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌────────────────────────────────────────────────────────────────────────┐ │ +│ │ mining-admin-service │ │ +│ │ 订阅 contribution-service 和 mining-wallet-service 的事件进行数据同步 │ │ +│ └────────────────────────────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +--- + +## 二、贡献值分配规则 + +### 2.1 分配比例 + +| 类型 | 比例 | 说明 | +|------|------|------| +| 个人贡献 | 70% | 认种人自己获得 | +| 运营账户 | 12% | 系统运营账户(永不过期) | +| 省级公司 | 1% | 认种所在省份公司 | +| 市级公司 | 2% | 认种所在城市公司 | +| 团队层级 | 7.5% | 上线1-15级,每级0.5% | +| 团队奖励 | 7.5% | 直接上线的3档奖励,每档2.5% | + +### 2.2 解锁条件 + +#### 层级解锁(每级0.5%,共15级) +| 档位 | 层级 | 解锁条件 | +|------|------|----------| +| 第1档 | L1-L5 | 直推≥1人认种 | +| 第2档 | L6-L10 | 直推≥3人认种 | +| 第3档 | L11-L15 | 直推≥5人认种 | + +#### 奖励解锁(每档2.5%,共3档) +| 档位 | 解锁条件 | +|------|----------| +| 第1档 | 自己认种 | +| 第2档 | 直推≥2人认种 | +| 第3档 | 直推≥4人认种 | + +### 2.3 有效期规则 + +- **用户贡献值**:2年有效期(从认种次日开始计算) +- **运营账户**:永不过期 +- **未分配贡献值**:归总部账户(永不过期) + +--- + +## 三、Kafka 事件设计 + +### 3.1 用户注册事件 + +**Topic**: `auth.user.registered` + +```typescript +interface UserRegisteredEvent { + eventType: 'UserRegistered'; + eventId: string; + timestamp: string; + payload: { + accountSequence: string; + phone: string; + referrerAccountSequence: string | null; + registeredAt: string; + source: 'LEGACY_MIGRATION' | 'NEW_REGISTRATION'; + }; +} +``` + +**消费者**: +- `contribution-service` - 创建 ContributionAccount +- `mining-wallet-service` - 创建用户钱包(CONTRIBUTION 类型) + +### 3.2 认种完成事件 + +**Topic**: `planting.adoption.completed` + +```typescript +interface AdoptionCompletedEvent { + eventType: 'AdoptionCompleted'; + eventId: string; + timestamp: string; + payload: { + adoptionId: string; + accountSequence: string; + treeCount: number; + contributionPerTree: string; + adoptionDate: string; + provinceCode: string; + cityCode: string; + }; +} +``` + +**消费者**: +- `contribution-service` - 触发贡献值计算 + +### 3.3 贡献值分配完成事件 + +**Topic**: `contribution.distribution.completed` + +```typescript +interface ContributionDistributionCompletedEvent { + eventType: 'ContributionDistributionCompleted'; + eventId: string; + timestamp: string; + payload: { + // 认种信息 + adoptionId: string; + adopterAccountSequence: string; + treeCount: number; + adoptionDate: string; + + // 用户贡献值分配 + userContributions: { + accountSequence: string; + contributionType: 'PERSONAL' | 'TEAM_LEVEL' | 'TEAM_BONUS'; + amount: string; + levelDepth?: number; // 1-15 for TEAM_LEVEL + bonusTier?: number; // 1-3 for TEAM_BONUS + effectiveDate: string; + expireDate: string; + sourceAdoptionId: string; + sourceAccountSequence: string; + }[]; + + // 系统账户分配 + systemContributions: { + accountType: 'OPERATION' | 'PROVINCE' | 'CITY' | 'HEADQUARTERS'; + amount: string; + provinceCode?: string; + cityCode?: string; + neverExpires: boolean; + }[]; + + // 未分配(归总部) + unallocatedToHeadquarters: { + reason: string; + amount: string; + wouldBeAccountSequence?: string; + levelDepth?: number; + bonusTier?: number; + }[]; + }; +} +``` + +**消费者**: +- `mining-wallet-service` - 存储贡献值到钱包 + +### 3.4 贡献值入账事件 + +**Topic**: `mining-wallet.contribution.credited` + +```typescript +interface ContributionCreditedEvent { + eventType: 'ContributionCredited'; + eventId: string; + timestamp: string; + payload: { + accountSequence: string; + walletType: 'CONTRIBUTION'; + amount: string; + balanceAfter: string; + transactionId: string; + sourceType: 'ADOPTION_DISTRIBUTION'; + referenceId: string; + }; +} +``` + +**消费者**: +- `mining-admin-service` - 同步数据用于展示 + +--- + +## 四、Outbox 模式 + +### 4.1 Outbox 表结构 + +```prisma +model OutboxEvent { + id BigInt @id @default(autoincrement()) @map("outbox_id") + + eventType String @map("event_type") @db.VarChar(100) + topic String @map("topic") @db.VarChar(100) + key String @map("key") @db.VarChar(200) + payload Json @map("payload") + + aggregateId String @map("aggregate_id") @db.VarChar(100) + aggregateType String @map("aggregate_type") @db.VarChar(50) + + status String @default("PENDING") @map("status") @db.VarChar(20) + retryCount Int @default(0) @map("retry_count") + maxRetries Int @default(10) @map("max_retries") + lastError String? @map("last_error") @db.Text + + createdAt DateTime @default(now()) @map("created_at") + publishedAt DateTime? @map("published_at") + nextRetryAt DateTime? @map("next_retry_at") + + @@index([status, createdAt]) + @@index([status, nextRetryAt]) + @@map("outbox_events") +} +``` + +### 4.2 退避策略(最大4小时) + +| 重试次数 | 等待时间 | 累计时间 | +|---------|---------|---------| +| 1 | 30秒 | 30秒 | +| 2 | 1分钟 | 1.5分钟 | +| 3 | 2分钟 | 3.5分钟 | +| 4 | 5分钟 | 8.5分钟 | +| 5 | 10分钟 | 18.5分钟 | +| 6 | 30分钟 | 48.5分钟 | +| 7 | 1小时 | 1小时48分 | +| 8 | 2小时 | 3小时48分 | +| 9 | 4小时 | 7小时48分 | +| 10 | 4小时(max) | 11小时48分 | + +```typescript +const BACKOFF_INTERVALS = [ + 30_000, // 30 seconds + 60_000, // 1 minute + 120_000, // 2 minutes + 300_000, // 5 minutes + 600_000, // 10 minutes + 1_800_000, // 30 minutes + 3_600_000, // 1 hour + 7_200_000, // 2 hours + 14_400_000, // 4 hours (max) + 14_400_000, // 4 hours (max) +]; +``` + +### 4.3 幂等性保证(4小时去重窗口) + +```prisma +model ProcessedEvent { + id String @id @default(uuid()) + eventId String @unique @map("event_id") + eventType String @map("event_type") + sourceService String @map("source_service") + processedAt DateTime @default(now()) @map("processed_at") + + @@index([sourceService]) + @@index([processedAt]) + @@map("processed_events") +} +``` + +- **Redis 缓存**:4小时 TTL,快速路径检查 +- **DB 持久化**:ProcessedEvent 表,24小时后清理 +- **双重检查**:先查 Redis,未命中再查 DB + +--- + +## 五、服务职责划分 + +| 服务 | 职责 | 数据存储 | +|------|------|----------| +| **auth-service** | 用户注册/迁移,发送 UserRegistered 事件 | 用户基础信息 | +| **planting-service** | 认种订单,发送 AdoptionCompleted 事件 | 认种订单 | +| **contribution-service** | 贡献值计算逻辑,解锁状态管理 | 计算明细、解锁事件(用于审计) | +| **mining-wallet-service** | 贡献值存储,余额管理,过期处理 | 钱包余额、交易明细(分类账) | +| **mining-admin-service** | 数据聚合展示 | 同步缓存 | + +--- + +## 六、数据模型 + +### 6.1 contribution-service(已实现) + +- `ContributionAccount` - 用户贡献值账户(汇总) +- `ContributionRecord` - 贡献值明细(审计) +- `UnlockEvent` - 解锁事件记录 +- `UnallocatedContribution` - 未分配贡献值 +- `SystemAccount` - 系统账户(运营/省/市/总部) + +### 6.2 mining-wallet-service(需扩展) + +现有模型: +- `UserWallet` - 用户钱包(CONTRIBUTION 类型) +- `UserWalletTransaction` - 交易明细(分类账) +- `SystemAccount` - 系统账户 + +需要添加: +- SystemAccount 添加 `contributionBalance` 字段 +- TransactionType 添加 `CONTRIBUTION_CREDIT`、`CONTRIBUTION_EXPIRE` 枚举值 + +--- + +## 七、实施步骤 + +### Phase 1: mining-wallet-service 扩展 +1. 修改 schema 添加 SystemAccount 贡献值字段 +2. 实现 ContributionWalletService +3. 实现 ContributionDistributionConsumer +4. 实现 UserRegisteredConsumer(创建用户钱包) +5. 实现 ContributionExpiryScheduler +6. 实现 Outbox Scheduler(4小时退避) + +### Phase 2: contribution-service 扩展 +1. 实现 ContributionDistributionPublisherService +2. 修改 ContributionCalculationService 调用发布器 +3. 测试完整的分配流程 + +### Phase 3: auth-service 集成 +1. 确保 UserRegistered 事件正确发送 +2. 包含 referrerAccountSequence 信息 + +### Phase 4: 历史数据迁移 +1. 批量处理1.0的历史认种数据 +2. 逐笔计算并发送分配事件 +3. 验证钱包余额正确性 + +--- + +## 八、关键流程 + +### 8.1 认种触发贡献值分配 + +``` +1. planting-service 发送 AdoptionCompleted 事件 + ↓ +2. contribution-service 消费事件 + ├─ 查询认种人的上线链条(最多15级) + ├─ 查询各上线的解锁状态 + ├─ 计算贡献值分配 + └─ 写入 Outbox 表 + ↓ +3. Outbox Scheduler 发送到 Kafka + ↓ +4. mining-wallet-service 消费 ContributionDistributionCompleted + ├─ 幂等性检查 + ├─ 更新用户钱包余额 + ├─ 更新系统账户余额 + └─ 记录交易明细 + ↓ +5. mining-admin-service 消费 ContributionCredited + └─ 同步缓存数据 +``` + +### 8.2 贡献值过期处理 + +``` +1. 每日凌晨1点定时任务启动 + ↓ +2. 查询 expireDate <= today 的交易记录 + ↓ +3. 逐笔处理过期 + ├─ 扣减钱包余额 + ├─ 创建过期交易记录 + └─ 标记原交易为已过期 +``` + +--- + +## 九、监控与告警 + +### 9.1 关键指标 + +- Outbox 积压数量 +- 事件处理延迟 +- 重试次数分布 +- 失败事件数量 + +### 9.2 告警规则 + +- Outbox 积压 > 1000 条 +- 事件处理延迟 > 5 分钟 +- 失败事件数量 > 10 条/小时 diff --git a/backend/services/mining-wallet-service/prisma/migrations/20260112180000_add_contribution_balance/migration.sql b/backend/services/mining-wallet-service/prisma/migrations/20260112180000_add_contribution_balance/migration.sql new file mode 100644 index 00000000..3ffea7c6 --- /dev/null +++ b/backend/services/mining-wallet-service/prisma/migrations/20260112180000_add_contribution_balance/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable: 添加贡献值余额字段到系统账户 +ALTER TABLE "system_accounts" ADD COLUMN "contribution_balance" DECIMAL(30,8) NOT NULL DEFAULT 0; diff --git a/backend/services/mining-wallet-service/prisma/schema.prisma b/backend/services/mining-wallet-service/prisma/schema.prisma index 5a1c75dc..f3899c5d 100644 --- a/backend/services/mining-wallet-service/prisma/schema.prisma +++ b/backend/services/mining-wallet-service/prisma/schema.prisma @@ -187,6 +187,7 @@ model SystemAccount { shareBalance Decimal @default(0) @map("share_balance") @db.Decimal(30, 8) usdtBalance Decimal @default(0) @map("usdt_balance") @db.Decimal(30, 8) greenPointBalance Decimal @default(0) @map("green_point_balance") @db.Decimal(30, 8) + contributionBalance Decimal @default(0) @map("contribution_balance") @db.Decimal(30, 8) frozenShare Decimal @default(0) @map("frozen_share") @db.Decimal(30, 8) frozenUsdt Decimal @default(0) @map("frozen_usdt") @db.Decimal(30, 8) diff --git a/backend/services/mining-wallet-service/src/application/application.module.ts b/backend/services/mining-wallet-service/src/application/application.module.ts index d09a163b..1b74fcd0 100644 --- a/backend/services/mining-wallet-service/src/application/application.module.ts +++ b/backend/services/mining-wallet-service/src/application/application.module.ts @@ -6,9 +6,11 @@ import { SystemAccountService } from './services/system-account.service'; import { PoolAccountService } from './services/pool-account.service'; import { UserWalletService } from './services/user-wallet.service'; import { BlockchainIntegrationService } from './services/blockchain.service'; +import { ContributionWalletService } from './services/contribution-wallet.service'; // Schedulers import { OutboxScheduler } from './schedulers/outbox.scheduler'; +import { ContributionExpiryScheduler } from './schedulers/contribution-expiry.scheduler'; @Module({ imports: [ScheduleModule.forRoot()], @@ -18,14 +20,17 @@ import { OutboxScheduler } from './schedulers/outbox.scheduler'; PoolAccountService, UserWalletService, BlockchainIntegrationService, + ContributionWalletService, // Schedulers OutboxScheduler, + ContributionExpiryScheduler, ], exports: [ SystemAccountService, PoolAccountService, UserWalletService, BlockchainIntegrationService, + ContributionWalletService, ], }) export class ApplicationModule {} diff --git a/backend/services/mining-wallet-service/src/application/schedulers/contribution-expiry.scheduler.ts b/backend/services/mining-wallet-service/src/application/schedulers/contribution-expiry.scheduler.ts new file mode 100644 index 00000000..2eef5955 --- /dev/null +++ b/backend/services/mining-wallet-service/src/application/schedulers/contribution-expiry.scheduler.ts @@ -0,0 +1,131 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; +import { RedisService } from '../../infrastructure/redis/redis.service'; +import { ContributionWalletService } from '../services/contribution-wallet.service'; +import { UserWalletType, AssetType } from '@prisma/client'; +import Decimal from 'decimal.js'; + +@Injectable() +export class ContributionExpiryScheduler { + private readonly logger = new Logger(ContributionExpiryScheduler.name); + private readonly LOCK_KEY = 'mining-wallet:contribution-expiry:lock'; + private readonly LOCK_TTL = 3600; // 1 hour + + constructor( + private readonly prisma: PrismaService, + private readonly redis: RedisService, + private readonly contributionWalletService: ContributionWalletService, + ) {} + + /** + * 每日凌晨1点执行过期检查 + */ + @Cron('0 1 * * *') + async processExpiredContributions(): Promise { + const lockValue = await this.redis.acquireLock(this.LOCK_KEY, this.LOCK_TTL); + if (!lockValue) { + this.logger.debug('Another instance is processing expired contributions'); + return; + } + + try { + this.logger.log('Starting contribution expiry processing'); + + const today = new Date(); + today.setHours(0, 0, 0, 0); + + // 查找所有过期的贡献值交易记录 + // 通过 metadata.expireDate 字段判断 + const expiredTransactions = await this.prisma.userWalletTransaction.findMany({ + where: { + walletType: UserWalletType.CONTRIBUTION, + assetType: AssetType.CONTRIBUTION, + amount: { gt: 0 }, // 只查找入账记录 + // 检查 metadata 中的 expireDate + // 注意: Prisma 的 JSON 过滤有限制,可能需要原生查询 + }, + take: 1000, // 每次处理1000条 + orderBy: { createdAt: 'asc' }, + }); + + let expiredCount = 0; + + for (const tx of expiredTransactions) { + const metadata = tx.metadata as any; + if (!metadata?.expireDate || metadata?.isExpired) { + continue; + } + + const expireDate = new Date(metadata.expireDate); + if (expireDate > today) { + continue; + } + + try { + await this.contributionWalletService.expireContribution( + tx.id, + tx.accountSequence, + new Decimal(tx.amount.toString()), + ); + + // 标记原交易为已过期 + await this.prisma.userWalletTransaction.update({ + where: { id: tx.id }, + data: { + metadata: { + ...metadata, + isExpired: true, + expiredAt: new Date().toISOString(), + }, + }, + }); + + expiredCount++; + } catch (error) { + this.logger.error( + `Failed to expire contribution ${tx.id}`, + error instanceof Error ? error.stack : error, + ); + } + } + + this.logger.log(`Processed ${expiredCount} expired contributions`); + } finally { + await this.redis.releaseLock(this.LOCK_KEY, lockValue); + } + } + + /** + * 每天凌晨3点清理过期的 ProcessedEvent 记录(保留24小时) + */ + @Cron('0 3 * * *') + async cleanupProcessedEvents(): Promise { + const lockValue = await this.redis.acquireLock( + `${this.LOCK_KEY}:cleanup`, + 60, + ); + if (!lockValue) { + return; + } + + try { + const cutoffDate = new Date(); + cutoffDate.setHours(cutoffDate.getHours() - 24); + + const result = await this.prisma.processedEvent.deleteMany({ + where: { + processedAt: { lt: cutoffDate }, + }, + }); + + if (result.count > 0) { + this.logger.log( + `Cleaned up ${result.count} processed events older than 24 hours`, + ); + } + } finally { + await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue); + } + } +} diff --git a/backend/services/mining-wallet-service/src/application/services/contribution-wallet.service.ts b/backend/services/mining-wallet-service/src/application/services/contribution-wallet.service.ts new file mode 100644 index 00000000..2f4394a5 --- /dev/null +++ b/backend/services/mining-wallet-service/src/application/services/contribution-wallet.service.ts @@ -0,0 +1,284 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; +import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; +import { UserWalletType, AssetType, TransactionType } from '@prisma/client'; +import Decimal from 'decimal.js'; + +export interface CreditContributionInput { + accountSequence: string; + amount: Decimal; + contributionType: 'PERSONAL' | 'TEAM_LEVEL' | 'TEAM_BONUS'; + levelDepth?: number; + bonusTier?: number; + effectiveDate: Date; + expireDate: Date; + sourceAdoptionId: string; + sourceAccountSequence: string; +} + +export interface CreditSystemContributionInput { + accountType: 'OPERATION' | 'PROVINCE' | 'CITY' | 'HEADQUARTERS'; + amount: Decimal; + provinceCode?: string; + cityCode?: string; + neverExpires: boolean; + sourceAdoptionId: string; + sourceAccountSequence: string; + memo?: string; +} + +@Injectable() +export class ContributionWalletService { + private readonly logger = new Logger(ContributionWalletService.name); + + constructor( + private readonly prisma: PrismaService, + private readonly outboxRepo: OutboxRepository, + ) {} + + /** + * 为用户钱包增加贡献值 + */ + async creditContribution(input: CreditContributionInput): Promise { + await this.prisma.$transaction(async (tx) => { + // 1. 获取或创建用户贡献值钱包 + let wallet = await tx.userWallet.findUnique({ + where: { + accountSequence_walletType: { + accountSequence: input.accountSequence, + walletType: UserWalletType.CONTRIBUTION, + }, + }, + }); + + if (!wallet) { + wallet = await tx.userWallet.create({ + data: { + accountSequence: input.accountSequence, + walletType: UserWalletType.CONTRIBUTION, + balance: new Decimal(0), + frozenBalance: new Decimal(0), + }, + }); + } + + const balanceBefore = new Decimal(wallet.balance.toString()); + const balanceAfter = balanceBefore.plus(input.amount); + + // 2. 更新钱包余额 + await tx.userWallet.update({ + where: { id: wallet.id }, + data: { + balance: balanceAfter, + totalInflow: { increment: input.amount }, + }, + }); + + // 3. 创建交易记录(分类账) + const transaction = await tx.userWalletTransaction.create({ + data: { + userWalletId: wallet.id, + accountSequence: input.accountSequence, + walletType: UserWalletType.CONTRIBUTION, + transactionType: TransactionType.TRANSFER_IN, + assetType: AssetType.CONTRIBUTION, + amount: input.amount, + balanceBefore, + balanceAfter, + counterpartyType: 'EXTERNAL', + referenceId: input.sourceAdoptionId, + referenceType: 'ADOPTION', + memo: this.buildMemo(input), + metadata: { + contributionType: input.contributionType, + levelDepth: input.levelDepth, + bonusTier: input.bonusTier, + effectiveDate: input.effectiveDate.toISOString(), + expireDate: input.expireDate.toISOString(), + sourceAccountSequence: input.sourceAccountSequence, + }, + }, + }); + + // 4. 发布事件到 Outbox + await tx.outboxEvent.create({ + data: { + aggregateType: 'UserWallet', + aggregateId: wallet.id, + eventType: 'CONTRIBUTION_CREDITED', + topic: 'mining-wallet.contribution.credited', + key: input.accountSequence, + payload: { + accountSequence: input.accountSequence, + walletType: 'CONTRIBUTION', + amount: input.amount.toString(), + balanceAfter: balanceAfter.toString(), + transactionId: transaction.id, + contributionType: input.contributionType, + sourceAdoptionId: input.sourceAdoptionId, + sourceAccountSequence: input.sourceAccountSequence, + levelDepth: input.levelDepth, + bonusTier: input.bonusTier, + }, + }, + }); + + this.logger.debug( + `Credited ${input.amount} contribution to ${input.accountSequence}, type: ${input.contributionType}`, + ); + }); + } + + /** + * 为系统账户增加贡献值 + */ + async creditSystemContribution( + input: CreditSystemContributionInput, + ): Promise { + await this.prisma.$transaction(async (tx) => { + // 1. 根据 accountType 和区域查找系统账户 + let whereClause: any = { accountType: input.accountType }; + + if (input.accountType === 'PROVINCE' && input.provinceCode) { + whereClause = { + accountType: input.accountType, + province: { code: input.provinceCode }, + }; + } else if (input.accountType === 'CITY' && input.cityCode) { + whereClause = { + accountType: input.accountType, + city: { code: input.cityCode }, + }; + } + + const systemAccount = await tx.systemAccount.findFirst({ + where: whereClause, + }); + + if (!systemAccount) { + this.logger.warn( + `System account not found: ${input.accountType}, province: ${input.provinceCode}, city: ${input.cityCode}`, + ); + return; + } + + const balanceBefore = new Decimal( + systemAccount.contributionBalance?.toString() || '0', + ); + const balanceAfter = balanceBefore.plus(input.amount); + + // 2. 更新系统账户贡献值余额 + await tx.systemAccount.update({ + where: { id: systemAccount.id }, + data: { + contributionBalance: balanceAfter, + }, + }); + + // 3. 创建系统账户交易记录 + await tx.systemAccountTransaction.create({ + data: { + systemAccountId: systemAccount.id, + transactionType: TransactionType.TRANSFER_IN, + assetType: AssetType.CONTRIBUTION, + amount: input.amount, + balanceBefore, + balanceAfter, + counterpartyType: 'USER', + counterpartyAccountSeq: input.sourceAccountSequence, + referenceId: input.sourceAdoptionId, + referenceType: 'ADOPTION', + memo: + input.memo || + `贡献值分配, 来源认种: ${input.sourceAdoptionId}, 认种人: ${input.sourceAccountSequence}`, + metadata: { + neverExpires: input.neverExpires, + }, + }, + }); + + this.logger.debug( + `Credited ${input.amount} contribution to system account ${systemAccount.code}`, + ); + }); + } + + /** + * 处理贡献值过期 + */ + async expireContribution( + transactionId: string, + accountSequence: string, + amount: Decimal, + ): Promise { + await this.prisma.$transaction(async (tx) => { + const wallet = await tx.userWallet.findUnique({ + where: { + accountSequence_walletType: { + accountSequence, + walletType: UserWalletType.CONTRIBUTION, + }, + }, + }); + + if (!wallet) { + this.logger.warn( + `Wallet not found for expiry: ${accountSequence}`, + ); + return; + } + + const balanceBefore = new Decimal(wallet.balance.toString()); + let balanceAfter = balanceBefore.minus(amount); + + // 确保余额不为负 + if (balanceAfter.lt(0)) { + balanceAfter = new Decimal(0); + } + + // 更新钱包余额 + await tx.userWallet.update({ + where: { id: wallet.id }, + data: { + balance: balanceAfter, + totalOutflow: { increment: amount }, + }, + }); + + // 创建过期交易记录 + await tx.userWalletTransaction.create({ + data: { + userWalletId: wallet.id, + accountSequence, + walletType: UserWalletType.CONTRIBUTION, + transactionType: TransactionType.TRANSFER_OUT, + assetType: AssetType.CONTRIBUTION, + amount: amount.negated(), + balanceBefore, + balanceAfter, + counterpartyType: 'SYSTEM_ACCOUNT', + referenceId: transactionId, + referenceType: 'EXPIRY', + memo: `贡献值过期, 原交易ID: ${transactionId}`, + metadata: { + originalTransactionId: transactionId, + expiredAt: new Date().toISOString(), + }, + }, + }); + + this.logger.debug( + `Expired ${amount} contribution for ${accountSequence}`, + ); + }); + } + + private buildMemo(input: CreditContributionInput): string { + const typeMap: Record = { + PERSONAL: '个人认种贡献', + TEAM_LEVEL: `团队层级贡献(第${input.levelDepth}级)`, + TEAM_BONUS: `团队奖励贡献(第${input.bonusTier}档)`, + }; + return `${typeMap[input.contributionType]}, 来源认种: ${input.sourceAdoptionId}, 认种人: ${input.sourceAccountSequence}`; + } +} diff --git a/backend/services/mining-wallet-service/src/infrastructure/infrastructure.module.ts b/backend/services/mining-wallet-service/src/infrastructure/infrastructure.module.ts index 47777f50..f813a745 100644 --- a/backend/services/mining-wallet-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/mining-wallet-service/src/infrastructure/infrastructure.module.ts @@ -8,9 +8,12 @@ import { UserWalletRepository } from './persistence/repositories/user-wallet.rep import { RegionRepository } from './persistence/repositories/region.repository'; import { BlockchainRepository } from './persistence/repositories/blockchain.repository'; import { OutboxRepository } from './persistence/repositories/outbox.repository'; +import { ProcessedEventRepository } from './persistence/repositories/processed-event.repository'; import { RedisService } from './redis/redis.service'; import { KafkaProducerService } from './kafka/kafka-producer.service'; import { KavaBlockchainService } from './blockchain/kava-blockchain.service'; +import { ContributionDistributionConsumer } from './kafka/consumers/contribution-distribution.consumer'; +import { UserRegisteredConsumer } from './kafka/consumers/user-registered.consumer'; @Global() @Module({ @@ -32,6 +35,9 @@ import { KavaBlockchainService } from './blockchain/kava-blockchain.service'; producer: { allowAutoTopicCreation: true, }, + consumer: { + groupId: 'mining-wallet-service-group', + }, }, }), inject: [ConfigService], @@ -46,9 +52,13 @@ import { KavaBlockchainService } from './blockchain/kava-blockchain.service'; RegionRepository, BlockchainRepository, OutboxRepository, + ProcessedEventRepository, // Services KafkaProducerService, KavaBlockchainService, + // Consumers + ContributionDistributionConsumer, + UserRegisteredConsumer, { provide: 'REDIS_OPTIONS', useFactory: (configService: ConfigService) => ({ @@ -69,6 +79,7 @@ import { KavaBlockchainService } from './blockchain/kava-blockchain.service'; RegionRepository, BlockchainRepository, OutboxRepository, + ProcessedEventRepository, // Services KafkaProducerService, KavaBlockchainService, diff --git a/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/contribution-distribution.consumer.ts b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/contribution-distribution.consumer.ts new file mode 100644 index 00000000..e77b6964 --- /dev/null +++ b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/contribution-distribution.consumer.ts @@ -0,0 +1,156 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { EventPattern, Payload } from '@nestjs/microservices'; +import Decimal from 'decimal.js'; +import { PrismaService } from '../../persistence/prisma/prisma.service'; +import { RedisService } from '../../redis/redis.service'; +import { ProcessedEventRepository } from '../../persistence/repositories/processed-event.repository'; +import { ContributionWalletService } from '../../../application/services/contribution-wallet.service'; +import { SystemAccountService } from '../../../application/services/system-account.service'; +import { + ContributionDistributionCompletedEvent, + ContributionDistributionPayload, +} from '../events/contribution-distribution.event'; + +// 4小时 TTL(秒) +const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60; + +@Injectable() +export class ContributionDistributionConsumer implements OnModuleInit { + private readonly logger = new Logger(ContributionDistributionConsumer.name); + + constructor( + private readonly prisma: PrismaService, + private readonly redis: RedisService, + private readonly processedEventRepo: ProcessedEventRepository, + private readonly contributionWalletService: ContributionWalletService, + private readonly systemAccountService: SystemAccountService, + ) {} + + async onModuleInit() { + this.logger.log('ContributionDistributionConsumer initialized'); + } + + @EventPattern('contribution.distribution.completed') + async handleDistributionCompleted( + @Payload() message: any, + ): Promise { + // 解析消息格式 + const event: ContributionDistributionCompletedEvent = + message.value || message; + const eventId = event.eventId || message.eventId; + + if (!eventId) { + this.logger.warn('Received event without eventId, skipping'); + return; + } + + this.logger.debug(`Processing distribution event: ${eventId}`); + + // 幂等性检查 + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`Event ${eventId} already processed, skipping`); + return; + } + + try { + await this.processDistribution(event.payload); + + // 标记为已处理 + await this.markEventProcessed(eventId, event.eventType); + + this.logger.log( + `Distribution for adoption ${event.payload.adoptionId} processed successfully`, + ); + } catch (error) { + this.logger.error( + `Failed to process distribution for adoption ${event.payload.adoptionId}`, + error instanceof Error ? error.stack : error, + ); + throw error; // 让 Kafka 重试 + } + } + + private async processDistribution( + payload: ContributionDistributionPayload, + ): Promise { + // 1. 处理用户贡献值 + for (const userContrib of payload.userContributions) { + await this.contributionWalletService.creditContribution({ + accountSequence: userContrib.accountSequence, + amount: new Decimal(userContrib.amount), + contributionType: userContrib.contributionType, + levelDepth: userContrib.levelDepth, + bonusTier: userContrib.bonusTier, + effectiveDate: new Date(userContrib.effectiveDate), + expireDate: new Date(userContrib.expireDate), + sourceAdoptionId: userContrib.sourceAdoptionId, + sourceAccountSequence: userContrib.sourceAccountSequence, + }); + } + + // 2. 处理系统账户贡献值 + for (const sysContrib of payload.systemContributions) { + await this.contributionWalletService.creditSystemContribution({ + accountType: sysContrib.accountType, + amount: new Decimal(sysContrib.amount), + provinceCode: sysContrib.provinceCode, + cityCode: sysContrib.cityCode, + neverExpires: sysContrib.neverExpires, + sourceAdoptionId: payload.adoptionId, + sourceAccountSequence: payload.adopterAccountSequence, + }); + } + + // 3. 处理未分配的贡献值(归总部) + for (const unalloc of payload.unallocatedToHeadquarters) { + await this.contributionWalletService.creditSystemContribution({ + accountType: 'HEADQUARTERS', + amount: new Decimal(unalloc.amount), + neverExpires: true, + sourceAdoptionId: payload.adoptionId, + sourceAccountSequence: payload.adopterAccountSequence, + memo: unalloc.reason, + }); + } + } + + /** + * 幂等性检查 - Redis + DB 双重检查,4小时去重窗口 + */ + private async isEventProcessed(eventId: string): Promise { + const redisKey = `processed-event:${eventId}`; + + // 1. 先检查 Redis 缓存(快速路径) + const cached = await this.redis.get(redisKey); + if (cached) return true; + + // 2. 检查数据库 + const dbRecord = await this.processedEventRepo.findByEventId(eventId); + if (dbRecord) { + // 回填 Redis 缓存 + await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); + return true; + } + + return false; + } + + /** + * 标记事件为已处理 + */ + private async markEventProcessed( + eventId: string, + eventType: string, + ): Promise { + // 1. 写入数据库 + await this.processedEventRepo.create({ + eventId, + eventType, + sourceService: 'contribution-service', + }); + + // 2. 写入 Redis 缓存(4小时 TTL) + const redisKey = `processed-event:${eventId}`; + await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); + } +} diff --git a/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/user-registered.consumer.ts b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/user-registered.consumer.ts new file mode 100644 index 00000000..4f2ea634 --- /dev/null +++ b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/user-registered.consumer.ts @@ -0,0 +1,118 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { EventPattern, Payload } from '@nestjs/microservices'; +import { RedisService } from '../../redis/redis.service'; +import { ProcessedEventRepository } from '../../persistence/repositories/processed-event.repository'; +import { UserWalletService } from '../../../application/services/user-wallet.service'; +import { UserRegisteredEvent } from '../events/contribution-distribution.event'; + +// 4小时 TTL(秒) +const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60; + +@Injectable() +export class UserRegisteredConsumer implements OnModuleInit { + private readonly logger = new Logger(UserRegisteredConsumer.name); + + constructor( + private readonly redis: RedisService, + private readonly processedEventRepo: ProcessedEventRepository, + private readonly userWalletService: UserWalletService, + ) {} + + async onModuleInit() { + this.logger.log('UserRegisteredConsumer initialized'); + } + + @EventPattern('auth.user.registered') + async handleUserRegistered(@Payload() message: any): Promise { + // 解析消息格式 + const event: UserRegisteredEvent = message.value || message; + const eventId = event.eventId || message.eventId; + + if (!eventId) { + this.logger.warn('Received event without eventId, skipping'); + return; + } + + const { accountSequence } = event.payload; + + this.logger.debug( + `Processing user registered event: ${eventId}, accountSequence: ${accountSequence}`, + ); + + // 幂等性检查 + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`Event ${eventId} already processed, skipping`); + return; + } + + try { + // 为用户创建所有类型的钱包 + await this.userWalletService.createWalletsForUser(accountSequence); + + // 标记为已处理 + await this.markEventProcessed(eventId, event.eventType); + + this.logger.log( + `Wallets created for user ${accountSequence}, source: ${event.payload.source}`, + ); + } catch (error) { + // 如果是重复创建钱包的错误,忽略 + if ( + error instanceof Error && + error.message.includes('Unique constraint') + ) { + this.logger.debug( + `Wallets already exist for user ${accountSequence}, marking as processed`, + ); + await this.markEventProcessed(eventId, event.eventType); + return; + } + + this.logger.error( + `Failed to create wallets for user ${accountSequence}`, + error instanceof Error ? error.stack : error, + ); + throw error; // 让 Kafka 重试 + } + } + + /** + * 幂等性检查 - Redis + DB 双重检查,4小时去重窗口 + */ + private async isEventProcessed(eventId: string): Promise { + const redisKey = `processed-event:${eventId}`; + + // 1. 先检查 Redis 缓存(快速路径) + const cached = await this.redis.get(redisKey); + if (cached) return true; + + // 2. 检查数据库 + const dbRecord = await this.processedEventRepo.findByEventId(eventId); + if (dbRecord) { + // 回填 Redis 缓存 + await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); + return true; + } + + return false; + } + + /** + * 标记事件为已处理 + */ + private async markEventProcessed( + eventId: string, + eventType: string, + ): Promise { + // 1. 写入数据库 + await this.processedEventRepo.create({ + eventId, + eventType, + sourceService: 'auth-service', + }); + + // 2. 写入 Redis 缓存(4小时 TTL) + const redisKey = `processed-event:${eventId}`; + await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); + } +} diff --git a/backend/services/mining-wallet-service/src/infrastructure/kafka/events/contribution-distribution.event.ts b/backend/services/mining-wallet-service/src/infrastructure/kafka/events/contribution-distribution.event.ts new file mode 100644 index 00000000..03059a60 --- /dev/null +++ b/backend/services/mining-wallet-service/src/infrastructure/kafka/events/contribution-distribution.event.ts @@ -0,0 +1,72 @@ +/** + * 贡献值分配完成事件 + * 来自 contribution-service + */ +export interface ContributionDistributionCompletedEvent { + eventType: 'ContributionDistributionCompleted'; + eventId: string; + timestamp: string; + payload: ContributionDistributionPayload; +} + +export interface ContributionDistributionPayload { + // 认种信息 + adoptionId: string; + adopterAccountSequence: string; + treeCount: number; + adoptionDate: string; + + // 用户贡献值分配 + userContributions: UserContributionItem[]; + + // 系统账户分配 + systemContributions: SystemContributionItem[]; + + // 未分配(归总部) + unallocatedToHeadquarters: UnallocatedContributionItem[]; +} + +export interface UserContributionItem { + accountSequence: string; + contributionType: 'PERSONAL' | 'TEAM_LEVEL' | 'TEAM_BONUS'; + amount: string; + levelDepth?: number; // 1-15 for TEAM_LEVEL + bonusTier?: number; // 1-3 for TEAM_BONUS + effectiveDate: string; + expireDate: string; + sourceAdoptionId: string; + sourceAccountSequence: string; +} + +export interface SystemContributionItem { + accountType: 'OPERATION' | 'PROVINCE' | 'CITY' | 'HEADQUARTERS'; + amount: string; + provinceCode?: string; + cityCode?: string; + neverExpires: boolean; +} + +export interface UnallocatedContributionItem { + reason: string; + amount: string; + wouldBeAccountSequence?: string; + levelDepth?: number; + bonusTier?: number; +} + +/** + * 用户注册事件 + * 来自 auth-service + */ +export interface UserRegisteredEvent { + eventType: 'UserRegistered'; + eventId: string; + timestamp: string; + payload: { + accountSequence: string; + phone: string; + referrerAccountSequence: string | null; + registeredAt: string; + source: 'LEGACY_MIGRATION' | 'NEW_REGISTRATION'; + }; +} diff --git a/backend/services/mining-wallet-service/src/infrastructure/persistence/repositories/outbox.repository.ts b/backend/services/mining-wallet-service/src/infrastructure/persistence/repositories/outbox.repository.ts index 31d8be96..81a99736 100644 --- a/backend/services/mining-wallet-service/src/infrastructure/persistence/repositories/outbox.repository.ts +++ b/backend/services/mining-wallet-service/src/infrastructure/persistence/repositories/outbox.repository.ts @@ -4,6 +4,24 @@ import { OutboxEvent, OutboxStatus } from '@prisma/client'; @Injectable() export class OutboxRepository { + /** + * 退避时间配置(毫秒) + * 第1次: 30s, 第2次: 1min, 第3次: 2min, 第4次: 5min, + * 第5次: 10min, 第6次: 30min, 第7次: 1h, 第8次: 2h, 第9次: 4h, 第10次: 4h + */ + private readonly BACKOFF_INTERVALS = [ + 30_000, // 30 seconds + 60_000, // 1 minute + 120_000, // 2 minutes + 300_000, // 5 minutes + 600_000, // 10 minutes + 1_800_000, // 30 minutes + 3_600_000, // 1 hour + 7_200_000, // 2 hours + 14_400_000, // 4 hours (max) + 14_400_000, // 4 hours (max) + ]; + constructor(private readonly prisma: PrismaService) {} /** @@ -58,7 +76,7 @@ export class OutboxRepository { } /** - * 标记事件发布失败,计算下次重试时间(指数退避,最大3小时) + * 标记事件发布失败,计算下次重试时间(指数退避,最大4小时) */ async markAsFailed( id: string, @@ -69,13 +87,12 @@ export class OutboxRepository { const newRetryCount = currentRetryCount + 1; const shouldFail = newRetryCount >= maxRetries; - // 指数退避: 30s, 60s, 120s, 240s, 480s, 960s, 1920s, 3840s, 7680s, 10800s (最大3小时) - const baseDelayMs = 30000; // 30 seconds - const maxDelayMs = 3 * 60 * 60 * 1000; // 3 hours - const delayMs = Math.min( - baseDelayMs * Math.pow(2, newRetryCount - 1), - maxDelayMs, + // 使用预定义的退避时间表 + const backoffIndex = Math.min( + newRetryCount - 1, + this.BACKOFF_INTERVALS.length - 1, ); + const delayMs = this.BACKOFF_INTERVALS[backoffIndex]; await this.prisma.outboxEvent.update({ where: { id }, diff --git a/backend/services/mining-wallet-service/src/infrastructure/persistence/repositories/processed-event.repository.ts b/backend/services/mining-wallet-service/src/infrastructure/persistence/repositories/processed-event.repository.ts new file mode 100644 index 00000000..26e95315 --- /dev/null +++ b/backend/services/mining-wallet-service/src/infrastructure/persistence/repositories/processed-event.repository.ts @@ -0,0 +1,56 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { ProcessedEvent } from '@prisma/client'; + +@Injectable() +export class ProcessedEventRepository { + constructor(private readonly prisma: PrismaService) {} + + /** + * 根据 eventId 查找已处理的事件 + */ + async findByEventId(eventId: string): Promise { + return this.prisma.processedEvent.findUnique({ + where: { eventId }, + }); + } + + /** + * 创建已处理事件记录 + */ + async create(data: { + eventId: string; + eventType: string; + sourceService: string; + }): Promise { + return this.prisma.processedEvent.create({ + data: { + eventId: data.eventId, + eventType: data.eventType, + sourceService: data.sourceService, + }, + }); + } + + /** + * 删除过期的已处理事件记录(保留24小时) + */ + async deleteOldEvents(olderThan: Date): Promise { + const result = await this.prisma.processedEvent.deleteMany({ + where: { + processedAt: { lt: olderThan }, + }, + }); + return result.count; + } + + /** + * 检查事件是否已处理 + */ + async isProcessed(eventId: string): Promise { + const count = await this.prisma.processedEvent.count({ + where: { eventId }, + }); + return count > 0; + } +}