fix(contribution): ensure 100% reliable CDC sync to mining-admin-service

- Add ContributionAccountUpdatedEvent for real-time account updates
- Publish outbox events when saving distribution results
- Publish outbox events when updating adopter/referrer unlock status
- Add incremental sync every 10 minutes for recently updated accounts
- Add daily full sync at 4am as final consistency guarantee
- Add findRecentlyUpdated repository method for incremental sync

Three-layer sync guarantee:
1. Real-time: publish events on every account update
2. Incremental: scan accounts updated in last 15 minutes every 10 mins
3. Full sync: publish all accounts daily at 4am

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-13 19:27:50 -08:00
parent 3999d7cc51
commit fdfc2d6700
6 changed files with 244 additions and 2 deletions

View File

@ -768,7 +768,8 @@
"Bash(python3 -c \" import sys content = sys.stdin.read\\(\\) old = '''''' done # 清空 processed_cdc_events 表(因为 migration 时可能已经消费了一些消息) # 这是事务性幂等消费的关键:重置 Kafka offset 后必须同时清空幂等记录 log_info \"\"Truncating processed_cdc_events tables to allow re-consumption...\"\" for db in \"\"rwa_contribution\"\" \"\"rwa_auth\"\"; do if run_psql \"\"$db\"\" \"\"TRUNCATE TABLE processed_cdc_events;\"\" 2>/dev/null; then log_success \"\"Truncated processed_cdc_events in $db\"\" else log_warn \"\"Could not truncate processed_cdc_events in $db \\(table may not exist yet\\)\"\" fi done log_step \"\"Step 9/18: Starting 2.0 services...\"\"'''''' new = '''''' done # 清空 processed_cdc_events 表(因为 migration 时可能已经消费了一些消息) # 这是事务性幂等消费的关键:重置 Kafka offset 后必须同时清空幂等记录 log_info \"\"Truncating processed_cdc_events tables to allow re-consumption...\"\" for db in \"\"rwa_contribution\"\" \"\"rwa_auth\"\"; do if run_psql \"\"$db\"\" \"\"TRUNCATE TABLE processed_cdc_events;\"\" 2>/dev/null; then log_success \"\"Truncated processed_cdc_events in $db\"\" else log_warn \"\"Could not truncate processed_cdc_events in $db \\(table may not exist yet\\)\"\" fi done log_step \"\"Step 9/18: Starting 2.0 services...\"\"'''''' print\\(content.replace\\(old, new\\)\\) \")",
"Bash(git rm:*)",
"Bash(echo \"请在服务器运行以下命令检查 outbox 事件:\n\ndocker exec -it rwa-postgres psql -U rwa_user -d rwa_contribution -c \"\"\nSELECT id, event_type, aggregate_id, \n payload->>''sourceType'' as source_type,\n payload->>''accountSequence'' as account_seq,\n payload->>''sourceAccountSequence'' as source_account_seq,\n payload->>''bonusTier'' as bonus_tier\nFROM outbox_events \nWHERE payload->>''accountSequence'' = ''D25122900007''\nORDER BY id;\n\"\"\")",
"Bash(ssh -o ConnectTimeout=10 ceshi@14.215.128.96 'find /home/ceshi/rwadurian/frontend/mining-admin-web -name \"\"*.tsx\"\" -o -name \"\"*.ts\"\" | xargs grep -l \"\"用户管理\\\\|users\"\" 2>/dev/null | head -10')"
"Bash(ssh -o ConnectTimeout=10 ceshi@14.215.128.96 'find /home/ceshi/rwadurian/frontend/mining-admin-web -name \"\"*.tsx\"\" -o -name \"\"*.ts\"\" | xargs grep -l \"\"用户管理\\\\|users\"\" 2>/dev/null | head -10')",
"Bash(dir /s /b \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\")"
],
"deny": [],
"ask": []

View File

