diff --git a/backend/services/contribution-service/src/api/controllers/admin.controller.ts b/backend/services/contribution-service/src/api/controllers/admin.controller.ts index 6bd0a042..9a459431 100644 --- a/backend/services/contribution-service/src/api/controllers/admin.controller.ts +++ b/backend/services/contribution-service/src/api/controllers/admin.controller.ts @@ -1,12 +1,21 @@ -import { Controller, Get } from '@nestjs/common'; +import { Controller, Get, Post, Logger } from '@nestjs/common'; import { ApiTags, ApiOperation } from '@nestjs/swagger'; import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; +import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; +import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work'; +import { ContributionAccountSyncedEvent } from '../../domain/events'; import { Public } from '../../shared/guards/jwt-auth.guard'; @ApiTags('Admin') @Controller('admin') export class AdminController { - constructor(private readonly prisma: PrismaService) {} + private readonly logger = new Logger(AdminController.name); + + constructor( + private readonly prisma: PrismaService, + private readonly outboxRepository: OutboxRepository, + private readonly unitOfWork: UnitOfWork, + ) {} @Get('accounts/sync') @Public() @@ -45,4 +54,80 @@ export class AdminController { total: accounts.length, }; } + + @Post('contribution-accounts/publish-all') + @Public() + @ApiOperation({ summary: '发布所有贡献值账户事件到 outbox,用于初始同步到 mining-admin-service' }) + async publishAllContributionAccounts(): Promise<{ + success: boolean; + publishedCount: number; + failedCount: number; + message: string; + }> { + const accounts = await this.prisma.contributionAccount.findMany({ + select: { + accountSequence: true, + personalContribution: true, + totalLevelPending: true, + totalBonusPending: true, + effectiveContribution: true, + hasAdopted: true, + directReferralAdoptedCount: true, + unlockedLevelDepth: true, + createdAt: true, + }, + }); + + let publishedCount = 0; + let failedCount = 0; + + // 批量处理,每批 100 条 + const batchSize = 100; + for (let i = 0; i < accounts.length; i += batchSize) { + const batch = accounts.slice(i, i + batchSize); + + try { + await this.unitOfWork.executeInTransaction(async () => { + const events = batch.map((acc) => { + const event = new ContributionAccountSyncedEvent( + acc.accountSequence, + acc.personalContribution.toString(), + acc.totalLevelPending.toString(), + acc.totalBonusPending.toString(), + acc.effectiveContribution.toString(), + acc.effectiveContribution.toString(), + acc.hasAdopted, + acc.directReferralAdoptedCount, + acc.unlockedLevelDepth, + acc.createdAt, + ); + + return { + aggregateType: ContributionAccountSyncedEvent.AGGREGATE_TYPE, + aggregateId: acc.accountSequence, + eventType: ContributionAccountSyncedEvent.EVENT_TYPE, + payload: event.toPayload(), + }; + }); + + await this.outboxRepository.saveMany(events); + }); + + publishedCount += batch.length; + this.logger.debug(`Published batch ${Math.floor(i / batchSize) + 1}: ${batch.length} events`); + } catch (error) { + failedCount += batch.length; + this.logger.error(`Failed to publish batch ${Math.floor(i / batchSize) + 1}`, error); + } + } + + this.logger.log(`Published ${publishedCount} contribution account events, ${failedCount} failed`); + + return { + success: failedCount === 0, + publishedCount, + failedCount, + message: `Published ${publishedCount} events, ${failedCount} failed out of ${accounts.length} total`, + }; + } } diff --git a/backend/services/contribution-service/src/domain/events/contribution-account-synced.event.ts b/backend/services/contribution-service/src/domain/events/contribution-account-synced.event.ts new file mode 100644 index 00000000..4d1c4064 --- /dev/null +++ b/backend/services/contribution-service/src/domain/events/contribution-account-synced.event.ts @@ -0,0 +1,37 @@ +/** + * 贡献值账户同步事件 + * 用于初始同步到 mining-admin-service + */ +export class ContributionAccountSyncedEvent { + static readonly EVENT_TYPE = 'ContributionAccountSynced'; + static readonly AGGREGATE_TYPE = 'ContributionAccount'; + + constructor( + public readonly accountSequence: string, + public readonly personalContribution: string, + public readonly teamLevelContribution: string, + public readonly teamBonusContribution: string, + public readonly totalContribution: string, + public readonly effectiveContribution: string, + public readonly hasAdopted: boolean, + public readonly directReferralAdoptedCount: number, + public readonly unlockedLevelDepth: number, + public readonly createdAt: Date, + ) {} + + toPayload(): Record { + return { + eventType: ContributionAccountSyncedEvent.EVENT_TYPE, + accountSequence: this.accountSequence, + personalContribution: this.personalContribution, + teamLevelContribution: this.teamLevelContribution, + teamBonusContribution: this.teamBonusContribution, + totalContribution: this.totalContribution, + effectiveContribution: this.effectiveContribution, + hasAdopted: this.hasAdopted, + directReferralAdoptedCount: this.directReferralAdoptedCount, + unlockedLevelDepth: this.unlockedLevelDepth, + createdAt: this.createdAt.toISOString(), + }; + } +} diff --git a/backend/services/contribution-service/src/domain/events/index.ts b/backend/services/contribution-service/src/domain/events/index.ts index 6ad32f90..255b50d3 100644 --- a/backend/services/contribution-service/src/domain/events/index.ts +++ b/backend/services/contribution-service/src/domain/events/index.ts @@ -1,2 +1,3 @@ export * from './contribution-calculated.event'; export * from './daily-snapshot-created.event'; +export * from './contribution-account-synced.event'; diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index 20761cfc..ae94f13f 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -109,6 +109,19 @@ CDC_CONSUMER_GROUPS=( "mining-admin-service-cdc-group" ) +# Debezium Outbox Connectors (for 2.0 service events -> mining-admin-service) +# These connectors capture events from each service's outbox table +OUTBOX_CONNECTORS=( + "auth-outbox-connector" + "contribution-outbox-connector" + "mining-outbox-connector" + "trading-outbox-connector" + "mining-wallet-outbox-connector" +) + +# Debezium Connect URL +DEBEZIUM_CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8083}" + # Colors RED='\033[0;31m' GREEN='\033[0;32m' @@ -720,6 +733,127 @@ sync_status() { done } +# =========================================================================== +# Debezium Outbox Connector Functions +# =========================================================================== + +# Register all outbox connectors with Debezium Connect +register_outbox_connectors() { + print_section "Registering Outbox Connectors" + + local scripts_dir="$SCRIPT_DIR/scripts/debezium" + local connect_url="$DEBEZIUM_CONNECT_URL" + + # Check if Debezium Connect is available + if ! curl -s "$connect_url" &>/dev/null; then + log_warn "Debezium Connect not available at $connect_url" + log_info "Outbox connectors will not be registered. You may need to register them manually." + return 1 + fi + + for connector in "${OUTBOX_CONNECTORS[@]}"; do + local config_file="$scripts_dir/${connector}.json" + + if [ ! -f "$config_file" ]; then + log_warn "Config file not found: $config_file" + continue + fi + + log_info "Registering connector: $connector" + + # Check if connector already exists + local existing + existing=$(curl -s "$connect_url/connectors/$connector" 2>/dev/null) + + if echo "$existing" | grep -q '"name"'; then + # Connector exists, update it + log_info "Updating existing connector: $connector" + + # Extract just the config part for PUT request + local config_only + config_only=$(cat "$config_file" | sed 's/.*"config"://' | sed 's/}$//') + + # Use envsubst to replace environment variables, then update + local result + result=$(cat "$config_file" | envsubst | curl -s -X PUT \ + -H "Content-Type: application/json" \ + -d @- \ + "$connect_url/connectors/$connector/config" 2>/dev/null) + + if echo "$result" | grep -q '"name"'; then + log_success "Updated connector: $connector" + else + log_warn "Failed to update connector $connector: $result" + fi + else + # Connector doesn't exist, create it + # Replace environment variables in the config file + local result + result=$(cat "$config_file" | envsubst | curl -s -X POST \ + -H "Content-Type: application/json" \ + -d @- \ + "$connect_url/connectors" 2>/dev/null) + + if echo "$result" | grep -q '"name"'; then + log_success "Registered connector: $connector" + else + log_warn "Failed to register connector $connector: $result" + fi + fi + done + + log_success "Outbox connector registration completed" +} + +# Delete all outbox connectors +delete_outbox_connectors() { + print_section "Deleting Outbox Connectors" + + local connect_url="$DEBEZIUM_CONNECT_URL" + + if ! curl -s "$connect_url" &>/dev/null; then + log_warn "Debezium Connect not available at $connect_url" + return 1 + fi + + for connector in "${OUTBOX_CONNECTORS[@]}"; do + log_info "Deleting connector: $connector" + curl -s -X DELETE "$connect_url/connectors/$connector" &>/dev/null + log_success "Deleted connector: $connector" + done +} + +# Show outbox connector status +outbox_status() { + print_section "Outbox Connector Status" + + local connect_url="$DEBEZIUM_CONNECT_URL" + + if ! curl -s "$connect_url" &>/dev/null; then + log_warn "Debezium Connect not available at $connect_url" + return 1 + fi + + for connector in "${OUTBOX_CONNECTORS[@]}"; do + echo -e "${BOLD}Connector:${NC} $connector" + local status + status=$(curl -s "$connect_url/connectors/$connector/status" 2>/dev/null) + + if echo "$status" | grep -q '"state"'; then + local state + state=$(echo "$status" | grep -o '"state":"[^"]*"' | head -1 | cut -d'"' -f4) + if [ "$state" = "RUNNING" ]; then + echo -e " Status: ${GREEN}$state${NC}" + else + echo -e " Status: ${RED}$state${NC}" + fi + else + echo -e " Status: ${RED}NOT REGISTERED${NC}" + fi + echo "" + done +} + # =========================================================================== # Full Reset Function # =========================================================================== @@ -747,16 +881,16 @@ full_reset() { fi echo "" - log_step "Step 1/10: Stopping 2.0 services..." + log_step "Step 1/12: Stopping 2.0 services..." for service in "${MINING_SERVICES[@]}"; do service_stop "$service" done - log_step "Step 2/10: Waiting for Kafka consumers to become inactive..." + log_step "Step 2/12: Waiting for Kafka consumers to become inactive..." log_info "Waiting 15 seconds for consumer group session timeout..." sleep 15 - log_step "Step 3/10: Resetting CDC consumer offsets..." + log_step "Step 3/12: Resetting CDC consumer offsets..." # Reset offsets BEFORE migrations (which may start containers) for group in "${CDC_CONSUMER_GROUPS[@]}"; do log_info "Resetting consumer group: $group" @@ -793,17 +927,17 @@ full_reset() { fi done - log_step "Step 4/10: Dropping 2.0 databases..." + log_step "Step 4/12: Dropping 2.0 databases..." db_drop - log_step "Step 5/10: Creating 2.0 databases..." + log_step "Step 5/12: Creating 2.0 databases..." db_create - log_step "Step 6/10: Running migrations..." + log_step "Step 6/12: Running migrations..." db_migrate # Stop any containers that were started during migration - log_step "Step 7/10: Stopping containers and resetting CDC offsets again..." + log_step "Step 7/12: Stopping containers and resetting CDC offsets again..." log_info "Migration may have started CDC consumers, stopping them now..." for service in "${MINING_SERVICES[@]}"; do docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop "$service" 2>/dev/null || true @@ -847,16 +981,22 @@ full_reset() { fi done - log_step "Step 8/10: Starting 2.0 services..." + log_step "Step 8/12: Registering Debezium outbox connectors..." + # Register outbox connectors for 2.0 service events + # These connectors capture events from each service's outbox table and send to Kafka + # mining-admin-service consumes these events to aggregate data from all 2.0 services + register_outbox_connectors || log_warn "Some connectors may not be registered" + + log_step "Step 9/12: Starting 2.0 services..." for service in "${MINING_SERVICES[@]}"; do service_start "$service" done - log_step "Step 9/10: Waiting for services to be ready..." + log_step "Step 10/12: Waiting for services to be ready..." log_info "Waiting 20 seconds for all services to start and sync from 1.0 CDC..." sleep 20 - log_step "Step 10/10: Publishing legacy users to mining-admin-service..." + log_step "Step 11/12: Publishing legacy users to mining-admin-service..." # 调用 auth-service API 发布所有旧用户事件到 outbox # 这样 mining-admin-service 才能通过 Debezium 收到用户数据 local publish_url="http://localhost:3024/api/v2/admin/legacy-users/publish-all" @@ -872,9 +1012,24 @@ full_reset() { log_info "You may need to manually call: curl -X POST $publish_url" fi + log_step "Step 12/12: Publishing contribution data to mining-admin-service..." + # 调用 contribution-service API 发布所有算力账户事件到 outbox + local contrib_publish_url="http://localhost:3020/api/v2/admin/contribution-accounts/publish-all" + local contrib_result + contrib_result=$(curl -s -X POST "$contrib_publish_url" 2>/dev/null || echo '{"error": "curl failed"}') + + if echo "$contrib_result" | grep -q '"success":true'; then + local contrib_count + contrib_count=$(echo "$contrib_result" | grep -o '"publishedCount":[0-9]*' | grep -o '[0-9]*') + log_success "Published $contrib_count contribution account events to outbox" + else + log_warn "Failed to publish contribution data: $contrib_result" + log_info "You may need to manually call: curl -X POST $contrib_publish_url" + fi + # 等待 mining-admin-service 消费 outbox 事件 - log_info "Waiting 10 seconds for mining-admin-service to sync..." - sleep 10 + log_info "Waiting 15 seconds for mining-admin-service to sync all data..." + sleep 15 echo "" echo -e "${GREEN}${BOLD}╔════════════════════════════════════════════════════════════╗${NC}" @@ -1003,6 +1158,9 @@ show_help() { echo -e "${BOLD}CDC / Sync Management:${NC}" echo " sync-reset Reset CDC consumer to read from beginning" echo " sync-status Show CDC consumer group status" + echo " outbox-register Register all Debezium outbox connectors" + echo " outbox-status Show outbox connector status" + echo " outbox-delete Delete all outbox connectors" echo "" echo -e "${BOLD}Full Reset:${NC}" echo " full-reset Complete system reset ${RED}(DANGEROUS!)${NC}" @@ -1103,6 +1261,20 @@ main() { sync_status ;; + # Outbox connector commands + outbox-register) + print_header + register_outbox_connectors + ;; + outbox-status) + print_header + outbox_status + ;; + outbox-delete) + print_header + delete_outbox_connectors + ;; + # Full reset full-reset) print_header diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts index 94cecfc1..916f2778 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts @@ -77,6 +77,16 @@ export class CdcSyncService implements OnModuleInit { 'ContributionAccountUpdated', this.handleContributionAccountUpdated.bind(this), ); + // ContributionAccountSynced 用于初始全量同步 + this.cdcConsumer.registerServiceHandler( + 'ContributionAccountSynced', + this.handleContributionAccountUpdated.bind(this), + ); + // ContributionCalculated 事件在算力计算完成时发布 + this.cdcConsumer.registerServiceHandler( + 'ContributionCalculated', + this.handleContributionCalculated.bind(this), + ); this.cdcConsumer.registerServiceHandler( 'SystemContributionUpdated', this.handleSystemContributionUpdated.bind(this), @@ -450,6 +460,52 @@ export class CdcSyncService implements OnModuleInit { } } + /** + * 处理 ContributionCalculated 事件 + * contribution-service 在计算算力时发布,触发增量更新 + */ + private async handleContributionCalculated( + event: ServiceEvent, + ): Promise { + const { payload } = event; + + try { + // ContributionCalculated 事件只包含部分信息,需要获取完整数据 + // 这里只更新已存在的记录,或创建基本记录等待后续同步 + await this.prisma.syncedContributionAccount.upsert({ + where: { accountSequence: payload.accountSequence }, + create: { + accountSequence: payload.accountSequence, + personalContribution: payload.personalContribution || 0, + teamLevelContribution: 0, + teamBonusContribution: 0, + totalContribution: 0, + effectiveContribution: 0, + hasAdopted: true, // 有算力计算说明已认种 + directReferralCount: 0, + unlockedLevelDepth: 0, + }, + update: { + // 增量更新个人算力 + personalContribution: { + increment: parseFloat(payload.personalContribution) || 0, + }, + hasAdopted: true, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug( + `Processed contribution calculation: ${payload.accountSequence}`, + ); + } catch (error) { + this.logger.error( + `Failed to process contribution calculation: ${payload.accountSequence}`, + error, + ); + } + } + private async handleSystemContributionUpdated( event: ServiceEvent, ): Promise {