diff --git a/.claude/settings.local.json b/.claude/settings.local.json index a6c815b3..202929fd 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -768,7 +768,8 @@ "Bash(python3 -c \" import sys content = sys.stdin.read\\(\\) old = '''''' done # 清空 processed_cdc_events 表(因为 migration 时可能已经消费了一些消息) # 这是事务性幂等消费的关键:重置 Kafka offset 后必须同时清空幂等记录 log_info \"\"Truncating processed_cdc_events tables to allow re-consumption...\"\" for db in \"\"rwa_contribution\"\" \"\"rwa_auth\"\"; do if run_psql \"\"$db\"\" \"\"TRUNCATE TABLE processed_cdc_events;\"\" 2>/dev/null; then log_success \"\"Truncated processed_cdc_events in $db\"\" else log_warn \"\"Could not truncate processed_cdc_events in $db \\(table may not exist yet\\)\"\" fi done log_step \"\"Step 9/18: Starting 2.0 services...\"\"'''''' new = '''''' done # 清空 processed_cdc_events 表(因为 migration 时可能已经消费了一些消息) # 这是事务性幂等消费的关键:重置 Kafka offset 后必须同时清空幂等记录 log_info \"\"Truncating processed_cdc_events tables to allow re-consumption...\"\" for db in \"\"rwa_contribution\"\" \"\"rwa_auth\"\"; do if run_psql \"\"$db\"\" \"\"TRUNCATE TABLE processed_cdc_events;\"\" 2>/dev/null; then log_success \"\"Truncated processed_cdc_events in $db\"\" else log_warn \"\"Could not truncate processed_cdc_events in $db \\(table may not exist yet\\)\"\" fi done log_step \"\"Step 9/18: Starting 2.0 services...\"\"'''''' print\\(content.replace\\(old, new\\)\\) \")", "Bash(git rm:*)", "Bash(echo \"请在服务器运行以下命令检查 outbox 事件:\n\ndocker exec -it rwa-postgres psql -U rwa_user -d rwa_contribution -c \"\"\nSELECT id, event_type, aggregate_id, \n payload->>''sourceType'' as source_type,\n payload->>''accountSequence'' as account_seq,\n payload->>''sourceAccountSequence'' as source_account_seq,\n payload->>''bonusTier'' as bonus_tier\nFROM outbox_events \nWHERE payload->>''accountSequence'' = ''D25122900007''\nORDER BY id;\n\"\"\")", - "Bash(ssh -o ConnectTimeout=10 ceshi@14.215.128.96 'find /home/ceshi/rwadurian/frontend/mining-admin-web -name \"\"*.tsx\"\" -o -name \"\"*.ts\"\" | xargs grep -l \"\"用户管理\\\\|users\"\" 2>/dev/null | head -10')" + "Bash(ssh -o ConnectTimeout=10 ceshi@14.215.128.96 'find /home/ceshi/rwadurian/frontend/mining-admin-web -name \"\"*.tsx\"\" -o -name \"\"*.ts\"\" | xargs grep -l \"\"用户管理\\\\|users\"\" 2>/dev/null | head -10')", + "Bash(dir /s /b \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\")" ], "deny": [], "ask": [] diff --git a/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts b/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts index 0d880182..899b851a 100644 --- a/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts +++ b/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts @@ -3,9 +3,11 @@ import { Cron, CronExpression } from '@nestjs/schedule'; import { ContributionCalculationService } from '../services/contribution-calculation.service'; import { SnapshotService } from '../services/snapshot.service'; import { ContributionRecordRepository } from '../../infrastructure/persistence/repositories/contribution-record.repository'; +import { ContributionAccountRepository } from '../../infrastructure/persistence/repositories/contribution-account.repository'; import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; import { KafkaProducerService } from '../../infrastructure/kafka/kafka-producer.service'; import { RedisService } from '../../infrastructure/redis/redis.service'; +import { ContributionAccountUpdatedEvent } from '../../domain/events'; /** * 算力相关定时任务 @@ -19,6 +21,7 @@ export class ContributionScheduler implements OnModuleInit { private readonly calculationService: ContributionCalculationService, private readonly snapshotService: SnapshotService, private readonly contributionRecordRepository: ContributionRecordRepository, + private readonly contributionAccountRepository: ContributionAccountRepository, private readonly outboxRepository: OutboxRepository, private readonly kafkaProducer: KafkaProducerService, private readonly redis: RedisService, @@ -174,4 +177,128 @@ export class ContributionScheduler implements OnModuleInit { await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue); } } + + /** + * 每10分钟增量发布最近更新的贡献值账户事件 + * 只同步过去15分钟内有变更的账户,作为实时同步的补充 + */ + @Cron('*/10 * * * *') + async publishRecentlyUpdatedAccounts(): Promise { + const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:incremental-sync`, 540); // 9分钟锁 + if (!lockValue) { + return; + } + + try { + // 查找过去15分钟内更新的账户(比10分钟多5分钟余量,避免遗漏边界情况) + const fifteenMinutesAgo = new Date(Date.now() - 15 * 60 * 1000); + + const accounts = await this.contributionAccountRepository.findRecentlyUpdated(fifteenMinutesAgo, 500); + + if (accounts.length === 0) { + return; + } + + const events = accounts.map((account) => { + const event = new ContributionAccountUpdatedEvent( + account.accountSequence, + account.personalContribution.value.toString(), + account.totalLevelPending.value.toString(), + account.totalBonusPending.value.toString(), + account.effectiveContribution.value.toString(), + account.effectiveContribution.value.toString(), + account.hasAdopted, + account.directReferralAdoptedCount, + account.unlockedLevelDepth, + account.unlockedBonusTiers, + account.createdAt, + ); + + return { + aggregateType: ContributionAccountUpdatedEvent.AGGREGATE_TYPE, + aggregateId: account.accountSequence, + eventType: ContributionAccountUpdatedEvent.EVENT_TYPE, + payload: event.toPayload(), + }; + }); + + await this.outboxRepository.saveMany(events); + + this.logger.log(`Incremental sync: published ${accounts.length} recently updated accounts`); + } catch (error) { + this.logger.error('Failed to publish recently updated accounts', error); + } finally { + await this.redis.releaseLock(`${this.LOCK_KEY}:incremental-sync`, lockValue); + } + } + + /** + * 每天凌晨4点全量发布所有贡献值账户更新事件 + * 作为数据一致性的最终兜底保障 + */ + @Cron('0 4 * * *') + async publishAllAccountUpdates(): Promise { + const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:full-sync`, 3600); // 1小时锁 + if (!lockValue) { + return; + } + + try { + this.logger.log('Starting daily full sync of contribution accounts...'); + + let page = 1; + const pageSize = 100; + let totalPublished = 0; + + while (true) { + const { items: accounts, total } = await this.contributionAccountRepository.findMany({ + page, + limit: pageSize, + orderBy: 'effectiveContribution', + order: 'desc', + }); + + if (accounts.length === 0) { + break; + } + + const events = accounts.map((account) => { + const event = new ContributionAccountUpdatedEvent( + account.accountSequence, + account.personalContribution.value.toString(), + account.totalLevelPending.value.toString(), + account.totalBonusPending.value.toString(), + account.effectiveContribution.value.toString(), + account.effectiveContribution.value.toString(), + account.hasAdopted, + account.directReferralAdoptedCount, + account.unlockedLevelDepth, + account.unlockedBonusTiers, + account.createdAt, + ); + + return { + aggregateType: ContributionAccountUpdatedEvent.AGGREGATE_TYPE, + aggregateId: account.accountSequence, + eventType: ContributionAccountUpdatedEvent.EVENT_TYPE, + payload: event.toPayload(), + }; + }); + + await this.outboxRepository.saveMany(events); + totalPublished += accounts.length; + + if (accounts.length < pageSize || page * pageSize >= total) { + break; + } + page++; + } + + this.logger.log(`Daily full sync completed: published ${totalPublished} contribution account events`); + } catch (error) { + this.logger.error('Failed to publish all account updates', error); + } finally { + await this.redis.releaseLock(`${this.LOCK_KEY}:full-sync`, lockValue); + } + } } diff --git a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts index fa2bd34a..dca61bc0 100644 --- a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts +++ b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts @@ -12,7 +12,7 @@ import { ContributionRecordAggregate } from '../../domain/aggregates/contributio import { SyncedReferral } from '../../domain/repositories/synced-data.repository.interface'; import { ContributionDistributionPublisherService } from './contribution-distribution-publisher.service'; import { ContributionRateService } from './contribution-rate.service'; -import { ContributionRecordSyncedEvent, NetworkProgressUpdatedEvent } from '../../domain/events'; +import { ContributionRecordSyncedEvent, NetworkProgressUpdatedEvent, ContributionAccountUpdatedEvent } from '../../domain/events'; /** * 算力计算应用服务 @@ -164,6 +164,8 @@ export class ContributionCalculationService { ): Promise { // 收集所有保存后的记录(带ID)用于发布事件 const savedRecords: ContributionRecordAggregate[] = []; + // 收集所有被更新的账户序列号(用于发布账户更新事件) + const updatedAccountSequences = new Set(); // 1. 保存个人算力记录 const savedPersonalRecord = await this.contributionRecordRepository.save(result.personalRecord); @@ -178,6 +180,7 @@ export class ContributionCalculationService { } account.addPersonalContribution(result.personalRecord.amount); await this.contributionAccountRepository.save(account); + updatedAccountSequences.add(result.personalRecord.accountSequence); // 2. 保存团队层级算力记录 if (result.teamLevelRecords.length > 0) { @@ -193,6 +196,7 @@ export class ContributionCalculationService { record.levelDepth, // 传递层级深度 null, ); + updatedAccountSequences.add(record.accountSequence); } } @@ -210,6 +214,7 @@ export class ContributionCalculationService { null, record.bonusTier, // 传递加成档位 ); + updatedAccountSequences.add(record.accountSequence); } } @@ -250,6 +255,23 @@ export class ContributionCalculationService { // 6. 发布算力记录同步事件(用于 mining-admin-service)- 使用保存后带 ID 的记录 await this.publishContributionRecordEvents(savedRecords); + + // 7. 发布所有被更新账户的事件(用于 CDC 同步到 mining-admin-service) + await this.publishUpdatedAccountEvents(updatedAccountSequences); + } + + /** + * 发布被更新账户的事件 + */ + private async publishUpdatedAccountEvents(accountSequences: Set): Promise { + if (accountSequences.size === 0) return; + + for (const accountSequence of accountSequences) { + const account = await this.contributionAccountRepository.findByAccountSequence(accountSequence); + if (account) { + await this.publishContributionAccountUpdatedEvent(account); + } + } } /** @@ -300,6 +322,9 @@ export class ContributionCalculationService { if (!account.hasAdopted) { account.markAsAdopted(); await this.contributionAccountRepository.save(account); + + // 发布账户更新事件到 outbox(用于 CDC 同步到 mining-admin-service) + await this.publishContributionAccountUpdatedEvent(account); } } @@ -323,6 +348,10 @@ export class ContributionCalculationService { account.incrementDirectReferralAdoptedCount(); } await this.contributionAccountRepository.save(account); + + // 发布账户更新事件到 outbox(用于 CDC 同步到 mining-admin-service) + await this.publishContributionAccountUpdatedEvent(account); + this.logger.debug( `Updated referrer ${referrerAccountSequence} unlock status: level=${account.unlockedLevelDepth}, bonus=${account.unlockedBonusTiers}`, ); @@ -393,4 +422,38 @@ export class ContributionCalculationService { }, }; } + + /** + * 发布贡献值账户更新事件(用于 CDC 同步到 mining-admin-service) + */ + private async publishContributionAccountUpdatedEvent( + account: ContributionAccountAggregate, + ): Promise { + const event = new ContributionAccountUpdatedEvent( + account.accountSequence, + account.personalContribution.value.toString(), + account.totalLevelPending.value.toString(), + account.totalBonusPending.value.toString(), + account.effectiveContribution.value.toString(), + account.effectiveContribution.value.toString(), + account.hasAdopted, + account.directReferralAdoptedCount, + account.unlockedLevelDepth, + account.unlockedBonusTiers, + account.createdAt, + ); + + await this.outboxRepository.save({ + aggregateType: ContributionAccountUpdatedEvent.AGGREGATE_TYPE, + aggregateId: account.accountSequence, + eventType: ContributionAccountUpdatedEvent.EVENT_TYPE, + payload: event.toPayload(), + }); + + this.logger.debug( + `Published ContributionAccountUpdatedEvent for ${account.accountSequence}: ` + + `directReferralAdoptedCount=${account.directReferralAdoptedCount}, ` + + `hasAdopted=${account.hasAdopted}`, + ); + } } diff --git a/backend/services/contribution-service/src/domain/events/contribution-account-updated.event.ts b/backend/services/contribution-service/src/domain/events/contribution-account-updated.event.ts new file mode 100644 index 00000000..543f281c --- /dev/null +++ b/backend/services/contribution-service/src/domain/events/contribution-account-updated.event.ts @@ -0,0 +1,40 @@ +/** + * 贡献值账户更新事件 + * 当账户的 directReferralAdoptedCount, unlockedLevelDepth, unlockedBonusTiers 等字段更新时发布 + * 用于实时同步到 mining-admin-service + */ +export class ContributionAccountUpdatedEvent { + static readonly EVENT_TYPE = 'ContributionAccountUpdated'; + static readonly AGGREGATE_TYPE = 'ContributionAccount'; + + constructor( + public readonly accountSequence: string, + public readonly personalContribution: string, + public readonly teamLevelContribution: string, + public readonly teamBonusContribution: string, + public readonly totalContribution: string, + public readonly effectiveContribution: string, + public readonly hasAdopted: boolean, + public readonly directReferralAdoptedCount: number, + public readonly unlockedLevelDepth: number, + public readonly unlockedBonusTiers: number, + public readonly createdAt: Date, + ) {} + + toPayload(): Record { + return { + eventType: ContributionAccountUpdatedEvent.EVENT_TYPE, + accountSequence: this.accountSequence, + personalContribution: this.personalContribution, + teamLevelContribution: this.teamLevelContribution, + teamBonusContribution: this.teamBonusContribution, + totalContribution: this.totalContribution, + effectiveContribution: this.effectiveContribution, + hasAdopted: this.hasAdopted, + directReferralAdoptedCount: this.directReferralAdoptedCount, + unlockedLevelDepth: this.unlockedLevelDepth, + unlockedBonusTiers: this.unlockedBonusTiers, + createdAt: this.createdAt.toISOString(), + }; + } +} diff --git a/backend/services/contribution-service/src/domain/events/index.ts b/backend/services/contribution-service/src/domain/events/index.ts index f5d1f555..debea840 100644 --- a/backend/services/contribution-service/src/domain/events/index.ts +++ b/backend/services/contribution-service/src/domain/events/index.ts @@ -1,6 +1,7 @@ export * from './contribution-calculated.event'; export * from './daily-snapshot-created.event'; export * from './contribution-account-synced.event'; +export * from './contribution-account-updated.event'; export * from './referral-synced.event'; export * from './adoption-synced.event'; export * from './contribution-record-synced.event'; diff --git a/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-account.repository.ts b/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-account.repository.ts index 1b66c02b..2fb79b86 100644 --- a/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-account.repository.ts +++ b/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-account.repository.ts @@ -223,6 +223,16 @@ export class ContributionAccountRepository implements IContributionAccountReposi }); } + async findRecentlyUpdated(since: Date, limit: number = 500): Promise { + const records = await this.client.contributionAccount.findMany({ + where: { updatedAt: { gte: since } }, + orderBy: { updatedAt: 'desc' }, + take: limit, + }); + + return records.map((r) => this.toDomain(r)); + } + private toDomain(record: any): ContributionAccountAggregate { return ContributionAccountAggregate.fromPersistence({ id: record.id,