fix(admin-service): 修复用户数据CDC同步使用userId导致的数据不一致问题
问题原因: - 旧的Kafka事件消费者和CDC消费者同时运行 - 旧消费者写入的数据userId可能为0 - CDC消费者使用userId作为upsert条件,导致唯一键冲突失败 - 用户的nickname和kycStatus等信息没有正确同步 修复方案: - upsert方法改用accountSequence作为唯一键 - CDC消费者的handleUpdate使用accountSequence检查和更新 - 更新时同时修复可能错误的userId - 新增existsByAccountSequence和updateKycStatusByAccountSequence方法 影响范围: - admin-web用户管理页面现在能正确显示用户昵称和KYC状态 - 新用户注册后数据能正确同步 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
51114f265d
commit
45736c4daf
|
|
@ -660,7 +660,11 @@
|
|||
"Bash(DATABASE_URL=\"postgresql://test:test@localhost:5432/test\" npx prisma format:*)",
|
||||
"Bash(timeout 60 npx tsc:*)",
|
||||
"Bash(git commit -m \"$\\(cat <<''EOF''\nfeat\\(wallet-service\\): 三层保护机制确保内部转账接收方钱包存在\n\n新增三层保护机制:\n1. 用户注册时:监听 identity.UserAccountCreated 事件自动创建钱包\n2. 发起转账时:检测内部转账后调用 ensureWalletExists\\(\\) 预创建钱包\n3. 链上确认时:原有 upsert 逻辑兜底(保持不变)\n\n新增文件:\n- identity-event-consumer.service.ts: 消费 identity 用户注册事件\n- user-account-created.handler.ts: 处理用户注册事件创建钱包\n\n新增 API:\n- POST /wallets/ensure-wallet: 确保单个钱包存在\n- POST /wallets/ensure-wallets: 批量确保钱包存在\n\n🤖 Generated with [Claude Code]\\(https://claude.com/claude-code\\)\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")",
|
||||
"Bash(git -C \"c:/Users/dong/Desktop/rwadurian\" add -A)"
|
||||
"Bash(git -C \"c:/Users/dong/Desktop/rwadurian\" add -A)",
|
||||
"Bash(git -C \"c:/Users/dong/Desktop/rwadurian\" commit -m \"$\\(cat <<''EOF''\nfix\\(planting-service\\): 修复合同PDF签署日期显示为UTC时间的问题\n\n合同生成时使用 new Date\\(\\).toISOString\\(\\).split\\(''T''\\)[0] 获取日期,\n该方法返回UTC时间,导致北京时间凌晨签署的合同显示为前一天日期。\n\n修复方案:新增 getBeijingDateString\\(\\) 函数,将UTC时间转换为北京时间\\(UTC+8\\)\n\n影响范围:仅影响PDF合同上显示的签署日期,不影响数据库时间戳或业务逻辑\n\n🤖 Generated with [Claude Code]\\(https://claude.com/claude-code\\)\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")",
|
||||
"Bash(git -C \"c:/Users/dong/Desktop/rwadurian\" push origin main)",
|
||||
"Bash(git -C \"c:/Users/dong/Desktop/rwadurian\" tag -a v1.0.0 -m \"$\\(cat <<''EOF''\nRelease v1.0.0 - 正式发布\n\n主要功能:\n- 用户身份认证与KYC实名认证\n- 榴莲树认种与合同签署系统\n- 钱包与资产管理(USDT/绿积分/算力)\n- 推荐关系与团队管理\n- 收益分配与奖励系统\n- 排行榜系统\n- 后台管理系统\n- MPC多方计算钱包\n- 区块链服务(KAVA链)\n\n🤖 Generated with [Claude Code]\\(https://claude.com/claude-code\\)\nEOF\n\\)\")",
|
||||
"Bash(git -C \"c:/Users/dong/Desktop/rwadurian\" push origin v1.0.0)"
|
||||
],
|
||||
"deny": [],
|
||||
"ask": []
|
||||
|
|
|
|||
|
|
@ -129,10 +129,15 @@ export interface IUserQueryRepository {
|
|||
updateStatus(userId: bigint, status: string): Promise<void>;
|
||||
|
||||
/**
|
||||
* 更新 KYC 状态
|
||||
* 更新 KYC 状态(按 userId)
|
||||
*/
|
||||
updateKycStatus(userId: bigint, kycStatus: string): Promise<void>;
|
||||
|
||||
/**
|
||||
* 更新 KYC 状态(按 accountSequence)
|
||||
*/
|
||||
updateKycStatusByAccountSequence(accountSequence: string, kycStatus: string): Promise<void>;
|
||||
|
||||
/**
|
||||
* 更新在线状态
|
||||
*/
|
||||
|
|
@ -149,7 +154,12 @@ export interface IUserQueryRepository {
|
|||
count(filters?: UserQueryFilters): Promise<number>;
|
||||
|
||||
/**
|
||||
* 检查用户是否存在
|
||||
* 检查用户是否存在(按 userId)
|
||||
*/
|
||||
exists(userId: bigint): Promise<boolean>;
|
||||
|
||||
/**
|
||||
* 检查用户是否存在(按 accountSequence)
|
||||
*/
|
||||
existsByAccountSequence(accountSequence: string): Promise<boolean>;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -203,10 +203,9 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
|
|||
}
|
||||
|
||||
private async handleUpdate(data: CdcUserAccountPayload): Promise<void> {
|
||||
const userId = BigInt(data.user_id);
|
||||
|
||||
// 检查用户是否存在
|
||||
const exists = await this.userQueryRepository.exists(userId);
|
||||
// 使用 accountSequence 检查用户是否存在(而不是 userId)
|
||||
// 因为旧系统可能写入了 userId=0 的数据
|
||||
const exists = await this.userQueryRepository.existsByAccountSequence(data.account_sequence);
|
||||
|
||||
if (!exists) {
|
||||
// 如果不存在,创建(可能是之前遗漏的)
|
||||
|
|
@ -218,10 +217,11 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
|
|||
? this.maskPhoneNumber(data.phone_number)
|
||||
: null;
|
||||
|
||||
// 更新所有字段
|
||||
// 使用 accountSequence 作为条件更新,同时修复 userId
|
||||
await this.prisma.userQueryView.update({
|
||||
where: { userId },
|
||||
where: { accountSequence: data.account_sequence },
|
||||
data: {
|
||||
userId: BigInt(data.user_id), // 修复可能错误的 userId
|
||||
nickname: data.nickname || null,
|
||||
avatarUrl: data.avatar_url || null,
|
||||
phoneNumberMasked,
|
||||
|
|
|
|||
|
|
@ -70,8 +70,10 @@ export class UserQueryRepositoryImpl implements IUserQueryRepository {
|
|||
status?: string;
|
||||
registeredAt: Date;
|
||||
}): Promise<void> {
|
||||
// 使用 accountSequence 作为唯一键进行 upsert
|
||||
// 这样可以正确处理 userId 不一致的情况(旧系统可能写入 userId=0)
|
||||
await this.prisma.userQueryView.upsert({
|
||||
where: { userId: data.userId },
|
||||
where: { accountSequence: data.accountSequence },
|
||||
create: {
|
||||
userId: data.userId,
|
||||
accountSequence: data.accountSequence,
|
||||
|
|
@ -85,7 +87,8 @@ export class UserQueryRepositoryImpl implements IUserQueryRepository {
|
|||
syncedAt: new Date(),
|
||||
},
|
||||
update: {
|
||||
accountSequence: data.accountSequence,
|
||||
// 始终更新 userId 以修复旧数据
|
||||
userId: data.userId,
|
||||
nickname: data.nickname !== undefined ? data.nickname : undefined,
|
||||
avatarUrl: data.avatarUrl !== undefined ? data.avatarUrl : undefined,
|
||||
phoneNumberMasked: data.phoneNumberMasked !== undefined ? data.phoneNumberMasked : undefined,
|
||||
|
|
@ -178,6 +181,18 @@ export class UserQueryRepositoryImpl implements IUserQueryRepository {
|
|||
this.logger.debug(`[UserQueryView] Updated KYC status for user ${userId} to ${kycStatus}`);
|
||||
}
|
||||
|
||||
async updateKycStatusByAccountSequence(accountSequence: string, kycStatus: string): Promise<void> {
|
||||
await this.prisma.userQueryView.update({
|
||||
where: { accountSequence },
|
||||
data: {
|
||||
kycStatus,
|
||||
syncedAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.debug(`[UserQueryView] Updated KYC status for user ${accountSequence} to ${kycStatus}`);
|
||||
}
|
||||
|
||||
async updateOnlineStatus(userId: bigint, isOnline: boolean): Promise<void> {
|
||||
await this.prisma.userQueryView.update({
|
||||
where: { userId },
|
||||
|
|
@ -214,6 +229,13 @@ export class UserQueryRepositoryImpl implements IUserQueryRepository {
|
|||
return count > 0;
|
||||
}
|
||||
|
||||
async existsByAccountSequence(accountSequence: string): Promise<boolean> {
|
||||
const count = await this.prisma.userQueryView.count({
|
||||
where: { accountSequence },
|
||||
});
|
||||
return count > 0;
|
||||
}
|
||||
|
||||
// ==================== Private Methods ====================
|
||||
|
||||
private buildWhereClause(filters: UserQueryFilters): Prisma.UserQueryViewWhereInput {
|
||||
|
|
|
|||
Loading…
Reference in New Issue