feat(cdc): 完善 2.0 服务数据聚合到 mining-admin-service

1. deploy-mining.sh:
   - 添加 outbox connectors 配置数组 (auth, contribution, mining, trading, wallet)
   - 添加 register_outbox_connectors() 函数自动注册 Debezium 连接器
   - 添加 outbox-register, outbox-status, outbox-delete 命令
   - full-reset 更新为 12 步,包含注册 outbox connectors 和初始数据发布

2. contribution-service:
   - 添加 ContributionAccountSyncedEvent 事件
   - 添加 POST /admin/contribution-accounts/publish-all API 用于初始全量同步

3. mining-admin-service:
   - 添加 ContributionAccountSynced 事件处理(复用 ContributionAccountUpdated 处理器)
   - 添加 ContributionCalculated 事件处理

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-12 01:41:46 -08:00
parent 489966fae9
commit 40745ca580
5 changed files with 365 additions and 14 deletions

View File

@ -1,12 +1,21 @@
import { Controller, Get } from '@nestjs/common'; import { Controller, Get, Post, Logger } from '@nestjs/common';
import { ApiTags, ApiOperation } from '@nestjs/swagger'; import { ApiTags, ApiOperation } from '@nestjs/swagger';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work';
import { ContributionAccountSyncedEvent } from '../../domain/events';
import { Public } from '../../shared/guards/jwt-auth.guard'; import { Public } from '../../shared/guards/jwt-auth.guard';
@ApiTags('Admin') @ApiTags('Admin')
@Controller('admin') @Controller('admin')
export class AdminController { export class AdminController {
constructor(private readonly prisma: PrismaService) {} private readonly logger = new Logger(AdminController.name);
constructor(
private readonly prisma: PrismaService,
private readonly outboxRepository: OutboxRepository,
private readonly unitOfWork: UnitOfWork,
) {}
@Get('accounts/sync') @Get('accounts/sync')
@Public() @Public()
@ -45,4 +54,80 @@ export class AdminController {
total: accounts.length, total: accounts.length,
}; };
} }
@Post('contribution-accounts/publish-all')
@Public()
@ApiOperation({ summary: '发布所有贡献值账户事件到 outbox用于初始同步到 mining-admin-service' })
async publishAllContributionAccounts(): Promise<{
success: boolean;
publishedCount: number;
failedCount: number;
message: string;
}> {
const accounts = await this.prisma.contributionAccount.findMany({
select: {
accountSequence: true,
personalContribution: true,
totalLevelPending: true,
totalBonusPending: true,
effectiveContribution: true,
hasAdopted: true,
directReferralAdoptedCount: true,
unlockedLevelDepth: true,
createdAt: true,
},
});
let publishedCount = 0;
let failedCount = 0;
// 批量处理,每批 100 条
const batchSize = 100;
for (let i = 0; i < accounts.length; i += batchSize) {
const batch = accounts.slice(i, i + batchSize);
try {
await this.unitOfWork.executeInTransaction(async () => {
const events = batch.map((acc) => {
const event = new ContributionAccountSyncedEvent(
acc.accountSequence,
acc.personalContribution.toString(),
acc.totalLevelPending.toString(),
acc.totalBonusPending.toString(),
acc.effectiveContribution.toString(),
acc.effectiveContribution.toString(),
acc.hasAdopted,
acc.directReferralAdoptedCount,
acc.unlockedLevelDepth,
acc.createdAt,
);
return {
aggregateType: ContributionAccountSyncedEvent.AGGREGATE_TYPE,
aggregateId: acc.accountSequence,
eventType: ContributionAccountSyncedEvent.EVENT_TYPE,
payload: event.toPayload(),
};
});
await this.outboxRepository.saveMany(events);
});
publishedCount += batch.length;
this.logger.debug(`Published batch ${Math.floor(i / batchSize) + 1}: ${batch.length} events`);
} catch (error) {
failedCount += batch.length;
this.logger.error(`Failed to publish batch ${Math.floor(i / batchSize) + 1}`, error);
}
}
this.logger.log(`Published ${publishedCount} contribution account events, ${failedCount} failed`);
return {
success: failedCount === 0,
publishedCount,
failedCount,
message: `Published ${publishedCount} events, ${failedCount} failed out of ${accounts.length} total`,
};
}
} }

View File

