150 lines
4.7 KiB
TypeScript
150 lines
4.7 KiB
TypeScript
import { Injectable, Logger } from '@nestjs/common';
|
||
import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service';
|
||
import { ContributionAccountAggregate } from '../../domain/aggregates/contribution-account.aggregate';
|
||
|
||
/**
 * User CDC event handler.
 *
 * Processes user data synced over from the identity service.
 *
 * Design note: data is mirrored 100% — no field update is ever skipped.
 */
@Injectable()
export class UserSyncedHandler {
  private readonly logger = new Logger(UserSyncedHandler.name);

  constructor() {}

  /**
   * Dispatches a single user CDC event according to its operation code.
   *
   * - `c` (create) and `r` (snapshot read) go through the create path.
   * - `u` (update) goes through the update path.
   * - `d` (delete) is only logged.
   * - Any other operation code is logged as a warning and ignored.
   *
   * @param event CDC event; `payload.op`, `payload.before`, `payload.after`
   *              and `sequenceNum` are read from it.
   * @param tx    Transaction client all writes are issued through.
   * @throws Rethrows any error raised by the per-operation handlers after logging it.
   */
  async handle(event: CDCEvent, tx: TransactionClient): Promise<void> {
    const { op, before, after } = event.payload;

    this.logger.log(`[CDC] User event received: op=${op}, seq=${event.sequenceNum}`);
    this.logger.debug(`[CDC] User event payload: ${JSON.stringify(after || before)}`);

    try {
      switch (op) {
        case 'c': // create
        case 'r': // read (snapshot)
          await this.handleCreate(after, event.sequenceNum, tx);
          break;
        case 'u': // update
          await this.handleUpdate(after, event.sequenceNum, tx);
          break;
        case 'd': // delete
          await this.handleDelete(before);
          break;
        default:
          this.logger.warn(`[CDC] Unknown CDC operation: ${op}`);
      }
    } catch (error: unknown) {
      // Hand the logger a stack string rather than the Error object so the
      // trace is actually printed; non-Error values are forwarded untouched.
      const detail = error instanceof Error ? error.stack ?? error.message : error;
      this.logger.error(`[CDC] Failed to handle user CDC event, op=${op}, seq=${event.sequenceNum}`, detail);
      throw error;
    }
  }

  /**
   * Handles create / snapshot events: mirrors the user row, then makes sure a
   * contribution account exists for the user's account sequence.
   *
   * Empty rows and rows missing the user id or account sequence are logged and skipped.
   */
  private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise<void> {
    if (!data) {
      this.logger.warn(`[CDC] User create: empty data received`);
      return;
    }

    const user = this.readUserFields(data);
    const { userId, accountSequence, status } = user;

    this.logger.log(`[CDC] User create: userId=${userId}, accountSequence=${accountSequence}, status=${status}`);

    if (!userId || !accountSequence) {
      this.logger.warn(`[CDC] Invalid user data: missing user_id or account_sequence`, { data });
      return;
    }

    // Mirror the data 100%.
    await this.upsertSyncedUser(tx, user, sequenceNum);

    // Create the user's contribution account if it does not exist yet.
    const existingAccount = await tx.contributionAccount.findUnique({
      where: { accountSequence },
    });

    if (!existingAccount) {
      const newAccount = ContributionAccountAggregate.create(accountSequence);
      const persistData = newAccount.toPersistence();
      await tx.contributionAccount.create({
        data: persistData,
      });
      this.logger.log(`[CDC] Created contribution account for user: ${accountSequence}`);
    }

    this.logger.log(`[CDC] User synced: ${accountSequence}`);
  }

  /**
   * Handles update events: mirrors the user row. The row is inserted if it was
   * never synced before, since the write is an upsert.
   *
   * Empty rows and rows missing the user id or account sequence are logged and skipped.
   */
  private async handleUpdate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise<void> {
    if (!data) {
      this.logger.warn(`[CDC] User update: empty data received`);
      return;
    }

    const user = this.readUserFields(data);
    const { userId, accountSequence, status } = user;

    this.logger.log(`[CDC] User update: userId=${userId}, accountSequence=${accountSequence}, status=${status}`);

    if (!userId || !accountSequence) {
      this.logger.warn(`[CDC] Invalid user update data: missing user_id or account_sequence`, { data });
      return;
    }

    // Mirror the data 100%.
    await this.upsertSyncedUser(tx, user, sequenceNum);

    this.logger.log(`[CDC] User synced: ${accountSequence}`);
  }

  /**
   * Handles delete events. Deletions are deliberately not applied so that
   * historical data is kept; the event is only logged.
   */
  private async handleDelete(data: any): Promise<void> {
    if (!data) {
      this.logger.warn(`[CDC] User delete: empty data received`);
      return;
    }
    const accountSequence = data.account_sequence ?? data.accountSequence;
    // User deletions are generally not processed; historical data is kept.
    this.logger.log(`[CDC] User delete event received: ${accountSequence} (not processed, keeping history)`);
  }

  /**
   * Picks the synced fields out of a raw CDC row, accepting both the
   * snake_case and the alternative key spellings. `phone` and `status`
   * default to `null` when absent.
   */
  private readUserFields(data: any) {
    return {
      userId: data.user_id ?? data.id,
      accountSequence: data.account_sequence ?? data.accountSequence,
      phone: data.phone_number ?? data.phone ?? null,
      status: data.status ?? null,
    };
  }

  /**
   * Upserts the mirrored user row keyed by `accountSequence`. Shared by the
   * create and update paths so both always write the same set of fields.
   *
   * @throws If `userId` cannot be converted with `BigInt(...)`.
   */
  private async upsertSyncedUser(
    tx: TransactionClient,
    user: { userId: any; accountSequence: any; phone: any; status: any },
    sequenceNum: bigint,
  ): Promise<void> {
    const { userId, accountSequence, phone, status } = user;

    // Fields written on both insert and update.
    const mirrored = {
      originalUserId: BigInt(userId),
      phone,
      status,
      sourceSequenceNum: sequenceNum,
      syncedAt: new Date(),
    };

    await tx.syncedUser.upsert({
      where: { accountSequence },
      create: { ...mirrored, accountSequence },
      update: mirrored,
    });
  }
}
|