@ -3,9 +3,11 @@ import { Cron, CronExpression } from '@nestjs/schedule';
import { ContributionCalculationService } from '../services/contribution-calculation.service';
import { SnapshotService } from '../services/snapshot.service';
import { ContributionRecordRepository } from '../../infrastructure/persistence/repositories/contribution-record.repository';
import { ContributionAccountRepository } from '../../infrastructure/persistence/repositories/contribution-account.repository';
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { KafkaProducerService } from '../../infrastructure/kafka/kafka-producer.service';
import { RedisService } from '../../infrastructure/redis/redis.service';
import { ContributionAccountUpdatedEvent } from '../../domain/events';
/**
*
@ -19,6 +21,7 @@ export class ContributionScheduler implements OnModuleInit {
private readonly calculationService: ContributionCalculationService,
private readonly snapshotService: SnapshotService,
private readonly contributionRecordRepository: ContributionRecordRepository,
private readonly contributionAccountRepository: ContributionAccountRepository,
private readonly outboxRepository: OutboxRepository,
private readonly kafkaProducer: KafkaProducerService,
private readonly redis: RedisService,
@ -174,4 +177,128 @@ export class ContributionScheduler implements OnModuleInit {
await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue);
}
}
/**
* 10
* 15
*/
@Cron('*/10 * * * *')
async publishRecentlyUpdatedAccounts(): Promise<void> {
const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:incremental-sync`, 540); // 9分钟锁
if (!lockValue) {
return;
}
try {
// 查找过去15分钟内更新的账户比10分钟多5分钟余量避免遗漏边界情况
const fifteenMinutesAgo = new Date(Date.now() - 15 * 60 * 1000);
const accounts = await this.contributionAccountRepository.findRecentlyUpdated(fifteenMinutesAgo, 500);
if (accounts.length === 0) {
return;
}
const events = accounts.map((account) => {
const event = new ContributionAccountUpdatedEvent(
account.accountSequence,
account.personalContribution.value.toString(),
account.totalLevelPending.value.toString(),
account.totalBonusPending.value.toString(),
account.effectiveContribution.value.toString(),
account.effectiveContribution.value.toString(),
account.hasAdopted,
account.directReferralAdoptedCount,
account.unlockedLevelDepth,
account.unlockedBonusTiers,
account.createdAt,
);
return {
aggregateType: ContributionAccountUpdatedEvent.AGGREGATE_TYPE,
aggregateId: account.accountSequence,
eventType: ContributionAccountUpdatedEvent.EVENT_TYPE,
payload: event.toPayload(),
};
});
await this.outboxRepository.saveMany(events);
this.logger.log(`Incremental sync: published ${accounts.length} recently updated accounts`);
} catch (error) {
this.logger.error('Failed to publish recently updated accounts', error);
} finally {
await this.redis.releaseLock(`${this.LOCK_KEY}:incremental-sync`, lockValue);
}
}
/**
* 4
*
*/
@Cron('0 4 * * *')
async publishAllAccountUpdates(): Promise<void> {
const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:full-sync`, 3600); // 1小时锁
if (!lockValue) {
return;
}
try {
this.logger.log('Starting daily full sync of contribution accounts...');
let page = 1;
const pageSize = 100;
let totalPublished = 0;
while (true) {
const { items: accounts, total } = await this.contributionAccountRepository.findMany({
page,
limit: pageSize,
orderBy: 'effectiveContribution',
order: 'desc',
});
if (accounts.length === 0) {
break;
}
const events = accounts.map((account) => {
const event = new ContributionAccountUpdatedEvent(
account.accountSequence,
account.personalContribution.value.toString(),
account.totalLevelPending.value.toString(),
account.totalBonusPending.value.toString(),
account.effectiveContribution.value.toString(),
account.effectiveContribution.value.toString(),
account.hasAdopted,
account.directReferralAdoptedCount,
account.unlockedLevelDepth,
account.unlockedBonusTiers,
account.createdAt,
);
return {
aggregateType: ContributionAccountUpdatedEvent.AGGREGATE_TYPE,
aggregateId: account.accountSequence,
eventType: ContributionAccountUpdatedEvent.EVENT_TYPE,
payload: event.toPayload(),
};
});
await this.outboxRepository.saveMany(events);
totalPublished += accounts.length;
if (accounts.length < pageSize || page * pageSize >= total) {
break;
}
page++;
}
this.logger.log(`Daily full sync completed: published ${totalPublished} contribution account events`);
} catch (error) {
this.logger.error('Failed to publish all account updates', error);
} finally {
await this.redis.releaseLock(`${this.LOCK_KEY}:full-sync`, lockValue);
}
}
}