@ -0,0 +1,37 @@
/**
*
* mining-admin-service
*/
export class ContributionAccountSyncedEvent {
static readonly EVENT_TYPE = 'ContributionAccountSynced';
static readonly AGGREGATE_TYPE = 'ContributionAccount';
constructor(
public readonly accountSequence: string,
public readonly personalContribution: string,
public readonly teamLevelContribution: string,
public readonly teamBonusContribution: string,
public readonly totalContribution: string,
public readonly effectiveContribution: string,
public readonly hasAdopted: boolean,
public readonly directReferralAdoptedCount: number,
public readonly unlockedLevelDepth: number,
public readonly createdAt: Date,
) {}
toPayload(): Record<string, any> {
return {
eventType: ContributionAccountSyncedEvent.EVENT_TYPE,
accountSequence: this.accountSequence,
personalContribution: this.personalContribution,
teamLevelContribution: this.teamLevelContribution,
teamBonusContribution: this.teamBonusContribution,
totalContribution: this.totalContribution,
effectiveContribution: this.effectiveContribution,
hasAdopted: this.hasAdopted,
directReferralAdoptedCount: this.directReferralAdoptedCount,
unlockedLevelDepth: this.unlockedLevelDepth,
createdAt: this.createdAt.toISOString(),
};
}
}

View File

@ -1,2 +1,3 @@
export * from './contribution-calculated.event'; export * from './contribution-calculated.event';
export * from './daily-snapshot-created.event'; export * from './daily-snapshot-created.event';
export * from './contribution-account-synced.event';

View File

