feat(contribution/wallet): 实现贡献值2.0计算与钱包存储架构

主要变更:
- contribution-service: 添加省市字段到认种同步数据
- contribution-service: 实现分配结果发布服务,通过Outbox模式发布到Kafka
- contribution-service: 更新Outbox调度器,支持4小时最大退避重试
- mining-wallet-service: 添加贡献值消费者,处理分配结果入账
- mining-wallet-service: 添加用户注册消费者,自动创建钱包
- mining-wallet-service: 添加贡献值过期调度器
- mining-wallet-service: 系统账户添加contributionBalance字段

Kafka事件流:
- contribution.distribution.completed: 分配结果事件
- auth.user.registered: 用户注册事件

可靠性保证:
- Outbox模式确保事件可靠发布
- 4小时幂等退避策略(30s,1m,2m,5m,10m,30m,1h,2h,4h)
- Redis+DB双重幂等检查

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-12 06:13:18 -08:00
parent 1b8791fe5d
commit 7fe954e563
22 changed files with 1518 additions and 47 deletions

View File

@ -0,0 +1,7 @@
-- AlterTable: 添加认种省市字段到 synced_adoptions 表
-- 这些字段从 1.0 planting_orders 表的 selected_province/selected_city 同步
ALTER TABLE "synced_adoptions" ADD COLUMN "selected_province" VARCHAR(10);
ALTER TABLE "synced_adoptions" ADD COLUMN "selected_city" VARCHAR(10);
-- CreateIndex: 添加省市组合索引
CREATE INDEX "synced_adoptions_selected_province_selected_city_idx" ON "synced_adoptions"("selected_province", "selected_city");

View File