View File

@ -12,7 +12,7 @@ import { ContributionRecordAggregate } from '../../domain/aggregates/contributio
import { SyncedReferral } from '../../domain/repositories/synced-data.repository.interface';
import { ContributionDistributionPublisherService } from './contribution-distribution-publisher.service';
import { ContributionRateService } from './contribution-rate.service';
import { ContributionRecordSyncedEvent, NetworkProgressUpdatedEvent } from '../../domain/events';
import { ContributionRecordSyncedEvent, NetworkProgressUpdatedEvent, ContributionAccountUpdatedEvent } from '../../domain/events';
/**
*
@ -164,6 +164,8 @@ export class ContributionCalculationService {
): Promise<void> {
// 收集所有保存后的记录带ID用于发布事件
const savedRecords: ContributionRecordAggregate[] = [];
// 收集所有被更新的账户序列号(用于发布账户更新事件)
const updatedAccountSequences = new Set<string>();
// 1. 保存个人算力记录
const savedPersonalRecord = await this.contributionRecordRepository.save(result.personalRecord);
@ -178,6 +180,7 @@ export class ContributionCalculationService {
}
account.addPersonalContribution(result.personalRecord.amount);
await this.contributionAccountRepository.save(account);
updatedAccountSequences.add(result.personalRecord.accountSequence);
// 2. 保存团队层级算力记录
if (result.teamLevelRecords.length > 0) {
@ -193,6 +196,7 @@ export class ContributionCalculationService {
record.levelDepth, // 传递层级深度
null,
);
updatedAccountSequences.add(record.accountSequence);
}
}
@ -210,6 +214,7 @@ export class ContributionCalculationService {
null,
record.bonusTier, // 传递加成档位
);
updatedAccountSequences.add(record.accountSequence);
}
}
@ -250,6 +255,23 @@ export class ContributionCalculationService {
// 6. 发布算力记录同步事件(用于 mining-admin-service- 使用保存后带 ID 的记录
await this.publishContributionRecordEvents(savedRecords);
// 7. 发布所有被更新账户的事件(用于 CDC 同步到 mining-admin-service
await this.publishUpdatedAccountEvents(updatedAccountSequences);
}
/**
*
*/
private async publishUpdatedAccountEvents(accountSequences: Set<string>): Promise<void> {
if (accountSequences.size === 0) return;
for (const accountSequence of accountSequences) {
const account = await this.contributionAccountRepository.findByAccountSequence(accountSequence);
if (account) {
await this.publishContributionAccountUpdatedEvent(account);
}
}
}
/**
@ -300,6 +322,9 @@ export class ContributionCalculationService {
if (!account.hasAdopted) {
account.markAsAdopted();
await this.contributionAccountRepository.save(account);
// 发布账户更新事件到 outbox用于 CDC 同步到 mining-admin-service
await this.publishContributionAccountUpdatedEvent(account);
}
}
@ -323,6 +348,10 @@ export class ContributionCalculationService {
account.incrementDirectReferralAdoptedCount();
}
await this.contributionAccountRepository.save(account);
// 发布账户更新事件到 outbox用于 CDC 同步到 mining-admin-service
await this.publishContributionAccountUpdatedEvent(account);
this.logger.debug(
`Updated referrer ${referrerAccountSequence} unlock status: level=${account.unlockedLevelDepth}, bonus=${account.unlockedBonusTiers}`,
);
@ -393,4 +422,38 @@ export class ContributionCalculationService {
},
};
}
/**
* CDC mining-admin-service
*/
private async publishContributionAccountUpdatedEvent(
account: ContributionAccountAggregate,
): Promise<void> {
const event = new ContributionAccountUpdatedEvent(
account.accountSequence,
account.personalContribution.value.toString(),
account.totalLevelPending.value.toString(),
account.totalBonusPending.value.toString(),
account.effectiveContribution.value.toString(),
account.effectiveContribution.value.toString(),
account.hasAdopted,
account.directReferralAdoptedCount,
account.unlockedLevelDepth,
account.unlockedBonusTiers,
account.createdAt,
);
await this.outboxRepository.save({
aggregateType: ContributionAccountUpdatedEvent.AGGREGATE_TYPE,
aggregateId: account.accountSequence,
eventType: ContributionAccountUpdatedEvent.EVENT_TYPE,
payload: event.toPayload(),
});
this.logger.debug(
`Published ContributionAccountUpdatedEvent for ${account.accountSequence}: ` +
`directReferralAdoptedCount=${account.directReferralAdoptedCount}, ` +
`hasAdopted=${account.hasAdopted}`,
);
}
}