@ -109,6 +109,19 @@ CDC_CONSUMER_GROUPS=(
"mining-admin-service-cdc-group" "mining-admin-service-cdc-group"
) )
# Debezium Outbox Connectors (for 2.0 service events -> mining-admin-service)
# These connectors capture events from each service's outbox table
OUTBOX_CONNECTORS=(
"auth-outbox-connector"
"contribution-outbox-connector"
"mining-outbox-connector"
"trading-outbox-connector"
"mining-wallet-outbox-connector"
)
# Debezium Connect URL
DEBEZIUM_CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8083}"
# Colors # Colors
RED='\033[0;31m' RED='\033[0;31m'
GREEN='\033[0;32m' GREEN='\033[0;32m'
@ -720,6 +733,127 @@ sync_status() {
done done
} }
# ===========================================================================
# Debezium Outbox Connector Functions
# ===========================================================================
# Register all outbox connectors with Debezium Connect
register_outbox_connectors() {
print_section "Registering Outbox Connectors"
local scripts_dir="$SCRIPT_DIR/scripts/debezium"
local connect_url="$DEBEZIUM_CONNECT_URL"
# Check if Debezium Connect is available
if ! curl -s "$connect_url" &>/dev/null; then
log_warn "Debezium Connect not available at $connect_url"
log_info "Outbox connectors will not be registered. You may need to register them manually."
return 1
fi
for connector in "${OUTBOX_CONNECTORS[@]}"; do
local config_file="$scripts_dir/${connector}.json"
if [ ! -f "$config_file" ]; then
log_warn "Config file not found: $config_file"
continue
fi
log_info "Registering connector: $connector"
# Check if connector already exists
local existing
existing=$(curl -s "$connect_url/connectors/$connector" 2>/dev/null)
if echo "$existing" | grep -q '"name"'; then
# Connector exists, update it
log_info "Updating existing connector: $connector"
# Extract just the config part for PUT request
local config_only
config_only=$(cat "$config_file" | sed 's/.*"config"://' | sed 's/}$//')
# Use envsubst to replace environment variables, then update
local result
result=$(cat "$config_file" | envsubst | curl -s -X PUT \
-H "Content-Type: application/json" \
-d @- \
"$connect_url/connectors/$connector/config" 2>/dev/null)
if echo "$result" | grep -q '"name"'; then
log_success "Updated connector: $connector"
else
log_warn "Failed to update connector $connector: $result"
fi
else
# Connector doesn't exist, create it
# Replace environment variables in the config file
local result
result=$(cat "$config_file" | envsubst | curl -s -X POST \
-H "Content-Type: application/json" \
-d @- \
"$connect_url/connectors" 2>/dev/null)
if echo "$result" | grep -q '"name"'; then
log_success "Registered connector: $connector"
else
log_warn "Failed to register connector $connector: $result"
fi
fi
done
log_success "Outbox connector registration completed"
}
# Delete all outbox connectors
delete_outbox_connectors() {
print_section "Deleting Outbox Connectors"
local connect_url="$DEBEZIUM_CONNECT_URL"
if ! curl -s "$connect_url" &>/dev/null; then
log_warn "Debezium Connect not available at $connect_url"
return 1
fi
for connector in "${OUTBOX_CONNECTORS[@]}"; do
log_info "Deleting connector: $connector"
curl -s -X DELETE "$connect_url/connectors/$connector" &>/dev/null
log_success "Deleted connector: $connector"
done
}
# Show outbox connector status
outbox_status() {
print_section "Outbox Connector Status"
local connect_url="$DEBEZIUM_CONNECT_URL"
if ! curl -s "$connect_url" &>/dev/null; then
log_warn "Debezium Connect not available at $connect_url"
return 1
fi
for connector in "${OUTBOX_CONNECTORS[@]}"; do
echo -e "${BOLD}Connector:${NC} $connector"
local status
status=$(curl -s "$connect_url/connectors/$connector/status" 2>/dev/null)
if echo "$status" | grep -q '"state"'; then
local state
state=$(echo "$status" | grep -o '"state":"[^"]*"' | head -1 | cut -d'"' -f4)
if [ "$state" = "RUNNING" ]; then
echo -e " Status: ${GREEN}$state${NC}"
else
echo -e " Status: ${RED}$state${NC}"
fi
else
echo -e " Status: ${RED}NOT REGISTERED${NC}"
fi
echo ""
done
}
# =========================================================================== # ===========================================================================
# Full Reset Function # Full Reset Function
# =========================================================================== # ===========================================================================
@ -747,16 +881,16 @@ full_reset() {
fi fi
echo "" echo ""
log_step "Step 1/10: Stopping 2.0 services..." log_step "Step 1/12: Stopping 2.0 services..."
for service in "${MINING_SERVICES[@]}"; do for service in "${MINING_SERVICES[@]}"; do
service_stop "$service" service_stop "$service"
done done
log_step "Step 2/10: Waiting for Kafka consumers to become inactive..." log_step "Step 2/12: Waiting for Kafka consumers to become inactive..."
log_info "Waiting 15 seconds for consumer group session timeout..." log_info "Waiting 15 seconds for consumer group session timeout..."
sleep 15 sleep 15
log_step "Step 3/10: Resetting CDC consumer offsets..." log_step "Step 3/12: Resetting CDC consumer offsets..."
# Reset offsets BEFORE migrations (which may start containers) # Reset offsets BEFORE migrations (which may start containers)
for group in "${CDC_CONSUMER_GROUPS[@]}"; do for group in "${CDC_CONSUMER_GROUPS[@]}"; do
log_info "Resetting consumer group: $group" log_info "Resetting consumer group: $group"
@ -793,17 +927,17 @@ full_reset() {
fi fi
done done
log_step "Step 4/10: Dropping 2.0 databases..." log_step "Step 4/12: Dropping 2.0 databases..."
db_drop db_drop
log_step "Step 5/10: Creating 2.0 databases..." log_step "Step 5/12: Creating 2.0 databases..."
db_create db_create
log_step "Step 6/10: Running migrations..." log_step "Step 6/12: Running migrations..."
db_migrate db_migrate
# Stop any containers that were started during migration # Stop any containers that were started during migration
log_step "Step 7/10: Stopping containers and resetting CDC offsets again..." log_step "Step 7/12: Stopping containers and resetting CDC offsets again..."
log_info "Migration may have started CDC consumers, stopping them now..." log_info "Migration may have started CDC consumers, stopping them now..."
for service in "${MINING_SERVICES[@]}"; do for service in "${MINING_SERVICES[@]}"; do
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop "$service" 2>/dev/null || true docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop "$service" 2>/dev/null || true
@ -847,16 +981,22 @@ full_reset() {
fi fi
done done
log_step "Step 8/10: Starting 2.0 services..." log_step "Step 8/12: Registering Debezium outbox connectors..."
# Register outbox connectors for 2.0 service events
# These connectors capture events from each service's outbox table and send to Kafka
# mining-admin-service consumes these events to aggregate data from all 2.0 services
register_outbox_connectors || log_warn "Some connectors may not be registered"
log_step "Step 9/12: Starting 2.0 services..."
for service in "${MINING_SERVICES[@]}"; do for service in "${MINING_SERVICES[@]}"; do
service_start "$service" service_start "$service"
done done
log_step "Step 9/10: Waiting for services to be ready..." log_step "Step 10/12: Waiting for services to be ready..."
log_info "Waiting 20 seconds for all services to start and sync from 1.0 CDC..." log_info "Waiting 20 seconds for all services to start and sync from 1.0 CDC..."
sleep 20 sleep 20
log_step "Step 10/10: Publishing legacy users to mining-admin-service..." log_step "Step 11/12: Publishing legacy users to mining-admin-service..."
# 调用 auth-service API 发布所有旧用户事件到 outbox # 调用 auth-service API 发布所有旧用户事件到 outbox
# 这样 mining-admin-service 才能通过 Debezium 收到用户数据 # 这样 mining-admin-service 才能通过 Debezium 收到用户数据
local publish_url="http://localhost:3024/api/v2/admin/legacy-users/publish-all" local publish_url="http://localhost:3024/api/v2/admin/legacy-users/publish-all"
@ -872,9 +1012,24 @@ full_reset() {
log_info "You may need to manually call: curl -X POST $publish_url" log_info "You may need to manually call: curl -X POST $publish_url"
fi fi
log_step "Step 12/12: Publishing contribution data to mining-admin-service..."
# 调用 contribution-service API 发布所有算力账户事件到 outbox
local contrib_publish_url="http://localhost:3020/api/v2/admin/contribution-accounts/publish-all"
local contrib_result
contrib_result=$(curl -s -X POST "$contrib_publish_url" 2>/dev/null || echo '{"error": "curl failed"}')
if echo "$contrib_result" | grep -q '"success":true'; then
local contrib_count
contrib_count=$(echo "$contrib_result" | grep -o '"publishedCount":[0-9]*' | grep -o '[0-9]*')
log_success "Published $contrib_count contribution account events to outbox"
else
log_warn "Failed to publish contribution data: $contrib_result"
log_info "You may need to manually call: curl -X POST $contrib_publish_url"
fi
# 等待 mining-admin-service 消费 outbox 事件 # 等待 mining-admin-service 消费 outbox 事件
log_info "Waiting 10 seconds for mining-admin-service to sync..." log_info "Waiting 15 seconds for mining-admin-service to sync all data..."
sleep 10 sleep 15
echo "" echo ""
echo -e "${GREEN}${BOLD}╔════════════════════════════════════════════════════════════╗${NC}" echo -e "${GREEN}${BOLD}╔════════════════════════════════════════════════════════════╗${NC}"
@ -1003,6 +1158,9 @@ show_help() {
echo -e "${BOLD}CDC / Sync Management:${NC}" echo -e "${BOLD}CDC / Sync Management:${NC}"
echo " sync-reset Reset CDC consumer to read from beginning" echo " sync-reset Reset CDC consumer to read from beginning"
echo " sync-status Show CDC consumer group status" echo " sync-status Show CDC consumer group status"
echo " outbox-register Register all Debezium outbox connectors"
echo " outbox-status Show outbox connector status"
echo " outbox-delete Delete all outbox connectors"
echo "" echo ""
echo -e "${BOLD}Full Reset:${NC}" echo -e "${BOLD}Full Reset:${NC}"
echo " full-reset Complete system reset ${RED}(DANGEROUS!)${NC}" echo " full-reset Complete system reset ${RED}(DANGEROUS!)${NC}"
@ -1103,6 +1261,20 @@ main() {
sync_status sync_status
;; ;;
# Outbox connector commands
outbox-register)
print_header
register_outbox_connectors
;;
outbox-status)
print_header
outbox_status
;;
outbox-delete)
print_header
delete_outbox_connectors
;;
# Full reset # Full reset
full-reset) full-reset)
print_header print_header

View File

@ -77,6 +77,16 @@ export class CdcSyncService implements OnModuleInit {
'ContributionAccountUpdated', 'ContributionAccountUpdated',
this.handleContributionAccountUpdated.bind(this), this.handleContributionAccountUpdated.bind(this),
); );
// ContributionAccountSynced 用于初始全量同步
this.cdcConsumer.registerServiceHandler(
'ContributionAccountSynced',
this.handleContributionAccountUpdated.bind(this),
);
// ContributionCalculated 事件在算力计算完成时发布
this.cdcConsumer.registerServiceHandler(
'ContributionCalculated',
this.handleContributionCalculated.bind(this),
);
this.cdcConsumer.registerServiceHandler( this.cdcConsumer.registerServiceHandler(
'SystemContributionUpdated', 'SystemContributionUpdated',
this.handleSystemContributionUpdated.bind(this), this.handleSystemContributionUpdated.bind(this),
@ -450,6 +460,52 @@ export class CdcSyncService implements OnModuleInit {
} }
} }
/**
* ContributionCalculated
* contribution-service
*/
private async handleContributionCalculated(
event: ServiceEvent,
): Promise<void> {
const { payload } = event;
try {
// ContributionCalculated 事件只包含部分信息,需要获取完整数据
// 这里只更新已存在的记录,或创建基本记录等待后续同步
await this.prisma.syncedContributionAccount.upsert({
where: { accountSequence: payload.accountSequence },
create: {
accountSequence: payload.accountSequence,
personalContribution: payload.personalContribution || 0,
teamLevelContribution: 0,
teamBonusContribution: 0,
totalContribution: 0,
effectiveContribution: 0,
hasAdopted: true, // 有算力计算说明已认种
directReferralCount: 0,
unlockedLevelDepth: 0,
},
update: {
// 增量更新个人算力
personalContribution: {
increment: parseFloat(payload.personalContribution) || 0,
},
hasAdopted: true,
},
});
await this.recordProcessedEvent(event);
this.logger.debug(
`Processed contribution calculation: ${payload.accountSequence}`,
);
} catch (error) {
this.logger.error(
`Failed to process contribution calculation: ${payload.accountSequence}`,
error,
);
}
}
private async handleSystemContributionUpdated( private async handleSystemContributionUpdated(
event: ServiceEvent, event: ServiceEvent,
): Promise<void> { ): Promise<void> {