@ -43,6 +43,10 @@ model SyncedAdoption {
adoptionDate DateTime @map("adoption_date") @db.Date
status String? @db.VarChar(30) // 与1.0 planting_orders.status保持一致
// 认种选择的省市(用于系统账户分配)
selectedProvince String? @map("selected_province") @db.VarChar(10)
selectedCity String? @map("selected_city") @db.VarChar(10)
// 贡献值计算参数(从认种时的配置)
contributionPerTree Decimal @map("contribution_per_tree") @db.Decimal(20, 10)
@ -59,6 +63,7 @@ model SyncedAdoption {
@@index([accountSequence])
@@index([adoptionDate])
@@index([contributionDistributed])
@@index([selectedProvince, selectedCity])
@@map("synced_adoptions")
}

View File

@ -10,6 +10,7 @@ import { CDCEventDispatcher } from './event-handlers/cdc-event-dispatcher';
// Services
import { ContributionCalculationService } from './services/contribution-calculation.service';
import { ContributionDistributionPublisherService } from './services/contribution-distribution-publisher.service';
import { SnapshotService } from './services/snapshot.service';
// Queries
@ -34,6 +35,7 @@ import { ContributionScheduler } from './schedulers/contribution.scheduler';
// Services
ContributionCalculationService,
ContributionDistributionPublisherService,
SnapshotService,
// Queries

View File

@ -45,11 +45,13 @@ export class AdoptionSyncedHandler {
private async handleCreate(data: any, sequenceNum: bigint): Promise<void> {
if (!data) return;
// planting_orders表字段: order_id, account_sequence, tree_count, created_at, status
// planting_orders表字段: order_id, account_sequence, tree_count, created_at, status, selected_province, selected_city
const orderId = data.order_id || data.id;
const accountSequence = data.account_sequence || data.accountSequence;
const treeCount = data.tree_count || data.treeCount;
const createdAt = data.created_at || data.createdAt || data.paid_at || data.paidAt;
const selectedProvince = data.selected_province || data.selectedProvince || null;
const selectedCity = data.selected_city || data.selectedCity || null;
// 第一步:保存同步的认种订单数据
await this.syncedDataRepository.upsertSyncedAdoption({
@ -58,6 +60,8 @@ export class AdoptionSyncedHandler {
treeCount: treeCount,
adoptionDate: new Date(createdAt),
status: data.status ?? null,
selectedProvince: selectedProvince,
selectedCity: selectedCity,
contributionPerTree: new Decimal('1'), // 每棵树1算力
sourceSequenceNum: sequenceNum,
});
@ -99,6 +103,8 @@ export class AdoptionSyncedHandler {
const accountSequence = after.account_sequence || after.accountSequence;
const treeCount = after.tree_count || after.treeCount;
const createdAt = after.created_at || after.createdAt || after.paid_at || after.paidAt;
const selectedProvince = after.selected_province || after.selectedProvince || null;
const selectedCity = after.selected_city || after.selectedCity || null;
// 第一步:保存同步的认种订单数据
await this.syncedDataRepository.upsertSyncedAdoption({
@ -107,6 +113,8 @@ export class AdoptionSyncedHandler {
treeCount: treeCount,
adoptionDate: new Date(createdAt),
status: after.status ?? null,
selectedProvince: selectedProvince,
selectedCity: selectedCity,
contributionPerTree: new Decimal('1'),
sourceSequenceNum: sequenceNum,
});

View File

@ -104,6 +104,7 @@ export class ContributionScheduler implements OnModuleInit {
/**
* 30 Outbox
* 使 4 退
*/
@Cron('*/30 * * * * *')
async publishOutboxEvents(): Promise<void> {
@ -119,31 +120,29 @@ export class ContributionScheduler implements OnModuleInit {
return;
}
const successIds: bigint[] = [];
for (const event of events) {
try {
await this.kafkaProducer.emit(`contribution.${event.eventType}`, {
key: event.aggregateId,
value: {
eventId: event.id,
aggregateType: event.aggregateType,
aggregateId: event.aggregateId,
eventType: event.eventType,
payload: event.payload,
createdAt: event.createdAt.toISOString(),
},
// 使用事件中指定的 topic而不是拼接
await this.kafkaProducer.emit(event.topic, {
key: event.key,
value: event.payload,
});
successIds.push(event.id);
} catch (error) {
this.logger.error(`Failed to publish event ${event.id}`, error);
// 继续处理下一个事件
continue;
// 记录失败,使用退避策略重试
const errorMessage = error instanceof Error ? error.message : String(error);
await this.outboxRepository.markAsFailed(event.id, errorMessage);
this.logger.warn(`Event ${event.id} failed, will retry with backoff: ${errorMessage}`);
}
}
// 标记为已处理
const processedIds = events.map((e) => e.id);
await this.outboxRepository.markAsProcessed(processedIds);
this.logger.debug(`Published ${processedIds.length} outbox events`);
// 标记成功发送的事件为已处理
if (successIds.length > 0) {
await this.outboxRepository.markAsProcessed(successIds);
this.logger.debug(`Published ${successIds.length} outbox events`);
}
} catch (error) {
this.logger.error('Failed to publish outbox events', error);
} finally {

View File

@ -9,6 +9,7 @@ import { OutboxRepository } from '../../infrastructure/persistence/repositories/
import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work';
import { ContributionAccountAggregate, ContributionSourceType } from '../../domain/aggregates/contribution-account.aggregate';
import { SyncedReferral } from '../../domain/repositories/synced-data.repository.interface';
import { ContributionDistributionPublisherService } from './contribution-distribution-publisher.service';
/**
*
@ -27,6 +28,7 @@ export class ContributionCalculationService {
private readonly systemAccountRepository: SystemAccountRepository,
private readonly outboxRepository: OutboxRepository,
private readonly unitOfWork: UnitOfWork,
private readonly distributionPublisher: ContributionDistributionPublisherService,
) {}
/**
@ -77,21 +79,16 @@ export class ContributionCalculationService {
await this.updateReferrerUnlockStatus(userReferral.referrerAccountSequence);
}
// 发布事件到 Outbox
await this.outboxRepository.save({
aggregateType: 'ContributionAccount',
aggregateId: adoption.accountSequence,
eventType: 'ContributionCalculated',
payload: {
accountSequence: adoption.accountSequence,
sourceAdoptionId: originalAdoptionId.toString(),
personalContribution: result.personalRecord.amount.value.toString(),
teamLevelCount: result.teamLevelRecords.length,
teamBonusCount: result.teamBonusRecords.length,
unallocatedCount: result.unallocatedContributions.length,
calculatedAt: new Date().toISOString(),
},
});
// 发布分配结果到 Kafka通过 Outbox 模式)
// 使用认种订单选择的省市代码
const provinceCode = adoption.selectedProvince ?? 'DEFAULT';
const cityCode = adoption.selectedCity ?? 'DEFAULT';
await this.distributionPublisher.publishDistributionResult(
adoption,
result,
provinceCode,
cityCode,
);
});
this.logger.log(

View File

@ -0,0 +1,144 @@
import { Injectable, Logger } from '@nestjs/common';
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { ContributionDistributionResult } from '../../domain/services/contribution-calculator.service';
import { SyncedAdoption } from '../../domain/repositories/synced-data.repository.interface';
/**
*
* Kafka mining-wallet-service
*/
@Injectable()
export class ContributionDistributionPublisherService {
private readonly logger = new Logger(
ContributionDistributionPublisherService.name,
);
constructor(private readonly outboxRepository: OutboxRepository) {}
/**
* Kafka
*/
async publishDistributionResult(
adoption: SyncedAdoption,
result: ContributionDistributionResult,
provinceCode: string,
cityCode: string,
): Promise<void> {
const eventId = `dist-${adoption.originalAdoptionId}-${Date.now()}`;
const payload = {
eventType: 'ContributionDistributionCompleted',
eventId,
timestamp: new Date().toISOString(),
payload: {
// 认种信息
adoptionId: adoption.originalAdoptionId.toString(),
adopterAccountSequence: adoption.accountSequence,
treeCount: adoption.treeCount,
adoptionDate: adoption.adoptionDate.toISOString(),
// 用户贡献值分配
userContributions: this.mapUserContributions(result),
// 系统账户分配
systemContributions: this.mapSystemContributions(
result,
provinceCode,
cityCode,
),
// 未分配(归总部)
unallocatedToHeadquarters: this.mapUnallocated(result),
},
};
await this.outboxRepository.save({
eventType: 'ContributionDistributionCompleted',
topic: 'contribution.distribution.completed',
key: adoption.accountSequence,
payload,
aggregateId: adoption.originalAdoptionId.toString(),
aggregateType: 'Adoption',
});
this.logger.debug(
`Published distribution result for adoption ${adoption.originalAdoptionId}`,
);
}
private mapUserContributions(result: ContributionDistributionResult): any[] {
const contributions: any[] = [];
// 个人贡献
if (result.personalRecord) {
contributions.push({
accountSequence: result.personalRecord.accountSequence,
contributionType: 'PERSONAL',
amount: result.personalRecord.amount.value.toString(),
effectiveDate: result.personalRecord.effectiveDate.toISOString(),
expireDate: result.personalRecord.expireDate.toISOString(),
sourceAdoptionId: result.personalRecord.sourceAdoptionId.toString(),
sourceAccountSequence: result.personalRecord.sourceAccountSequence,
});
}
// 团队层级贡献
for (const record of result.teamLevelRecords) {
contributions.push({
accountSequence: record.accountSequence,
contributionType: 'TEAM_LEVEL',
amount: record.amount.value.toString(),
levelDepth: record.levelDepth,
effectiveDate: record.effectiveDate.toISOString(),
expireDate: record.expireDate.toISOString(),
sourceAdoptionId: record.sourceAdoptionId.toString(),
sourceAccountSequence: record.sourceAccountSequence,
});
}
// 团队奖励贡献
for (const record of result.teamBonusRecords) {
contributions.push({
accountSequence: record.accountSequence,
contributionType: 'TEAM_BONUS',
amount: record.amount.value.toString(),
bonusTier: record.bonusTier,
effectiveDate: record.effectiveDate.toISOString(),
expireDate: record.expireDate.toISOString(),
sourceAdoptionId: record.sourceAdoptionId.toString(),
sourceAccountSequence: record.sourceAccountSequence,
});
}
return contributions;
}
private mapSystemContributions(
result: ContributionDistributionResult,
provinceCode: string,
cityCode: string,
): any[] {
return result.systemContributions.map((sys) => ({
accountType: sys.accountType,
amount: sys.amount.value.toString(),
provinceCode:
sys.accountType === 'PROVINCE' || sys.accountType === 'CITY'
? provinceCode
: undefined,
cityCode: sys.accountType === 'CITY' ? cityCode : undefined,
neverExpires: sys.accountType === 'OPERATION', // 运营账户永不过期
}));
}
private mapUnallocated(result: ContributionDistributionResult): any[] {
return result.unallocatedContributions.map((u) => ({
reason: u.reason,
amount: u.amount.value.toString(),
wouldBeAccountSequence: u.wouldBeAccountSequence,
levelDepth: u.levelDepth,
bonusTier: u.type.startsWith('BONUS_TIER_')
? parseInt(u.type.split('_')[2])
: undefined,
}));
}
}

View File

@ -26,6 +26,9 @@ export interface SyncedAdoption {
treeCount: number;
adoptionDate: Date;
status: string | null;
// 认种选择的省市(用于系统账户分配)
selectedProvince: string | null;
selectedCity: string | null;
contributionPerTree: Decimal;
sourceSequenceNum: bigint;
syncedAt: Date;
@ -93,6 +96,8 @@ export interface ISyncedDataRepository {
treeCount: number;
adoptionDate: Date;
status?: string | null;
selectedProvince?: string | null;
selectedCity?: string | null;
contributionPerTree: Decimal;
sourceSequenceNum: bigint;
}): Promise<SyncedAdoption>;

View File

@ -24,12 +24,38 @@ export interface OutboxEvent {
*/
@Injectable()
export class OutboxRepository {
/**
* 退
* 第1次: 30s, 第2次: 1min, 第3次: 2min, 第4次: 5min,
* 第5次: 10min, 第6次: 30min, 第7次: 1h, 第8次: 2h, 第9次: 4h, 第10次: 4h
*/
private readonly BACKOFF_INTERVALS = [
30_000, // 30 seconds
60_000, // 1 minute
120_000, // 2 minutes
300_000, // 5 minutes
600_000, // 10 minutes
1_800_000, // 30 minutes
3_600_000, // 1 hour
7_200_000, // 2 hours
14_400_000, // 4 hours (max)
14_400_000, // 4 hours (max)
];
constructor(private readonly unitOfWork: UnitOfWork) {}
private get client(): TransactionClient {
return this.unitOfWork.getClient();
}
/**
* Kafka topic
* 例如: CONTRIBUTION_DISTRIBUTED -> contribution.distributed
*/
private buildDefaultTopic(eventType: string): string {
return 'contribution.' + eventType.toLowerCase().replace(/_/g, '.');
}
async save(event: {
aggregateType: string;
aggregateId: string;
@ -38,7 +64,7 @@ export class OutboxRepository {
topic?: string;
key?: string;
}): Promise<void> {
const topic = event.topic ?? `contribution.${event.eventType.toLowerCase()}`;
const topic = event.topic ?? this.buildDefaultTopic(event.eventType);
const key = event.key ?? event.aggregateId;
await this.client.outboxEvent.create({
@ -69,7 +95,7 @@ export class OutboxRepository {
aggregateType: e.aggregateType,
aggregateId: e.aggregateId,
eventType: e.eventType,
topic: e.topic ?? `contribution.${e.eventType.toLowerCase()}`,
topic: e.topic ?? this.buildDefaultTopic(e.eventType),
key: e.key ?? e.aggregateId,
payload: e.payload,
status: 'PENDING',
@ -79,7 +105,10 @@ export class OutboxRepository {
async findUnprocessed(limit: number): Promise<OutboxEvent[]> {
const records = await this.client.outboxEvent.findMany({
where: { status: 'PENDING' },
where: {
status: 'PENDING',
OR: [{ nextRetryAt: null }, { nextRetryAt: { lte: new Date() } }],
},
orderBy: { createdAt: 'asc' },
take: limit,
});
@ -101,15 +130,20 @@ export class OutboxRepository {
const retryCount = event.retryCount + 1;
const shouldRetry = retryCount < event.maxRetries;
// 使用预定义的退避时间表
const backoffIndex = Math.min(
retryCount - 1,
this.BACKOFF_INTERVALS.length - 1,
);
const delayMs = this.BACKOFF_INTERVALS[backoffIndex];
await this.client.outboxEvent.update({
where: { id },
data: {
status: shouldRetry ? 'PENDING' : 'FAILED',
retryCount,
lastError: error,
nextRetryAt: shouldRetry
? new Date(Date.now() + Math.pow(2, retryCount) * 1000) // exponential backoff
: null,
nextRetryAt: shouldRetry ? new Date(Date.now() + delayMs) : null,
},
});
}

View File

@ -87,6 +87,8 @@ export class SyncedDataRepository implements ISyncedDataRepository {
treeCount: number;
adoptionDate: Date;
status?: string | null;
selectedProvince?: string | null;
selectedCity?: string | null;
contributionPerTree: Decimal;
sourceSequenceNum: bigint;
}): Promise<SyncedAdoption> {
@ -98,6 +100,8 @@ export class SyncedDataRepository implements ISyncedDataRepository {
treeCount: data.treeCount,
adoptionDate: data.adoptionDate,
status: data.status ?? null,
selectedProvince: data.selectedProvince ?? null,
selectedCity: data.selectedCity ?? null,
contributionPerTree: data.contributionPerTree,
sourceSequenceNum: data.sourceSequenceNum,
syncedAt: new Date(),
@ -107,6 +111,8 @@ export class SyncedDataRepository implements ISyncedDataRepository {
treeCount: data.treeCount,
adoptionDate: data.adoptionDate,
status: data.status ?? undefined,
selectedProvince: data.selectedProvince ?? undefined,
selectedCity: data.selectedCity ?? undefined,
contributionPerTree: data.contributionPerTree,
sourceSequenceNum: data.sourceSequenceNum,
syncedAt: new Date(),
@ -372,6 +378,8 @@ export class SyncedDataRepository implements ISyncedDataRepository {
treeCount: record.treeCount,
adoptionDate: record.adoptionDate,
status: record.status,
selectedProvince: record.selectedProvince,
selectedCity: record.selectedCity,
contributionPerTree: record.contributionPerTree,
sourceSequenceNum: record.sourceSequenceNum,
syncedAt: record.syncedAt,

View File

@ -0,0 +1,409 @@
# 2.0 贡献值计算与钱包存储方案
## 一、系统架构总览
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ 1.0 系统 (数据源) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ auth-service │ │ planting-svc │ │ referral-svc │ │
│ │ (用户注册) │ │ (认种订单) │ │ (推荐关系) │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼──────────────────┼──────────────────┼─────────────────────────────┘
│ │ │
│ Kafka CDC │ Kafka CDC │ Kafka CDC
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ 2.0 系统 │
│ │
│ ┌────────────────────────────────────────────────────────────────────────┐ │
│ │ contribution-service │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ CDC Consumer │ │ 贡献值计算引擎 │ │ Outbox 发布器 │ │ │
│ │ │ (同步数据) │→ │ (分配逻辑) │→ │ (Kafka) │ │ │
│ │ └─────────────────┘ └─────────────────┘ └────────┬────────┘ │ │
│ └──────────────────────────────────────────────────────┼─────────────────┘ │
│ │ │
│ Kafka: contribution.distribution.completed │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────────────────┐ │
│ │ mining-wallet-service │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ Kafka Consumer │ │ 钱包账户管理 │ │ 分类账记录 │ │ │
│ │ │ (接收分配结果) │→ │ (存储贡献值) │→ │ (交易明细) │ │ │
│ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────────────────┐ │
│ │ mining-admin-service │ │
│ │ 订阅 contribution-service 和 mining-wallet-service 的事件进行数据同步 │ │
│ └────────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
```
---
## 二、贡献值分配规则
### 2.1 分配比例
| 类型 | 比例 | 说明 |
|------|------|------|
| 个人贡献 | 70% | 认种人自己获得 |
| 运营账户 | 12% | 系统运营账户(永不过期) |
| 省级公司 | 1% | 认种所在省份公司 |
| 市级公司 | 2% | 认种所在城市公司 |
| 团队层级 | 7.5% | 上线1-15级每级0.5% |
| 团队奖励 | 7.5% | 直接上线的3档奖励每档2.5% |
### 2.2 解锁条件
#### 层级解锁每级0.5%共15级
| 档位 | 层级 | 解锁条件 |
|------|------|----------|
| 第1档 | L1-L5 | 直推≥1人认种 |
| 第2档 | L6-L10 | 直推≥3人认种 |
| 第3档 | L11-L15 | 直推≥5人认种 |
#### 奖励解锁每档2.5%共3档
| 档位 | 解锁条件 |
|------|----------|
| 第1档 | 自己认种 |
| 第2档 | 直推≥2人认种 |
| 第3档 | 直推≥4人认种 |
### 2.3 有效期规则
- **用户贡献值**2年有效期从认种次日开始计算
- **运营账户**:永不过期
- **未分配贡献值**:归总部账户(永不过期)
---
## 三、Kafka 事件设计
### 3.1 用户注册事件
**Topic**: `auth.user.registered`
```typescript
interface UserRegisteredEvent {
eventType: 'UserRegistered';
eventId: string;
timestamp: string;
payload: {
accountSequence: string;
phone: string;
referrerAccountSequence: string | null;
registeredAt: string;
source: 'LEGACY_MIGRATION' | 'NEW_REGISTRATION';
};
}
```
**消费者**
- `contribution-service` - 创建 ContributionAccount
- `mining-wallet-service` - 创建用户钱包CONTRIBUTION 类型)
### 3.2 认种完成事件
**Topic**: `planting.adoption.completed`
```typescript
interface AdoptionCompletedEvent {
eventType: 'AdoptionCompleted';
eventId: string;
timestamp: string;
payload: {
adoptionId: string;
accountSequence: string;
treeCount: number;
contributionPerTree: string;
adoptionDate: string;
provinceCode: string;
cityCode: string;
};
}
```
**消费者**
- `contribution-service` - 触发贡献值计算
### 3.3 贡献值分配完成事件
**Topic**: `contribution.distribution.completed`
```typescript
interface ContributionDistributionCompletedEvent {
eventType: 'ContributionDistributionCompleted';
eventId: string;
timestamp: string;
payload: {
// 认种信息
adoptionId: string;
adopterAccountSequence: string;
treeCount: number;
adoptionDate: string;
// 用户贡献值分配
userContributions: {
accountSequence: string;
contributionType: 'PERSONAL' | 'TEAM_LEVEL' | 'TEAM_BONUS';
amount: string;
levelDepth?: number; // 1-15 for TEAM_LEVEL
bonusTier?: number; // 1-3 for TEAM_BONUS
effectiveDate: string;
expireDate: string;
sourceAdoptionId: string;
sourceAccountSequence: string;
}[];
// 系统账户分配
systemContributions: {
accountType: 'OPERATION' | 'PROVINCE' | 'CITY' | 'HEADQUARTERS';
amount: string;
provinceCode?: string;
cityCode?: string;
neverExpires: boolean;
}[];
// 未分配(归总部)
unallocatedToHeadquarters: {
reason: string;
amount: string;
wouldBeAccountSequence?: string;
levelDepth?: number;
bonusTier?: number;
}[];
};
}
```
**消费者**
- `mining-wallet-service` - 存储贡献值到钱包
### 3.4 贡献值入账事件
**Topic**: `mining-wallet.contribution.credited`
```typescript
interface ContributionCreditedEvent {
eventType: 'ContributionCredited';
eventId: string;
timestamp: string;
payload: {
accountSequence: string;
walletType: 'CONTRIBUTION';
amount: string;
balanceAfter: string;
transactionId: string;
sourceType: 'ADOPTION_DISTRIBUTION';
referenceId: string;
};
}
```
**消费者**
- `mining-admin-service` - 同步数据用于展示
---
## 四、Outbox 模式
### 4.1 Outbox 表结构
```prisma
model OutboxEvent {
id BigInt @id @default(autoincrement()) @map("outbox_id")
eventType String @map("event_type") @db.VarChar(100)
topic String @map("topic") @db.VarChar(100)
key String @map("key") @db.VarChar(200)
payload Json @map("payload")
aggregateId String @map("aggregate_id") @db.VarChar(100)
aggregateType String @map("aggregate_type") @db.VarChar(50)
status String @default("PENDING") @map("status") @db.VarChar(20)
retryCount Int @default(0) @map("retry_count")
maxRetries Int @default(10) @map("max_retries")
lastError String? @map("last_error") @db.Text
createdAt DateTime @default(now()) @map("created_at")
publishedAt DateTime? @map("published_at")
nextRetryAt DateTime? @map("next_retry_at")
@@index([status, createdAt])
@@index([status, nextRetryAt])
@@map("outbox_events")
}
```
### 4.2 退避策略最大4小时
| 重试次数 | 等待时间 | 累计时间 |
|---------|---------|---------|
| 1 | 30秒 | 30秒 |
| 2 | 1分钟 | 1.5分钟 |
| 3 | 2分钟 | 3.5分钟 |
| 4 | 5分钟 | 8.5分钟 |
| 5 | 10分钟 | 18.5分钟 |
| 6 | 30分钟 | 48.5分钟 |
| 7 | 1小时 | 1小时48分 |
| 8 | 2小时 | 3小时48分 |
| 9 | 4小时 | 7小时48分 |
| 10 | 4小时(max) | 11小时48分 |
```typescript
const BACKOFF_INTERVALS = [
30_000, // 30 seconds
60_000, // 1 minute
120_000, // 2 minutes
300_000, // 5 minutes
600_000, // 10 minutes
1_800_000, // 30 minutes
3_600_000, // 1 hour
7_200_000, // 2 hours
14_400_000, // 4 hours (max)
14_400_000, // 4 hours (max)
];
```
### 4.3 幂等性保证4小时去重窗口
```prisma
model ProcessedEvent {
id String @id @default(uuid())
eventId String @unique @map("event_id")
eventType String @map("event_type")
sourceService String @map("source_service")
processedAt DateTime @default(now()) @map("processed_at")
@@index([sourceService])
@@index([processedAt])
@@map("processed_events")
}
```
- **Redis 缓存**4小时 TTL快速路径检查
- **DB 持久化**ProcessedEvent 表24小时后清理
- **双重检查**:先查 Redis未命中再查 DB
---
## 五、服务职责划分
| 服务 | 职责 | 数据存储 |
|------|------|----------|
| **auth-service** | 用户注册/迁移,发送 UserRegistered 事件 | 用户基础信息 |
| **planting-service** | 认种订单,发送 AdoptionCompleted 事件 | 认种订单 |
| **contribution-service** | 贡献值计算逻辑,解锁状态管理 | 计算明细、解锁事件(用于审计) |
| **mining-wallet-service** | 贡献值存储,余额管理,过期处理 | 钱包余额、交易明细(分类账) |
| **mining-admin-service** | 数据聚合展示 | 同步缓存 |
---
## 六、数据模型
### 6.1 contribution-service已实现
- `ContributionAccount` - 用户贡献值账户(汇总)
- `ContributionRecord` - 贡献值明细(审计)
- `UnlockEvent` - 解锁事件记录
- `UnallocatedContribution` - 未分配贡献值
- `SystemAccount` - 系统账户(运营/省/市/总部)
### 6.2 mining-wallet-service需扩展
现有模型:
- `UserWallet` - 用户钱包CONTRIBUTION 类型)
- `UserWalletTransaction` - 交易明细(分类账)
- `SystemAccount` - 系统账户
需要添加:
- SystemAccount 添加 `contributionBalance` 字段
- TransactionType 添加 `CONTRIBUTION_CREDIT`、`CONTRIBUTION_EXPIRE` 枚举值
---
## 七、实施步骤
### Phase 1: mining-wallet-service 扩展
1. 修改 schema 添加 SystemAccount 贡献值字段
2. 实现 ContributionWalletService
3. 实现 ContributionDistributionConsumer
4. 实现 UserRegisteredConsumer创建用户钱包
5. 实现 ContributionExpiryScheduler
6. 实现 Outbox Scheduler4小时退避
### Phase 2: contribution-service 扩展
1. 实现 ContributionDistributionPublisherService
2. 修改 ContributionCalculationService 调用发布器
3. 测试完整的分配流程
### Phase 3: auth-service 集成
1. 确保 UserRegistered 事件正确发送
2. 包含 referrerAccountSequence 信息
### Phase 4: 历史数据迁移
1. 批量处理1.0的历史认种数据
2. 逐笔计算并发送分配事件
3. 验证钱包余额正确性
---
## 八、关键流程
### 8.1 认种触发贡献值分配
```
1. planting-service 发送 AdoptionCompleted 事件
2. contribution-service 消费事件
├─ 查询认种人的上线链条最多15级
├─ 查询各上线的解锁状态
├─ 计算贡献值分配
└─ 写入 Outbox 表
3. Outbox Scheduler 发送到 Kafka
4. mining-wallet-service 消费 ContributionDistributionCompleted
├─ 幂等性检查
├─ 更新用户钱包余额
├─ 更新系统账户余额
└─ 记录交易明细
5. mining-admin-service 消费 ContributionCredited
└─ 同步缓存数据
```
### 8.2 贡献值过期处理
```
1. 每日凌晨1点定时任务启动
2. 查询 expireDate <= today 的交易记录
3. 逐笔处理过期
├─ 扣减钱包余额
├─ 创建过期交易记录
└─ 标记原交易为已过期
```
---
## 九、监控与告警
### 9.1 关键指标
- Outbox 积压数量
- 事件处理延迟
- 重试次数分布
- 失败事件数量
### 9.2 告警规则
- Outbox 积压 > 1000 条
- 事件处理延迟 > 5 分钟
- 失败事件数量 > 10 条/小时

View File

@ -0,0 +1,2 @@
-- AlterTable: 添加贡献值余额字段到系统账户
ALTER TABLE "system_accounts" ADD COLUMN "contribution_balance" DECIMAL(30,8) NOT NULL DEFAULT 0;

View File

@ -187,6 +187,7 @@ model SystemAccount {
shareBalance Decimal @default(0) @map("share_balance") @db.Decimal(30, 8)
usdtBalance Decimal @default(0) @map("usdt_balance") @db.Decimal(30, 8)
greenPointBalance Decimal @default(0) @map("green_point_balance") @db.Decimal(30, 8)
contributionBalance Decimal @default(0) @map("contribution_balance") @db.Decimal(30, 8)
frozenShare Decimal @default(0) @map("frozen_share") @db.Decimal(30, 8)
frozenUsdt Decimal @default(0) @map("frozen_usdt") @db.Decimal(30, 8)

View File

@ -6,9 +6,11 @@ import { SystemAccountService } from './services/system-account.service';
import { PoolAccountService } from './services/pool-account.service';
import { UserWalletService } from './services/user-wallet.service';
import { BlockchainIntegrationService } from './services/blockchain.service';
import { ContributionWalletService } from './services/contribution-wallet.service';
// Schedulers
import { OutboxScheduler } from './schedulers/outbox.scheduler';
import { ContributionExpiryScheduler } from './schedulers/contribution-expiry.scheduler';
@Module({
imports: [ScheduleModule.forRoot()],
@ -18,14 +20,17 @@ import { OutboxScheduler } from './schedulers/outbox.scheduler';
PoolAccountService,
UserWalletService,
BlockchainIntegrationService,
ContributionWalletService,
// Schedulers
OutboxScheduler,
ContributionExpiryScheduler,
],
exports: [
SystemAccountService,
PoolAccountService,
UserWalletService,
BlockchainIntegrationService,
ContributionWalletService,
],
})
export class ApplicationModule {}

View File

@ -0,0 +1,131 @@
import { Injectable, Logger } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
import { RedisService } from '../../infrastructure/redis/redis.service';
import { ContributionWalletService } from '../services/contribution-wallet.service';
import { UserWalletType, AssetType } from '@prisma/client';
import Decimal from 'decimal.js';
@Injectable()
export class ContributionExpiryScheduler {
private readonly logger = new Logger(ContributionExpiryScheduler.name);
private readonly LOCK_KEY = 'mining-wallet:contribution-expiry:lock';
private readonly LOCK_TTL = 3600; // 1 hour
constructor(
private readonly prisma: PrismaService,
private readonly redis: RedisService,
private readonly contributionWalletService: ContributionWalletService,
) {}
/**
* 1
*/
@Cron('0 1 * * *')
async processExpiredContributions(): Promise<void> {
const lockValue = await this.redis.acquireLock(this.LOCK_KEY, this.LOCK_TTL);
if (!lockValue) {
this.logger.debug('Another instance is processing expired contributions');
return;
}
try {
this.logger.log('Starting contribution expiry processing');
const today = new Date();
today.setHours(0, 0, 0, 0);
// 查找所有过期的贡献值交易记录
// 通过 metadata.expireDate 字段判断
const expiredTransactions = await this.prisma.userWalletTransaction.findMany({
where: {
walletType: UserWalletType.CONTRIBUTION,
assetType: AssetType.CONTRIBUTION,
amount: { gt: 0 }, // 只查找入账记录
// 检查 metadata 中的 expireDate
// 注意: Prisma 的 JSON 过滤有限制,可能需要原生查询
},
take: 1000, // 每次处理1000条
orderBy: { createdAt: 'asc' },
});
let expiredCount = 0;
for (const tx of expiredTransactions) {
const metadata = tx.metadata as any;
if (!metadata?.expireDate || metadata?.isExpired) {
continue;
}
const expireDate = new Date(metadata.expireDate);
if (expireDate > today) {
continue;
}
try {
await this.contributionWalletService.expireContribution(
tx.id,
tx.accountSequence,
new Decimal(tx.amount.toString()),
);
// 标记原交易为已过期
await this.prisma.userWalletTransaction.update({
where: { id: tx.id },
data: {
metadata: {
...metadata,
isExpired: true,
expiredAt: new Date().toISOString(),
},
},
});
expiredCount++;
} catch (error) {
this.logger.error(
`Failed to expire contribution ${tx.id}`,
error instanceof Error ? error.stack : error,
);
}
}
this.logger.log(`Processed ${expiredCount} expired contributions`);
} finally {
await this.redis.releaseLock(this.LOCK_KEY, lockValue);
}
}
/**
* 3 ProcessedEvent 24
*/
@Cron('0 3 * * *')
async cleanupProcessedEvents(): Promise<void> {
const lockValue = await this.redis.acquireLock(
`${this.LOCK_KEY}:cleanup`,
60,
);
if (!lockValue) {
return;
}
try {
const cutoffDate = new Date();
cutoffDate.setHours(cutoffDate.getHours() - 24);
const result = await this.prisma.processedEvent.deleteMany({
where: {
processedAt: { lt: cutoffDate },
},
});
if (result.count > 0) {
this.logger.log(
`Cleaned up ${result.count} processed events older than 24 hours`,
);
}
} finally {
await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue);
}
}
}

View File

@ -0,0 +1,284 @@
import { Injectable, Logger } from '@nestjs/common';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { UserWalletType, AssetType, TransactionType } from '@prisma/client';
import Decimal from 'decimal.js';
export interface CreditContributionInput {
accountSequence: string;
amount: Decimal;
contributionType: 'PERSONAL' | 'TEAM_LEVEL' | 'TEAM_BONUS';
levelDepth?: number;
bonusTier?: number;
effectiveDate: Date;
expireDate: Date;
sourceAdoptionId: string;
sourceAccountSequence: string;
}
export interface CreditSystemContributionInput {
accountType: 'OPERATION' | 'PROVINCE' | 'CITY' | 'HEADQUARTERS';
amount: Decimal;
provinceCode?: string;
cityCode?: string;
neverExpires: boolean;
sourceAdoptionId: string;
sourceAccountSequence: string;
memo?: string;
}
@Injectable()
export class ContributionWalletService {
private readonly logger = new Logger(ContributionWalletService.name);
constructor(
private readonly prisma: PrismaService,
private readonly outboxRepo: OutboxRepository,
) {}
/**
*
*/
async creditContribution(input: CreditContributionInput): Promise<void> {
await this.prisma.$transaction(async (tx) => {
// 1. 获取或创建用户贡献值钱包
let wallet = await tx.userWallet.findUnique({
where: {
accountSequence_walletType: {
accountSequence: input.accountSequence,
walletType: UserWalletType.CONTRIBUTION,
},
},
});
if (!wallet) {
wallet = await tx.userWallet.create({
data: {
accountSequence: input.accountSequence,
walletType: UserWalletType.CONTRIBUTION,
balance: new Decimal(0),
frozenBalance: new Decimal(0),
},
});
}
const balanceBefore = new Decimal(wallet.balance.toString());
const balanceAfter = balanceBefore.plus(input.amount);
// 2. 更新钱包余额
await tx.userWallet.update({
where: { id: wallet.id },
data: {
balance: balanceAfter,
totalInflow: { increment: input.amount },
},
});
// 3. 创建交易记录(分类账)
const transaction = await tx.userWalletTransaction.create({
data: {
userWalletId: wallet.id,
accountSequence: input.accountSequence,
walletType: UserWalletType.CONTRIBUTION,
transactionType: TransactionType.TRANSFER_IN,
assetType: AssetType.CONTRIBUTION,
amount: input.amount,
balanceBefore,
balanceAfter,
counterpartyType: 'EXTERNAL',
referenceId: input.sourceAdoptionId,
referenceType: 'ADOPTION',
memo: this.buildMemo(input),
metadata: {
contributionType: input.contributionType,
levelDepth: input.levelDepth,
bonusTier: input.bonusTier,
effectiveDate: input.effectiveDate.toISOString(),
expireDate: input.expireDate.toISOString(),
sourceAccountSequence: input.sourceAccountSequence,
},
},
});
// 4. 发布事件到 Outbox
await tx.outboxEvent.create({
data: {
aggregateType: 'UserWallet',
aggregateId: wallet.id,
eventType: 'CONTRIBUTION_CREDITED',
topic: 'mining-wallet.contribution.credited',
key: input.accountSequence,
payload: {
accountSequence: input.accountSequence,
walletType: 'CONTRIBUTION',
amount: input.amount.toString(),
balanceAfter: balanceAfter.toString(),
transactionId: transaction.id,
contributionType: input.contributionType,
sourceAdoptionId: input.sourceAdoptionId,
sourceAccountSequence: input.sourceAccountSequence,
levelDepth: input.levelDepth,
bonusTier: input.bonusTier,
},
},
});
this.logger.debug(
`Credited ${input.amount} contribution to ${input.accountSequence}, type: ${input.contributionType}`,
);
});
}
/**
*
*/
async creditSystemContribution(
input: CreditSystemContributionInput,
): Promise<void> {
await this.prisma.$transaction(async (tx) => {
// 1. 根据 accountType 和区域查找系统账户
let whereClause: any = { accountType: input.accountType };
if (input.accountType === 'PROVINCE' && input.provinceCode) {
whereClause = {
accountType: input.accountType,
province: { code: input.provinceCode },
};
} else if (input.accountType === 'CITY' && input.cityCode) {
whereClause = {
accountType: input.accountType,
city: { code: input.cityCode },
};
}
const systemAccount = await tx.systemAccount.findFirst({
where: whereClause,
});
if (!systemAccount) {
this.logger.warn(
`System account not found: ${input.accountType}, province: ${input.provinceCode}, city: ${input.cityCode}`,
);
return;
}
const balanceBefore = new Decimal(
systemAccount.contributionBalance?.toString() || '0',
);
const balanceAfter = balanceBefore.plus(input.amount);
// 2. 更新系统账户贡献值余额
await tx.systemAccount.update({
where: { id: systemAccount.id },
data: {
contributionBalance: balanceAfter,
},
});
// 3. 创建系统账户交易记录
await tx.systemAccountTransaction.create({
data: {
systemAccountId: systemAccount.id,
transactionType: TransactionType.TRANSFER_IN,
assetType: AssetType.CONTRIBUTION,
amount: input.amount,
balanceBefore,
balanceAfter,
counterpartyType: 'USER',
counterpartyAccountSeq: input.sourceAccountSequence,
referenceId: input.sourceAdoptionId,
referenceType: 'ADOPTION',
memo:
input.memo ||
`贡献值分配, 来源认种: ${input.sourceAdoptionId}, 认种人: ${input.sourceAccountSequence}`,
metadata: {
neverExpires: input.neverExpires,
},
},
});
this.logger.debug(
`Credited ${input.amount} contribution to system account ${systemAccount.code}`,
);
});
}
/**
*
*/
async expireContribution(
transactionId: string,
accountSequence: string,
amount: Decimal,
): Promise<void> {
await this.prisma.$transaction(async (tx) => {
const wallet = await tx.userWallet.findUnique({
where: {
accountSequence_walletType: {
accountSequence,
walletType: UserWalletType.CONTRIBUTION,
},
},
});
if (!wallet) {
this.logger.warn(
`Wallet not found for expiry: ${accountSequence}`,
);
return;
}
const balanceBefore = new Decimal(wallet.balance.toString());
let balanceAfter = balanceBefore.minus(amount);
// 确保余额不为负
if (balanceAfter.lt(0)) {
balanceAfter = new Decimal(0);
}
// 更新钱包余额
await tx.userWallet.update({
where: { id: wallet.id },
data: {
balance: balanceAfter,
totalOutflow: { increment: amount },
},
});
// 创建过期交易记录
await tx.userWalletTransaction.create({
data: {
userWalletId: wallet.id,
accountSequence,
walletType: UserWalletType.CONTRIBUTION,
transactionType: TransactionType.TRANSFER_OUT,
assetType: AssetType.CONTRIBUTION,
amount: amount.negated(),
balanceBefore,
balanceAfter,
counterpartyType: 'SYSTEM_ACCOUNT',
referenceId: transactionId,
referenceType: 'EXPIRY',
memo: `贡献值过期, 原交易ID: ${transactionId}`,
metadata: {
originalTransactionId: transactionId,
expiredAt: new Date().toISOString(),
},
},
});
this.logger.debug(
`Expired ${amount} contribution for ${accountSequence}`,
);
});
}
private buildMemo(input: CreditContributionInput): string {
const typeMap: Record<string, string> = {
PERSONAL: '个人认种贡献',
TEAM_LEVEL: `团队层级贡献(第${input.levelDepth}级)`,
TEAM_BONUS: `团队奖励贡献(第${input.bonusTier}档)`,
};
return `${typeMap[input.contributionType]}, 来源认种: ${input.sourceAdoptionId}, 认种人: ${input.sourceAccountSequence}`;
}
}

View File

@ -8,9 +8,12 @@ import { UserWalletRepository } from './persistence/repositories/user-wallet.rep
import { RegionRepository } from './persistence/repositories/region.repository';
import { BlockchainRepository } from './persistence/repositories/blockchain.repository';
import { OutboxRepository } from './persistence/repositories/outbox.repository';
import { ProcessedEventRepository } from './persistence/repositories/processed-event.repository';
import { RedisService } from './redis/redis.service';
import { KafkaProducerService } from './kafka/kafka-producer.service';
import { KavaBlockchainService } from './blockchain/kava-blockchain.service';
import { ContributionDistributionConsumer } from './kafka/consumers/contribution-distribution.consumer';
import { UserRegisteredConsumer } from './kafka/consumers/user-registered.consumer';
@Global()
@Module({
@ -32,6 +35,9 @@ import { KavaBlockchainService } from './blockchain/kava-blockchain.service';
producer: {
allowAutoTopicCreation: true,
},
consumer: {
groupId: 'mining-wallet-service-group',
},
},
}),
inject: [ConfigService],
@ -46,9 +52,13 @@ import { KavaBlockchainService } from './blockchain/kava-blockchain.service';
RegionRepository,
BlockchainRepository,
OutboxRepository,
ProcessedEventRepository,
// Services
KafkaProducerService,
KavaBlockchainService,
// Consumers
ContributionDistributionConsumer,
UserRegisteredConsumer,
{
provide: 'REDIS_OPTIONS',
useFactory: (configService: ConfigService) => ({
@ -69,6 +79,7 @@ import { KavaBlockchainService } from './blockchain/kava-blockchain.service';
RegionRepository,
BlockchainRepository,
OutboxRepository,
ProcessedEventRepository,
// Services
KafkaProducerService,
KavaBlockchainService,

View File

@ -0,0 +1,156 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import Decimal from 'decimal.js';
import { PrismaService } from '../../persistence/prisma/prisma.service';
import { RedisService } from '../../redis/redis.service';
import { ProcessedEventRepository } from '../../persistence/repositories/processed-event.repository';
import { ContributionWalletService } from '../../../application/services/contribution-wallet.service';
import { SystemAccountService } from '../../../application/services/system-account.service';
import {
ContributionDistributionCompletedEvent,
ContributionDistributionPayload,
} from '../events/contribution-distribution.event';
// 4小时 TTL
const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60;
@Injectable()
export class ContributionDistributionConsumer implements OnModuleInit {
private readonly logger = new Logger(ContributionDistributionConsumer.name);
constructor(
private readonly prisma: PrismaService,
private readonly redis: RedisService,
private readonly processedEventRepo: ProcessedEventRepository,
private readonly contributionWalletService: ContributionWalletService,
private readonly systemAccountService: SystemAccountService,
) {}
async onModuleInit() {
this.logger.log('ContributionDistributionConsumer initialized');
}
@EventPattern('contribution.distribution.completed')
async handleDistributionCompleted(
@Payload() message: any,
): Promise<void> {
// 解析消息格式
const event: ContributionDistributionCompletedEvent =
message.value || message;
const eventId = event.eventId || message.eventId;
if (!eventId) {
this.logger.warn('Received event without eventId, skipping');
return;
}
this.logger.debug(`Processing distribution event: ${eventId}`);
// 幂等性检查
if (await this.isEventProcessed(eventId)) {
this.logger.debug(`Event ${eventId} already processed, skipping`);
return;
}
try {
await this.processDistribution(event.payload);
// 标记为已处理
await this.markEventProcessed(eventId, event.eventType);
this.logger.log(
`Distribution for adoption ${event.payload.adoptionId} processed successfully`,
);
} catch (error) {
this.logger.error(
`Failed to process distribution for adoption ${event.payload.adoptionId}`,
error instanceof Error ? error.stack : error,
);
throw error; // 让 Kafka 重试
}
}
private async processDistribution(
payload: ContributionDistributionPayload,
): Promise<void> {
// 1. 处理用户贡献值
for (const userContrib of payload.userContributions) {
await this.contributionWalletService.creditContribution({
accountSequence: userContrib.accountSequence,
amount: new Decimal(userContrib.amount),
contributionType: userContrib.contributionType,
levelDepth: userContrib.levelDepth,
bonusTier: userContrib.bonusTier,
effectiveDate: new Date(userContrib.effectiveDate),
expireDate: new Date(userContrib.expireDate),
sourceAdoptionId: userContrib.sourceAdoptionId,
sourceAccountSequence: userContrib.sourceAccountSequence,
});
}
// 2. 处理系统账户贡献值
for (const sysContrib of payload.systemContributions) {
await this.contributionWalletService.creditSystemContribution({
accountType: sysContrib.accountType,
amount: new Decimal(sysContrib.amount),
provinceCode: sysContrib.provinceCode,
cityCode: sysContrib.cityCode,
neverExpires: sysContrib.neverExpires,
sourceAdoptionId: payload.adoptionId,
sourceAccountSequence: payload.adopterAccountSequence,
});
}
// 3. 处理未分配的贡献值(归总部)
for (const unalloc of payload.unallocatedToHeadquarters) {
await this.contributionWalletService.creditSystemContribution({
accountType: 'HEADQUARTERS',
amount: new Decimal(unalloc.amount),
neverExpires: true,
sourceAdoptionId: payload.adoptionId,
sourceAccountSequence: payload.adopterAccountSequence,
memo: unalloc.reason,
});
}
}
/**
* - Redis + DB 4
*/
private async isEventProcessed(eventId: string): Promise<boolean> {
const redisKey = `processed-event:${eventId}`;
// 1. 先检查 Redis 缓存(快速路径)
const cached = await this.redis.get(redisKey);
if (cached) return true;
// 2. 检查数据库
const dbRecord = await this.processedEventRepo.findByEventId(eventId);
if (dbRecord) {
// 回填 Redis 缓存
await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS);
return true;
}
return false;
}
/**
*
*/
private async markEventProcessed(
eventId: string,
eventType: string,
): Promise<void> {
// 1. 写入数据库
await this.processedEventRepo.create({
eventId,
eventType,
sourceService: 'contribution-service',
});
// 2. 写入 Redis 缓存4小时 TTL
const redisKey = `processed-event:${eventId}`;
await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS);
}
}

View File

@ -0,0 +1,118 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import { RedisService } from '../../redis/redis.service';
import { ProcessedEventRepository } from '../../persistence/repositories/processed-event.repository';
import { UserWalletService } from '../../../application/services/user-wallet.service';
import { UserRegisteredEvent } from '../events/contribution-distribution.event';
// 4小时 TTL
const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60;
@Injectable()
export class UserRegisteredConsumer implements OnModuleInit {
private readonly logger = new Logger(UserRegisteredConsumer.name);
constructor(
private readonly redis: RedisService,
private readonly processedEventRepo: ProcessedEventRepository,
private readonly userWalletService: UserWalletService,
) {}
async onModuleInit() {
this.logger.log('UserRegisteredConsumer initialized');
}
@EventPattern('auth.user.registered')
async handleUserRegistered(@Payload() message: any): Promise<void> {
// 解析消息格式
const event: UserRegisteredEvent = message.value || message;
const eventId = event.eventId || message.eventId;
if (!eventId) {
this.logger.warn('Received event without eventId, skipping');
return;
}
const { accountSequence } = event.payload;
this.logger.debug(
`Processing user registered event: ${eventId}, accountSequence: ${accountSequence}`,
);
// 幂等性检查
if (await this.isEventProcessed(eventId)) {
this.logger.debug(`Event ${eventId} already processed, skipping`);
return;
}
try {
// 为用户创建所有类型的钱包
await this.userWalletService.createWalletsForUser(accountSequence);
// 标记为已处理
await this.markEventProcessed(eventId, event.eventType);
this.logger.log(
`Wallets created for user ${accountSequence}, source: ${event.payload.source}`,
);
} catch (error) {
// 如果是重复创建钱包的错误,忽略
if (
error instanceof Error &&
error.message.includes('Unique constraint')
) {
this.logger.debug(
`Wallets already exist for user ${accountSequence}, marking as processed`,
);
await this.markEventProcessed(eventId, event.eventType);
return;
}
this.logger.error(
`Failed to create wallets for user ${accountSequence}`,
error instanceof Error ? error.stack : error,
);
throw error; // 让 Kafka 重试
}
}
/**
* - Redis + DB 4
*/
private async isEventProcessed(eventId: string): Promise<boolean> {
const redisKey = `processed-event:${eventId}`;
// 1. 先检查 Redis 缓存(快速路径)
const cached = await this.redis.get(redisKey);
if (cached) return true;
// 2. 检查数据库
const dbRecord = await this.processedEventRepo.findByEventId(eventId);
if (dbRecord) {
// 回填 Redis 缓存
await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS);
return true;
}
return false;
}
/**
*
*/
private async markEventProcessed(
eventId: string,
eventType: string,
): Promise<void> {
// 1. 写入数据库
await this.processedEventRepo.create({
eventId,
eventType,
sourceService: 'auth-service',
});
// 2. 写入 Redis 缓存4小时 TTL
const redisKey = `processed-event:${eventId}`;
await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS);
}
}

View File

@ -0,0 +1,72 @@
/**
*
* contribution-service
*/
export interface ContributionDistributionCompletedEvent {
eventType: 'ContributionDistributionCompleted';
eventId: string;
timestamp: string;
payload: ContributionDistributionPayload;
}
export interface ContributionDistributionPayload {
// 认种信息
adoptionId: string;
adopterAccountSequence: string;
treeCount: number;
adoptionDate: string;
// 用户贡献值分配
userContributions: UserContributionItem[];
// 系统账户分配
systemContributions: SystemContributionItem[];
// 未分配(归总部)
unallocatedToHeadquarters: UnallocatedContributionItem[];
}
export interface UserContributionItem {
accountSequence: string;
contributionType: 'PERSONAL' | 'TEAM_LEVEL' | 'TEAM_BONUS';
amount: string;
levelDepth?: number; // 1-15 for TEAM_LEVEL
bonusTier?: number; // 1-3 for TEAM_BONUS
effectiveDate: string;
expireDate: string;
sourceAdoptionId: string;
sourceAccountSequence: string;
}
export interface SystemContributionItem {
accountType: 'OPERATION' | 'PROVINCE' | 'CITY' | 'HEADQUARTERS';
amount: string;
provinceCode?: string;
cityCode?: string;
neverExpires: boolean;
}
export interface UnallocatedContributionItem {
reason: string;
amount: string;
wouldBeAccountSequence?: string;
levelDepth?: number;
bonusTier?: number;
}
/**
*
* auth-service
*/
export interface UserRegisteredEvent {
eventType: 'UserRegistered';
eventId: string;
timestamp: string;
payload: {
accountSequence: string;
phone: string;
referrerAccountSequence: string | null;
registeredAt: string;
source: 'LEGACY_MIGRATION' | 'NEW_REGISTRATION';
};
}

View File

@ -4,6 +4,24 @@ import { OutboxEvent, OutboxStatus } from '@prisma/client';
@Injectable()
export class OutboxRepository {
/**
* 退
* 第1次: 30s, 第2次: 1min, 第3次: 2min, 第4次: 5min,
* 第5次: 10min, 第6次: 30min, 第7次: 1h, 第8次: 2h, 第9次: 4h, 第10次: 4h
*/
private readonly BACKOFF_INTERVALS = [
30_000, // 30 seconds
60_000, // 1 minute
120_000, // 2 minutes
300_000, // 5 minutes
600_000, // 10 minutes
1_800_000, // 30 minutes
3_600_000, // 1 hour
7_200_000, // 2 hours
14_400_000, // 4 hours (max)
14_400_000, // 4 hours (max)
];
constructor(private readonly prisma: PrismaService) {}
/**
@ -58,7 +76,7 @@ export class OutboxRepository {
}
/**
* 退3
* 退4
*/
async markAsFailed(
id: string,
@ -69,13 +87,12 @@ export class OutboxRepository {
const newRetryCount = currentRetryCount + 1;
const shouldFail = newRetryCount >= maxRetries;
// 指数退避: 30s, 60s, 120s, 240s, 480s, 960s, 1920s, 3840s, 7680s, 10800s (最大3小时)
const baseDelayMs = 30000; // 30 seconds
const maxDelayMs = 3 * 60 * 60 * 1000; // 3 hours
const delayMs = Math.min(
baseDelayMs * Math.pow(2, newRetryCount - 1),
maxDelayMs,
// 使用预定义的退避时间表
const backoffIndex = Math.min(
newRetryCount - 1,
this.BACKOFF_INTERVALS.length - 1,
);
const delayMs = this.BACKOFF_INTERVALS[backoffIndex];
await this.prisma.outboxEvent.update({
where: { id },

View File

@ -0,0 +1,56 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { ProcessedEvent } from '@prisma/client';
@Injectable()
export class ProcessedEventRepository {
constructor(private readonly prisma: PrismaService) {}
/**
* eventId
*/
async findByEventId(eventId: string): Promise<ProcessedEvent | null> {
return this.prisma.processedEvent.findUnique({
where: { eventId },
});
}
/**
*
*/
async create(data: {
eventId: string;
eventType: string;
sourceService: string;
}): Promise<ProcessedEvent> {
return this.prisma.processedEvent.create({
data: {
eventId: data.eventId,
eventType: data.eventType,
sourceService: data.sourceService,
},
});
}
/**
* 24
*/
async deleteOldEvents(olderThan: Date): Promise<number> {
const result = await this.prisma.processedEvent.deleteMany({
where: {
processedAt: { lt: olderThan },
},
});
return result.count;
}
/**
*
*/
async isProcessed(eventId: string): Promise<boolean> {
const count = await this.prisma.processedEvent.count({
where: { eventId },
});
return count > 0;
}
}