feat(sync): 完善 CDC 数据同步 - 添加推荐关系、认种记录和昵称字段

- auth-service:
  - SyncedLegacyUser 表添加 nickname 字段
  - LegacyUserMigratedEvent 添加 nickname 参数
  - CDC consumer 同步 nickname 字段
  - SyncedLegacyUserData 接口添加 nickname

- contribution-service:
  - 新增 ReferralSyncedEvent 事件类
  - 新增 AdoptionSyncedEvent 事件类
  - admin.controller 添加 publish-all APIs:
    - POST /admin/referrals/publish-all
    - POST /admin/adoptions/publish-all

- mining-admin-service:
  - SyncedUser 表添加 nickname 字段
  - 新增 SyncedReferral 表 (推荐关系)
  - 新增 SyncedAdoption 表 (认种记录)
  - handleReferralSynced 处理器
  - handleAdoptionSynced 处理器
  - handleLegacyUserMigrated 处理 nickname

- deploy-mining.sh:
  - full_reset 更新为 14 步
  - Step 13: 发布推荐关系
  - Step 14: 发布认种记录

解决 mining-admin-web 缺少昵称、推荐人、认种数据的问题

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-12 02:48:15 -08:00
parent 11eb1f8a04
commit 30b04c6376
15 changed files with 392 additions and 17 deletions

View File

@ -739,7 +739,17 @@
"Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker restart rwa-postgres && sleep 10 && docker exec rwa-postgres psql -U rwa_user -d rwa_mining_admin -t -c \"\"\nSELECT ''synced_users'' as tbl, COUNT\\(*\\) FROM synced_users\nUNION ALL SELECT ''synced_contribution_accounts'', COUNT\\(*\\) FROM synced_contribution_accounts\nUNION ALL SELECT ''synced_mining_accounts'', COUNT\\(*\\) FROM synced_mining_accounts\nUNION ALL SELECT ''synced_trading_accounts'', COUNT\\(*\\) FROM synced_trading_accounts;\n\"\"\")", "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker restart rwa-postgres && sleep 10 && docker exec rwa-postgres psql -U rwa_user -d rwa_mining_admin -t -c \"\"\nSELECT ''synced_users'' as tbl, COUNT\\(*\\) FROM synced_users\nUNION ALL SELECT ''synced_contribution_accounts'', COUNT\\(*\\) FROM synced_contribution_accounts\nUNION ALL SELECT ''synced_mining_accounts'', COUNT\\(*\\) FROM synced_mining_accounts\nUNION ALL SELECT ''synced_trading_accounts'', COUNT\\(*\\) FROM synced_trading_accounts;\n\"\"\")",
"Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d postgres -c \"\"SELECT datname, count\\(*\\) FROM pg_stat_activity GROUP BY datname ORDER BY count DESC;\"\"\")", "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d postgres -c \"\"SELECT datname, count\\(*\\) FROM pg_stat_activity GROUP BY datname ORDER BY count DESC;\"\"\")",
"Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d postgres -c \"\"SHOW max_connections;\"\" && docker exec rwa-postgres psql -U rwa_user -d postgres -c \"\"SELECT count\\(*\\) as current_connections FROM pg_stat_activity;\"\"\")", "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d postgres -c \"\"SHOW max_connections;\"\" && docker exec rwa-postgres psql -U rwa_user -d postgres -c \"\"SELECT count\\(*\\) as current_connections FROM pg_stat_activity;\"\"\")",
"Bash(git commit -m \"$\\(cat <<''EOF''\nfix\\(postgres\\): 增加数据库最大连接数到 300\n\n- max_connections: 100 -> 300\n- max_replication_slots: 10 -> 20 \n- max_wal_senders: 10 -> 20\n\n支持更多服务和 Debezium connectors 同时连接\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")" "Bash(git commit -m \"$\\(cat <<''EOF''\nfix\\(postgres\\): 增加数据库最大连接数到 300\n\n- max_connections: 100 -> 300\n- max_replication_slots: 10 -> 20 \n- max_wal_senders: 10 -> 20\n\n支持更多服务和 Debezium connectors 同时连接\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")",
"Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_mining_admin -c ''SELECT * FROM synced_users LIMIT 2;''\")",
"Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_mining_admin -c ''SELECT * FROM synced_contribution_accounts LIMIT 2;''\")",
"Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_contribution -c ''SELECT account_sequence, has_adopted, direct_referral_adopted_count, unlocked_level_depth FROM contribution_accounts LIMIT 5;''\")",
"Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_contribution -c ''SELECT account_sequence, adopter_count FROM synced_users LIMIT 5;''\")",
"Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_contribution -c ''\\\\d synced_users''\")",
"Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_contribution -c ''SELECT * FROM synced_adoptions LIMIT 3;''\")",
"Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_contribution -c ''SELECT * FROM synced_referrals LIMIT 3;''\")",
"Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_mining_admin -c ''\\\\d synced_users''\")",
"Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_mining_admin -c \"\"SELECT table_name FROM information_schema.tables WHERE table_schema=''public'' ORDER BY table_name;\"\"\")",
"Bash(dir /b \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\\\\backend\\\\services\\\\contribution-service\\\\src\\\\domain\\\\events\")"
], ],
"deny": [], "deny": [],
"ask": [] "ask": []