View File

@ -0,0 +1,40 @@
/**
*
* directReferralAdoptedCount, unlockedLevelDepth, unlockedBonusTiers
* mining-admin-service
*/
export class ContributionAccountUpdatedEvent {
static readonly EVENT_TYPE = 'ContributionAccountUpdated';
static readonly AGGREGATE_TYPE = 'ContributionAccount';
constructor(
public readonly accountSequence: string,
public readonly personalContribution: string,
public readonly teamLevelContribution: string,
public readonly teamBonusContribution: string,
public readonly totalContribution: string,
public readonly effectiveContribution: string,
public readonly hasAdopted: boolean,
public readonly directReferralAdoptedCount: number,
public readonly unlockedLevelDepth: number,
public readonly unlockedBonusTiers: number,
public readonly createdAt: Date,
) {}
toPayload(): Record<string, any> {
return {
eventType: ContributionAccountUpdatedEvent.EVENT_TYPE,
accountSequence: this.accountSequence,
personalContribution: this.personalContribution,
teamLevelContribution: this.teamLevelContribution,
teamBonusContribution: this.teamBonusContribution,
totalContribution: this.totalContribution,
effectiveContribution: this.effectiveContribution,
hasAdopted: this.hasAdopted,
directReferralAdoptedCount: this.directReferralAdoptedCount,
unlockedLevelDepth: this.unlockedLevelDepth,
unlockedBonusTiers: this.unlockedBonusTiers,
createdAt: this.createdAt.toISOString(),
};
}
}

View File

@ -1,6 +1,7 @@
export * from './contribution-calculated.event';
export * from './daily-snapshot-created.event';
export * from './contribution-account-synced.event';
export * from './contribution-account-updated.event';
export * from './referral-synced.event';
export * from './adoption-synced.event';
export * from './contribution-record-synced.event';

View File

@ -223,6 +223,16 @@ export class ContributionAccountRepository implements IContributionAccountReposi
});
}
async findRecentlyUpdated(since: Date, limit: number = 500): Promise<ContributionAccountAggregate[]> {
const records = await this.client.contributionAccount.findMany({
where: { updatedAt: { gte: since } },
orderBy: { updatedAt: 'desc' },
take: limit,
});
return records.map((r) => this.toDomain(r));
}
private toDomain(record: any): ContributionAccountAggregate {
return ContributionAccountAggregate.fromPersistence({
id: record.id,