View File

@ -86,6 +86,7 @@ model SyncedLegacyUser {
accountSequence String @unique @map("account_sequence") accountSequence String @unique @map("account_sequence")
phone String? // 系统账户可能没有手机号 phone String? // 系统账户可能没有手机号
passwordHash String? @map("password_hash") // 系统账户可能没有密码 passwordHash String? @map("password_hash") // 系统账户可能没有密码
nickname String? // 昵称 (from identity-service)
status String status String
legacyCreatedAt DateTime @map("legacy_created_at") legacyCreatedAt DateTime @map("legacy_created_at")

View File

@ -96,6 +96,7 @@ export class AdminSyncService {
select: { select: {
accountSequence: true, accountSequence: true,
phone: true, phone: true,
nickname: true,
legacyCreatedAt: true, legacyCreatedAt: true,
}, },
}); });
@ -108,6 +109,7 @@ export class AdminSyncService {
const event = new LegacyUserMigratedEvent( const event = new LegacyUserMigratedEvent(
user.accountSequence, user.accountSequence,
user.phone || '', user.phone || '',
user.nickname || '',
user.legacyCreatedAt || new Date(), user.legacyCreatedAt || new Date(),
); );
await this.outboxService.publish(event); await this.outboxService.publish(event);

View File

@ -221,7 +221,7 @@ export class AuthService {
* V1 * V1
*/ */
private async migrateAndLogin( private async migrateAndLogin(
legacyUser: { accountSequence: AccountSequence; phone: Phone; passwordHash: string }, legacyUser: { accountSequence: AccountSequence; phone: Phone; passwordHash: string; nickname: string | null },
password: string, password: string,
deviceInfo?: string, deviceInfo?: string,
ipAddress?: string, ipAddress?: string,
@ -254,6 +254,7 @@ export class AuthService {
new LegacyUserMigratedEvent( new LegacyUserMigratedEvent(
legacyUser.accountSequence.value, legacyUser.accountSequence.value,
legacyUser.phone.value, legacyUser.phone.value,
legacyUser.nickname || '',
new Date(), new Date(),
), ),
); );
@ -265,7 +266,7 @@ export class AuthService {
* *
*/ */
private async performMigration( private async performMigration(
legacyUser: { accountSequence: AccountSequence; phone: Phone; passwordHash: string }, legacyUser: { accountSequence: AccountSequence; phone: Phone; passwordHash: string; nickname: string | null },
deviceInfo?: string, deviceInfo?: string,
ipAddress?: string, ipAddress?: string,
): Promise<LoginResult> { ): Promise<LoginResult> {
@ -286,6 +287,7 @@ export class AuthService {
new LegacyUserMigratedEvent( new LegacyUserMigratedEvent(
legacyUser.accountSequence.value, legacyUser.accountSequence.value,
legacyUser.phone.value, legacyUser.phone.value,
legacyUser.nickname || '',
new Date(), new Date(),
), ),
); );

View File

@ -7,6 +7,7 @@ export class LegacyUserMigratedEvent {
constructor( constructor(
public readonly accountSequence: string, public readonly accountSequence: string,
public readonly phone: string, public readonly phone: string,
public readonly nickname: string,
public readonly migratedAt: Date, public readonly migratedAt: Date,
) {} ) {}
@ -15,6 +16,7 @@ export class LegacyUserMigratedEvent {
eventType: LegacyUserMigratedEvent.EVENT_TYPE, eventType: LegacyUserMigratedEvent.EVENT_TYPE,
accountSequence: this.accountSequence, accountSequence: this.accountSequence,
phone: this.phone, phone: this.phone,
nickname: this.nickname,
migratedAt: this.migratedAt.toISOString(), migratedAt: this.migratedAt.toISOString(),
}; };
} }

View File

@ -11,6 +11,7 @@ export interface SyncedLegacyUserData {
accountSequence: AccountSequence; accountSequence: AccountSequence;
phone: Phone; phone: Phone;
passwordHash: string; passwordHash: string;
nickname: string | null;
status: string; status: string;
legacyCreatedAt: Date; legacyCreatedAt: Date;
migratedToV2: boolean; migratedToV2: boolean;

View File

@ -15,6 +15,7 @@ interface UnwrappedCdcUser {
phone_number: string; phone_number: string;
password_hash: string; password_hash: string;
account_sequence: string; account_sequence: string;
nickname: string; // 昵称
status: string; status: string;
registered_at: number; // timestamp in milliseconds registered_at: number; // timestamp in milliseconds
@ -139,6 +140,7 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy {
update: { update: {
phone: user.phone_number ?? undefined, phone: user.phone_number ?? undefined,
passwordHash: user.password_hash ?? undefined, passwordHash: user.password_hash ?? undefined,
nickname: user.nickname ?? undefined,
accountSequence: user.account_sequence, accountSequence: user.account_sequence,
status: user.status, status: user.status,
sourceSequenceNum: sequenceNum, sourceSequenceNum: sequenceNum,
@ -148,6 +150,7 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy {
legacyId: BigInt(user.user_id), legacyId: BigInt(user.user_id),
phone: user.phone_number ?? null, phone: user.phone_number ?? null,
passwordHash: user.password_hash ?? null, passwordHash: user.password_hash ?? null,
nickname: user.nickname ?? null,
accountSequence: user.account_sequence, accountSequence: user.account_sequence,
status: user.status, status: user.status,
legacyCreatedAt: new Date(user.registered_at), legacyCreatedAt: new Date(user.registered_at),
@ -161,6 +164,7 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy {
const event = new LegacyUserMigratedEvent( const event = new LegacyUserMigratedEvent(
user.account_sequence, user.account_sequence,
user.phone_number || '', user.phone_number || '',
user.nickname || '',
new Date(user.registered_at), new Date(user.registered_at),
); );
await this.outboxService.publish(event); await this.outboxService.publish(event);

View File

@ -55,6 +55,7 @@ export class PrismaSyncedLegacyUserRepository implements SyncedLegacyUserReposit
accountSequence: AccountSequence.create(user.accountSequence), accountSequence: AccountSequence.create(user.accountSequence),
phone: Phone.create(user.phone), phone: Phone.create(user.phone),
passwordHash: user.passwordHash, passwordHash: user.passwordHash,
nickname: user.nickname,
status: user.status, status: user.status,
legacyCreatedAt: user.legacyCreatedAt, legacyCreatedAt: user.legacyCreatedAt,
migratedToV2: user.migratedToV2, migratedToV2: user.migratedToV2,

View File

@ -3,7 +3,11 @@ import { ApiTags, ApiOperation } from '@nestjs/swagger';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work'; import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work';
import { ContributionAccountSyncedEvent } from '../../domain/events'; import {
ContributionAccountSyncedEvent,
ReferralSyncedEvent,
AdoptionSyncedEvent,
} from '../../domain/events';
import { Public } from '../../shared/guards/jwt-auth.guard'; import { Public } from '../../shared/guards/jwt-auth.guard';
@ApiTags('Admin') @ApiTags('Admin')
@ -130,4 +134,140 @@ export class AdminController {
message: `Published ${publishedCount} events, ${failedCount} failed out of ${accounts.length} total`, message: `Published ${publishedCount} events, ${failedCount} failed out of ${accounts.length} total`,
}; };
} }
@Post('referrals/publish-all')
@Public()
@ApiOperation({ summary: '发布所有推荐关系事件到 outbox用于同步到 mining-admin-service' })
async publishAllReferrals(): Promise<{
success: boolean;
publishedCount: number;
failedCount: number;
message: string;
}> {
const referrals = await this.prisma.syncedReferral.findMany({
select: {
accountSequence: true,
referrerAccountSequence: true,
referrerUserId: true,
originalUserId: true,
ancestorPath: true,
depth: true,
},
});
let publishedCount = 0;
let failedCount = 0;
const batchSize = 100;
for (let i = 0; i < referrals.length; i += batchSize) {
const batch = referrals.slice(i, i + batchSize);
try {
await this.unitOfWork.executeInTransaction(async () => {
const events = batch.map((ref) => {
const event = new ReferralSyncedEvent(
ref.accountSequence,
ref.referrerAccountSequence,
ref.referrerUserId,
ref.originalUserId,
ref.ancestorPath,
ref.depth,
);
return {
aggregateType: ReferralSyncedEvent.AGGREGATE_TYPE,
aggregateId: ref.accountSequence,
eventType: ReferralSyncedEvent.EVENT_TYPE,
payload: event.toPayload(),
};
});
await this.outboxRepository.saveMany(events);
});
publishedCount += batch.length;
this.logger.debug(`Published referral batch ${Math.floor(i / batchSize) + 1}: ${batch.length} events`);
} catch (error) {
failedCount += batch.length;
this.logger.error(`Failed to publish referral batch ${Math.floor(i / batchSize) + 1}`, error);
}
}
this.logger.log(`Published ${publishedCount} referral events, ${failedCount} failed`);
return {
success: failedCount === 0,
publishedCount,
failedCount,
message: `Published ${publishedCount} events, ${failedCount} failed out of ${referrals.length} total`,
};
}
@Post('adoptions/publish-all')
@Public()
@ApiOperation({ summary: '发布所有认种记录事件到 outbox用于同步到 mining-admin-service' })
async publishAllAdoptions(): Promise<{
success: boolean;
publishedCount: number;
failedCount: number;
message: string;
}> {
const adoptions = await this.prisma.syncedAdoption.findMany({
select: {
originalAdoptionId: true,
accountSequence: true,
treeCount: true,
adoptionDate: true,
status: true,
contributionPerTree: true,
},
});
let publishedCount = 0;
let failedCount = 0;
const batchSize = 100;
for (let i = 0; i < adoptions.length; i += batchSize) {
const batch = adoptions.slice(i, i + batchSize);
try {
await this.unitOfWork.executeInTransaction(async () => {
const events = batch.map((adoption) => {
const event = new AdoptionSyncedEvent(
adoption.originalAdoptionId,
adoption.accountSequence,
adoption.treeCount,
adoption.adoptionDate,
adoption.status,
adoption.contributionPerTree.toString(),
);
return {
aggregateType: AdoptionSyncedEvent.AGGREGATE_TYPE,
aggregateId: adoption.originalAdoptionId.toString(),
eventType: AdoptionSyncedEvent.EVENT_TYPE,
payload: event.toPayload(),
};
});
await this.outboxRepository.saveMany(events);
});
publishedCount += batch.length;
this.logger.debug(`Published adoption batch ${Math.floor(i / batchSize) + 1}: ${batch.length} events`);
} catch (error) {
failedCount += batch.length;
this.logger.error(`Failed to publish adoption batch ${Math.floor(i / batchSize) + 1}`, error);
}
}
this.logger.log(`Published ${publishedCount} adoption events, ${failedCount} failed`);
return {
success: failedCount === 0,
publishedCount,
failedCount,
message: `Published ${publishedCount} events, ${failedCount} failed out of ${adoptions.length} total`,
};
}
} }

View File

@ -0,0 +1,29 @@
/**
*
* mining-admin-service
*/
export class AdoptionSyncedEvent {
static readonly EVENT_TYPE = 'AdoptionSynced';
static readonly AGGREGATE_TYPE = 'Adoption';
constructor(
public readonly originalAdoptionId: bigint,
public readonly accountSequence: string,
public readonly treeCount: number,
public readonly adoptionDate: Date,
public readonly status: string | null,
public readonly contributionPerTree: string,
) {}
toPayload(): Record<string, any> {
return {
eventType: AdoptionSyncedEvent.EVENT_TYPE,
originalAdoptionId: this.originalAdoptionId.toString(),
accountSequence: this.accountSequence,
treeCount: this.treeCount,
adoptionDate: this.adoptionDate.toISOString(),
status: this.status,
contributionPerTree: this.contributionPerTree,
};
}
}

View File

@ -1,3 +1,5 @@
export * from './contribution-calculated.event'; export * from './contribution-calculated.event';
export * from './daily-snapshot-created.event'; export * from './daily-snapshot-created.event';
export * from './contribution-account-synced.event'; export * from './contribution-account-synced.event';
export * from './referral-synced.event';
export * from './adoption-synced.event';

View File

@ -0,0 +1,29 @@
/**
*
* mining-admin-service
*/
export class ReferralSyncedEvent {
static readonly EVENT_TYPE = 'ReferralSynced';
static readonly AGGREGATE_TYPE = 'Referral';
constructor(
public readonly accountSequence: string,
public readonly referrerAccountSequence: string | null,
public readonly referrerUserId: bigint | null,
public readonly originalUserId: bigint | null,
public readonly ancestorPath: string | null,
public readonly depth: number,
) {}
toPayload(): Record<string, any> {
return {
eventType: ReferralSyncedEvent.EVENT_TYPE,
accountSequence: this.accountSequence,
referrerAccountSequence: this.referrerAccountSequence,
referrerUserId: this.referrerUserId?.toString() ?? null,
originalUserId: this.originalUserId?.toString() ?? null,
ancestorPath: this.ancestorPath,
depth: this.depth,
};
}
}

View File

@ -881,16 +881,16 @@ full_reset() {
fi fi
echo "" echo ""
log_step "Step 1/12: Stopping 2.0 services..." log_step "Step 1/14: Stopping 2.0 services..."
for service in "${MINING_SERVICES[@]}"; do for service in "${MINING_SERVICES[@]}"; do
service_stop "$service" service_stop "$service"
done done
log_step "Step 2/12: Waiting for Kafka consumers to become inactive..." log_step "Step 2/14: Waiting for Kafka consumers to become inactive..."
log_info "Waiting 15 seconds for consumer group session timeout..." log_info "Waiting 15 seconds for consumer group session timeout..."
sleep 15 sleep 15
log_step "Step 3/12: Resetting CDC consumer offsets..." log_step "Step 3/14: Resetting CDC consumer offsets..."
# Reset offsets BEFORE migrations (which may start containers) # Reset offsets BEFORE migrations (which may start containers)
for group in "${CDC_CONSUMER_GROUPS[@]}"; do for group in "${CDC_CONSUMER_GROUPS[@]}"; do
log_info "Resetting consumer group: $group" log_info "Resetting consumer group: $group"
@ -927,17 +927,17 @@ full_reset() {
fi fi
done done
log_step "Step 4/12: Dropping 2.0 databases..." log_step "Step 4/14: Dropping 2.0 databases..."
db_drop db_drop
log_step "Step 5/12: Creating 2.0 databases..." log_step "Step 5/14: Creating 2.0 databases..."
db_create db_create
log_step "Step 6/12: Running migrations..." log_step "Step 6/14: Running migrations..."
db_migrate db_migrate
# Stop any containers that were started during migration # Stop any containers that were started during migration
log_step "Step 7/12: Stopping containers and resetting CDC offsets again..." log_step "Step 7/14: Stopping containers and resetting CDC offsets again..."
log_info "Migration may have started CDC consumers, stopping them now..." log_info "Migration may have started CDC consumers, stopping them now..."
for service in "${MINING_SERVICES[@]}"; do for service in "${MINING_SERVICES[@]}"; do
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop "$service" 2>/dev/null || true docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop "$service" 2>/dev/null || true
@ -981,22 +981,22 @@ full_reset() {
fi fi
done done
log_step "Step 8/12: Registering Debezium outbox connectors..." log_step "Step 8/14: Registering Debezium outbox connectors..."
# Register outbox connectors for 2.0 service events # Register outbox connectors for 2.0 service events
# These connectors capture events from each service's outbox table and send to Kafka # These connectors capture events from each service's outbox table and send to Kafka
# mining-admin-service consumes these events to aggregate data from all 2.0 services # mining-admin-service consumes these events to aggregate data from all 2.0 services
register_outbox_connectors || log_warn "Some connectors may not be registered" register_outbox_connectors || log_warn "Some connectors may not be registered"
log_step "Step 9/12: Starting 2.0 services..." log_step "Step 9/14: Starting 2.0 services..."
for service in "${MINING_SERVICES[@]}"; do for service in "${MINING_SERVICES[@]}"; do
service_start "$service" service_start "$service"
done done
log_step "Step 10/12: Waiting for services to be ready..." log_step "Step 10/14: Waiting for services to be ready..."
log_info "Waiting 20 seconds for all services to start and sync from 1.0 CDC..." log_info "Waiting 20 seconds for all services to start and sync from 1.0 CDC..."
sleep 20 sleep 20
log_step "Step 11/12: Publishing legacy users to mining-admin-service..." log_step "Step 11/14: Publishing legacy users to mining-admin-service..."
# 调用 auth-service API 发布所有旧用户事件到 outbox # 调用 auth-service API 发布所有旧用户事件到 outbox
# 这样 mining-admin-service 才能通过 Debezium 收到用户数据 # 这样 mining-admin-service 才能通过 Debezium 收到用户数据
local publish_url="http://localhost:3024/api/v2/admin/legacy-users/publish-all" local publish_url="http://localhost:3024/api/v2/admin/legacy-users/publish-all"
@ -1012,7 +1012,7 @@ full_reset() {
log_info "You may need to manually call: curl -X POST $publish_url" log_info "You may need to manually call: curl -X POST $publish_url"
fi fi
log_step "Step 12/12: Publishing contribution data to mining-admin-service..." log_step "Step 12/14: Publishing contribution data to mining-admin-service..."
# 调用 contribution-service API 发布所有算力账户事件到 outbox # 调用 contribution-service API 发布所有算力账户事件到 outbox
local contrib_publish_url="http://localhost:3020/api/v2/admin/contribution-accounts/publish-all" local contrib_publish_url="http://localhost:3020/api/v2/admin/contribution-accounts/publish-all"
local contrib_result local contrib_result
@ -1027,6 +1027,36 @@ full_reset() {
log_info "You may need to manually call: curl -X POST $contrib_publish_url" log_info "You may need to manually call: curl -X POST $contrib_publish_url"
fi fi
log_step "Step 13/14: Publishing referral relationships to mining-admin-service..."
# 调用 contribution-service API 发布所有推荐关系事件到 outbox
local referral_publish_url="http://localhost:3020/api/v2/admin/referrals/publish-all"
local referral_result
referral_result=$(curl -s -X POST "$referral_publish_url" 2>/dev/null || echo '{"error": "curl failed"}')
if echo "$referral_result" | grep -q '"success":true'; then
local referral_count
referral_count=$(echo "$referral_result" | grep -o '"publishedCount":[0-9]*' | grep -o '[0-9]*')
log_success "Published $referral_count referral events to outbox"
else
log_warn "Failed to publish referral data: $referral_result"
log_info "You may need to manually call: curl -X POST $referral_publish_url"
fi
log_step "Step 14/14: Publishing adoption records to mining-admin-service..."
# 调用 contribution-service API 发布所有认种记录事件到 outbox
local adoption_publish_url="http://localhost:3020/api/v2/admin/adoptions/publish-all"
local adoption_result
adoption_result=$(curl -s -X POST "$adoption_publish_url" 2>/dev/null || echo '{"error": "curl failed"}')
if echo "$adoption_result" | grep -q '"success":true'; then
local adoption_count
adoption_count=$(echo "$adoption_result" | grep -o '"publishedCount":[0-9]*' | grep -o '[0-9]*')
log_success "Published $adoption_count adoption events to outbox"
else
log_warn "Failed to publish adoption data: $adoption_result"
log_info "You may need to manually call: curl -X POST $adoption_publish_url"
fi
# 等待 mining-admin-service 消费 outbox 事件 # 等待 mining-admin-service 消费 outbox 事件
log_info "Waiting 15 seconds for mining-admin-service to sync all data..." log_info "Waiting 15 seconds for mining-admin-service to sync all data..."
sleep 15 sleep 15

View File

@ -154,6 +154,7 @@ model SyncedUser {
originalUserId String @unique // auth-service 中的原始 ID originalUserId String @unique // auth-service 中的原始 ID
accountSequence String @unique // 账户序列号 accountSequence String @unique // 账户序列号
phone String phone String
nickname String? // 昵称 (from identity-service)
status String // ACTIVE, DISABLED, DELETED status String // ACTIVE, DISABLED, DELETED
kycStatus String // PENDING, SUBMITTED, VERIFIED, REJECTED kycStatus String // PENDING, SUBMITTED, VERIFIED, REJECTED
realName String? realName String?
@ -166,6 +167,7 @@ model SyncedUser {
contributionAccount SyncedContributionAccount? contributionAccount SyncedContributionAccount?
miningAccount SyncedMiningAccount? miningAccount SyncedMiningAccount?
tradingAccount SyncedTradingAccount? tradingAccount SyncedTradingAccount?
referral SyncedReferral?
@@index([phone]) @@index([phone])
@@index([status]) @@index([status])
@ -197,6 +199,48 @@ model SyncedContributionAccount {
@@map("synced_contribution_accounts") @@map("synced_contribution_accounts")
} }
// =============================================================================
// CDC 同步表 - 推荐关系 (from contribution-service)
// =============================================================================
model SyncedReferral {
id String @id @default(uuid())
accountSequence String @unique
referrerAccountSequence String? // 推荐人账户序列号
referrerUserId BigInt? // 1.0 的 referrer_id
originalUserId BigInt? // 1.0 的 user_id
ancestorPath String? @db.Text // 祖先路径(逗号分隔的 user_id
depth Int @default(0)
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
user SyncedUser @relation(fields: [accountSequence], references: [accountSequence])
@@index([referrerAccountSequence])
@@index([depth])
@@map("synced_referrals")
}
// =============================================================================
// CDC 同步表 - 认种记录 (from contribution-service)
// =============================================================================
model SyncedAdoption {
id String @id @default(uuid())
originalAdoptionId BigInt @unique
accountSequence String
treeCount Int
adoptionDate DateTime @db.Date
status String? // 认种状态
contributionPerTree Decimal @db.Decimal(20, 10)
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@index([accountSequence])
@@index([adoptionDate])
@@map("synced_adoptions")
}
// ============================================================================= // =============================================================================
// CDC 同步表 - 挖矿账户 (from mining-service) // CDC 同步表 - 挖矿账户 (from mining-service)
// ============================================================================= // =============================================================================

View File

@ -91,6 +91,16 @@ export class CdcSyncService implements OnModuleInit {
'SystemContributionUpdated', 'SystemContributionUpdated',
this.handleSystemContributionUpdated.bind(this), this.handleSystemContributionUpdated.bind(this),
); );
// ReferralSynced 事件 - 同步推荐关系
this.cdcConsumer.registerServiceHandler(
'ReferralSynced',
this.handleReferralSynced.bind(this),
);
// AdoptionSynced 事件 - 同步认种记录
this.cdcConsumer.registerServiceHandler(
'AdoptionSynced',
this.handleAdoptionSynced.bind(this),
);
// =========================================================================== // ===========================================================================
// 从 mining-service 同步挖矿数据 (通过 Debezium CDC 监听 outbox_events 表) // 从 mining-service 同步挖矿数据 (通过 Debezium CDC 监听 outbox_events 表)
@ -339,7 +349,7 @@ export class CdcSyncService implements OnModuleInit {
/** /**
* auth-service user.legacy.migrated * auth-service user.legacy.migrated
* payload: { accountSequence, phone, migratedAt } * payload: { accountSequence, phone, nickname, migratedAt }
*/ */
private async handleLegacyUserMigrated(event: ServiceEvent): Promise<void> { private async handleLegacyUserMigrated(event: ServiceEvent): Promise<void> {
const { payload } = event; const { payload } = event;
@ -351,6 +361,7 @@ export class CdcSyncService implements OnModuleInit {
originalUserId: payload.accountSequence, originalUserId: payload.accountSequence,
accountSequence: payload.accountSequence, accountSequence: payload.accountSequence,
phone: payload.phone, phone: payload.phone,
nickname: payload.nickname || null,
status: 'ACTIVE', status: 'ACTIVE',
kycStatus: 'PENDING', kycStatus: 'PENDING',
realName: null, realName: null,
@ -359,6 +370,7 @@ export class CdcSyncService implements OnModuleInit {
}, },
update: { update: {
phone: payload.phone, phone: payload.phone,
nickname: payload.nickname || null,
isLegacyUser: true, isLegacyUser: true,
}, },
}); });
@ -539,6 +551,72 @@ export class CdcSyncService implements OnModuleInit {
} }
} }
/**
* ReferralSynced -
*/
private async handleReferralSynced(event: ServiceEvent): Promise<void> {
const { payload } = event;
try {
await this.prisma.syncedReferral.upsert({
where: { accountSequence: payload.accountSequence },
create: {
accountSequence: payload.accountSequence,
referrerAccountSequence: payload.referrerAccountSequence,
referrerUserId: payload.referrerUserId ? BigInt(payload.referrerUserId) : null,
originalUserId: payload.originalUserId ? BigInt(payload.originalUserId) : null,
ancestorPath: payload.ancestorPath,
depth: payload.depth || 0,
},
update: {
referrerAccountSequence: payload.referrerAccountSequence,
referrerUserId: payload.referrerUserId ? BigInt(payload.referrerUserId) : null,
originalUserId: payload.originalUserId ? BigInt(payload.originalUserId) : null,
ancestorPath: payload.ancestorPath,
depth: payload.depth || 0,
},
});
await this.recordProcessedEvent(event);
this.logger.debug(`Synced referral: ${payload.accountSequence}`);
} catch (error) {
this.logger.error(`Failed to sync referral: ${payload.accountSequence}`, error);
}
}
/**
* AdoptionSynced -
*/
private async handleAdoptionSynced(event: ServiceEvent): Promise<void> {
const { payload } = event;
try {
await this.prisma.syncedAdoption.upsert({
where: { originalAdoptionId: BigInt(payload.originalAdoptionId) },
create: {
originalAdoptionId: BigInt(payload.originalAdoptionId),
accountSequence: payload.accountSequence,
treeCount: payload.treeCount,
adoptionDate: new Date(payload.adoptionDate),
status: payload.status,
contributionPerTree: payload.contributionPerTree,
},
update: {
accountSequence: payload.accountSequence,
treeCount: payload.treeCount,
adoptionDate: new Date(payload.adoptionDate),
status: payload.status,
contributionPerTree: payload.contributionPerTree,
},
});
await this.recordProcessedEvent(event);
this.logger.debug(`Synced adoption: ${payload.originalAdoptionId}`);
} catch (error) {
this.logger.error(`Failed to sync adoption: ${payload.originalAdoptionId}`, error);
}
}
// =========================================================================== // ===========================================================================
// 挖矿账户事件处理 // 挖矿账户事件处理
// =========================================================================== // ===========================================================================