feat(auth-service): CDC 同步 wallet_addresses 并提供内部 API

- auth-service: 新增 SyncedWalletAddress Prisma model 和 migration
- auth-service: 新增 WalletAddressCdcConsumer 消费 1.0 钱包地址变更
- auth-service: 新增 InternalController 提供 kava 地址查询 API
- trading-service: IdentityClient 改调 auth-service 内部 API
- docker-compose.2.0.yml: 添加 CDC_TOPIC_WALLET_ADDRESSES 和 AUTH_SERVICE_URL

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-29 12:20:23 -08:00
parent 83fa6bec74
commit 7e289430ae
10 changed files with 375 additions and 16 deletions

View File

@ -0,0 +1,27 @@
-- CreateTable
CREATE TABLE "synced_wallet_addresses" (
"id" BIGSERIAL NOT NULL,
"legacy_address_id" BIGINT NOT NULL,
"legacy_user_id" BIGINT NOT NULL,
"chain_type" TEXT NOT NULL,
"address" TEXT NOT NULL,
"public_key" TEXT NOT NULL,
"status" TEXT NOT NULL DEFAULT 'ACTIVE',
"legacy_bound_at" TIMESTAMP(3) NOT NULL,
"source_sequence_num" BIGINT NOT NULL,
"synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "synced_wallet_addresses_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "synced_wallet_addresses_legacy_address_id_key" ON "synced_wallet_addresses"("legacy_address_id");
-- CreateIndex
CREATE UNIQUE INDEX "synced_wallet_addresses_legacy_user_id_chain_type_key" ON "synced_wallet_addresses"("legacy_user_id", "chain_type");
-- CreateIndex
CREATE INDEX "synced_wallet_addresses_legacy_user_id_idx" ON "synced_wallet_addresses"("legacy_user_id");
-- CreateIndex
CREATE INDEX "synced_wallet_addresses_chain_type_address_idx" ON "synced_wallet_addresses"("chain_type", "address");

View File

@ -104,6 +104,33 @@ model SyncedLegacyUser {
@@map("synced_legacy_users")
}
// ============================================================================
// CDC 同步的 1.0 钱包地址(只读)
// ============================================================================
model SyncedWalletAddress {
id BigInt @id @default(autoincrement())
// 1.0 钱包地址数据
legacyAddressId BigInt @unique @map("legacy_address_id") // 1.0 的 wallet_addresses.address_id
legacyUserId BigInt @map("legacy_user_id") // 1.0 的 wallet_addresses.user_id
chainType String @map("chain_type") // KAVA, BSC 等
address String // 钱包地址
publicKey String @map("public_key") // MPC 公钥
status String @default("ACTIVE") // ACTIVE, DELETED
legacyBoundAt DateTime @map("legacy_bound_at") // 1.0 绑定时间
// CDC 元数据
sourceSequenceNum BigInt @map("source_sequence_num")
syncedAt DateTime @default(now()) @map("synced_at")
@@unique([legacyUserId, chainType])
@@index([legacyUserId])
@@index([chainType, address])
@@map("synced_wallet_addresses")
}
// ============================================================================
// 刷新令牌
// ============================================================================

View File

@ -9,6 +9,7 @@ import {
UserController,
HealthController,
AdminController,
InternalController,
} from './controllers';
import { ApplicationModule } from '@/application';
import { JwtAuthGuard } from '@/shared/guards/jwt-auth.guard';
@ -35,6 +36,7 @@ import { JwtAuthGuard } from '@/shared/guards/jwt-auth.guard';
UserController,
HealthController,
AdminController,
InternalController,
],
providers: [JwtAuthGuard],
})

View File

@ -5,3 +5,4 @@ export * from './kyc.controller';
export * from './user.controller';
export * from './health.controller';
export * from './admin.controller';
export * from './internal.controller';

View File

@ -0,0 +1,50 @@
import { Controller, Get, Param, NotFoundException, Logger } from '@nestjs/common';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
/**
* API - 2.0 JWT
*/
@Controller('internal')
export class InternalController {
private readonly logger = new Logger(InternalController.name);
constructor(private readonly prisma: PrismaService) {}
/**
* accountSequence Kava
* trading-service
*/
@Get('users/:accountSequence/kava-address')
async getUserKavaAddress(
@Param('accountSequence') accountSequence: string,
): Promise<{ kavaAddress: string }> {
// 1. 通过 SyncedLegacyUser 查找 legacyId
const legacyUser = await this.prisma.syncedLegacyUser.findUnique({
where: { accountSequence },
select: { legacyId: true },
});
if (!legacyUser) {
this.logger.warn(`[Internal] Legacy user not found: ${accountSequence}`);
throw new NotFoundException(`用户未找到: ${accountSequence}`);
}
// 2. 通过 legacyUserId + chainType 查找 KAVA 钱包地址
const walletAddress = await this.prisma.syncedWalletAddress.findUnique({
where: {
legacyUserId_chainType: {
legacyUserId: legacyUser.legacyId,
chainType: 'KAVA',
},
},
select: { address: true, status: true },
});
if (!walletAddress || walletAddress.status !== 'ACTIVE') {
this.logger.warn(`[Internal] Kava address not found for: ${accountSequence}`);
throw new NotFoundException(`未找到 Kava 钱包地址: ${accountSequence}`);
}
return { kavaAddress: walletAddress.address };
}
}

View File

@ -7,7 +7,7 @@ import {
PrismaRefreshTokenRepository,
PrismaSmsVerificationRepository,
} from './persistence/repositories';
import { LegacyUserCdcConsumer } from './messaging/cdc';
import { LegacyUserCdcConsumer, WalletAddressCdcConsumer } from './messaging/cdc';
import { KafkaModule, KafkaProducerService } from './kafka';
import { RedisService } from './redis';
import {
@ -24,6 +24,7 @@ import { ApplicationModule } from '@/application/application.module';
providers: [
// CDC
LegacyUserCdcConsumer,
WalletAddressCdcConsumer,
// Kafka Producer
KafkaProducerService,

View File

@ -1 +1,2 @@
export * from './legacy-user-cdc.consumer';
export * from './wallet-address-cdc.consumer';

View File

@ -0,0 +1,243 @@
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { Prisma, PrismaClient } from '@prisma/client';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
/** Prisma 事务客户端类型 */
type TransactionClient = Omit<
PrismaClient,
'$connect' | '$disconnect' | '$on' | '$transaction' | '$use' | '$extends'
>;
/**
* ExtractNewRecordState
* identity-service wallet_addresses + Debezium
*/
interface UnwrappedCdcWalletAddress {
// 1.0 identity-service wallet_addresses 表字段
address_id: number;
user_id: number;
chain_type: string;
address: string;
public_key: string;
address_digest: string;
mpc_signature_r: string;
mpc_signature_s: string;
mpc_signature_v: number;
status: string;
bound_at: number; // timestamp in milliseconds
// Debezium ExtractNewRecordState 添加的元数据字段
__op: 'c' | 'u' | 'd' | 'r';
__table: string;
__source_ts_ms: number;
__deleted?: string;
}
/**
* CDC Consumer - 1.0
* Debezium CDC synced_wallet_addresses
*
* Transactional Idempotent Consumer
* - CDC exactly-once
* - processed_cdc_events
* -
*/
@Injectable()
export class WalletAddressCdcConsumer implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(WalletAddressCdcConsumer.name);
private kafka: Kafka;
private consumer: Consumer;
private isConnected = false;
private topic: string;
constructor(
private readonly configService: ConfigService,
private readonly prisma: PrismaService,
) {
const brokers = this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092').split(',');
this.kafka = new Kafka({
clientId: 'auth-service-cdc-wallet',
brokers,
});
this.consumer = this.kafka.consumer({
groupId: this.configService.get<string>('CDC_CONSUMER_GROUP', 'auth-service-cdc-group') + '-wallet',
});
this.topic = this.configService.get<string>(
'CDC_TOPIC_WALLET_ADDRESSES',
'cdc.identity.public.wallet_addresses',
);
}
async onModuleInit() {
if (this.configService.get('CDC_ENABLED', 'true') !== 'true') {
this.logger.log('Wallet Address CDC Consumer is disabled');
return;
}
try {
await this.consumer.connect();
this.isConnected = true;
await this.consumer.subscribe({ topic: this.topic, fromBeginning: true });
await this.consumer.run({
eachMessage: async (payload) => {
await this.handleMessage(payload);
},
});
this.logger.log(
`Wallet Address CDC Consumer started, listening to topic: ${this.topic}`,
);
} catch (error) {
this.logger.error('Failed to start Wallet Address CDC Consumer', error);
}
}
async onModuleDestroy() {
if (this.isConnected) {
await this.consumer.disconnect();
this.logger.log('Wallet Address CDC Consumer disconnected');
}
}
private async handleMessage(payload: EachMessagePayload) {
const { topic, partition, message } = payload;
if (!message.value) return;
const offset = BigInt(message.offset);
const idempotencyKey = `${topic}:${offset}`;
try {
const cdcEvent: UnwrappedCdcWalletAddress = JSON.parse(message.value.toString());
const op = cdcEvent.__op;
const tableName = cdcEvent.__table || 'wallet_addresses';
this.logger.log(`[CDC] Processing wallet address event: topic=${topic}, offset=${offset}, op=${op}`);
await this.processWithIdempotency(topic, offset, tableName, op, cdcEvent);
this.logger.log(`[CDC] Successfully processed wallet address event: ${idempotencyKey}`);
} catch (error: any) {
if (error.code === 'P2002') {
this.logger.debug(`[CDC] Skipping duplicate wallet address event: ${idempotencyKey}`);
return;
}
this.logger.error(
`[CDC] Failed to process wallet address message from ${topic}[${partition}], offset=${offset}`,
error,
);
}
}
/**
*
*/
private async processWithIdempotency(
topic: string,
offset: bigint,
tableName: string,
operation: string,
event: UnwrappedCdcWalletAddress,
): Promise<void> {
await this.prisma.$transaction(async (tx) => {
// 1. 尝试插入幂等记录
try {
await tx.processedCdcEvent.create({
data: {
sourceTopic: topic,
offset: offset,
tableName: tableName,
operation: operation,
},
});
} catch (error: any) {
if (error.code === 'P2002') {
this.logger.debug(`[CDC] Wallet address event already processed: ${topic}:${offset}`);
return;
}
throw error;
}
// 2. 执行业务逻辑
await this.processCdcEvent(event, offset, tx);
}, {
isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
timeout: 30000,
});
}
private async processCdcEvent(
event: UnwrappedCdcWalletAddress,
sequenceNum: bigint,
tx: TransactionClient,
): Promise<void> {
const op = event.__op;
const isDeleted = event.__deleted === 'true';
if (isDeleted || op === 'd') {
await this.deleteWalletAddress(event.address_id, tx);
return;
}
switch (op) {
case 'c':
case 'r':
case 'u':
await this.upsertWalletAddress(event, sequenceNum, tx);
break;
}
}
private async upsertWalletAddress(
walletAddress: UnwrappedCdcWalletAddress,
sequenceNum: bigint,
tx: TransactionClient,
): Promise<void> {
await tx.syncedWalletAddress.upsert({
where: { legacyAddressId: BigInt(walletAddress.address_id) },
update: {
legacyUserId: BigInt(walletAddress.user_id),
chainType: walletAddress.chain_type,
address: walletAddress.address,
publicKey: walletAddress.public_key,
status: walletAddress.status,
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
create: {
legacyAddressId: BigInt(walletAddress.address_id),
legacyUserId: BigInt(walletAddress.user_id),
chainType: walletAddress.chain_type,
address: walletAddress.address,
publicKey: walletAddress.public_key,
status: walletAddress.status,
legacyBoundAt: new Date(walletAddress.bound_at),
sourceSequenceNum: sequenceNum,
},
});
this.logger.debug(
`[CDC] Synced wallet address: addressId=${walletAddress.address_id}, chain=${walletAddress.chain_type}`,
);
}
private async deleteWalletAddress(addressId: number, tx: TransactionClient): Promise<void> {
try {
await tx.syncedWalletAddress.update({
where: { legacyAddressId: BigInt(addressId) },
data: { status: 'DELETED' },
});
this.logger.debug(`[CDC] Marked wallet address as deleted: ${addressId}`);
} catch (error) {
this.logger.error(`[CDC] Failed to mark wallet address as deleted: ${addressId}`, error);
}
}
}

View File

@ -112,6 +112,7 @@ services:
KAFKA_BROKERS: kafka:29092
# 2.0 内部服务调用
MINING_SERVICE_URL: http://mining-service:3021
AUTH_SERVICE_URL: http://auth-service:3024
# JWT 配置 (与 auth-service 共享密钥以验证 token)
JWT_SECRET: ${JWT_SECRET:-your-jwt-secret-change-in-production}
ports:
@ -188,6 +189,7 @@ services:
KAFKA_BROKERS: kafka:29092
CDC_ENABLED: "true"
CDC_TOPIC_USERS: ${CDC_TOPIC_USERS:-cdc.identity.public.user_accounts}
CDC_TOPIC_WALLET_ADDRESSES: ${CDC_TOPIC_WALLET_ADDRESSES:-cdc.identity.public.wallet_addresses}
CDC_CONSUMER_GROUP: auth-service-cdc-group
# JWT 配置
JWT_SECRET: ${JWT_SECRET:-your-jwt-secret-change-in-production}

View File

@ -12,7 +12,7 @@ export interface UserInfo {
}
/**
* Identity
* Auth auth-service API
* Kava
*/
@Injectable()
@ -25,25 +25,28 @@ export class IdentityClient {
private readonly configService: ConfigService,
) {
this.baseUrl = this.configService.get<string>(
'IDENTITY_SERVICE_URL',
'http://localhost:3001',
'AUTH_SERVICE_URL',
'http://localhost:3024',
);
this.logger.log(`[INIT] IdentityClient initialized with URL: ${this.baseUrl}`);
this.logger.log(`[INIT] IdentityClient initialized with auth-service URL: ${this.baseUrl}`);
}
/**
* Kava
* Kava auth-service API
* @param accountSequence
*/
async getUserKavaAddress(accountSequence: string): Promise<string | null> {
try {
const response: AxiosResponse<{ kavaAddress?: string }> = await firstValueFrom(
this.httpService.get<{ kavaAddress?: string }>(
`${this.baseUrl}/api/v1/internal/user/${accountSequence}/kava-address`,
),
);
// auth-service TransformInterceptor 会包装响应为 { success, data: { kavaAddress }, timestamp }
const response: AxiosResponse<{ data?: { kavaAddress?: string }; kavaAddress?: string }> =
await firstValueFrom(
this.httpService.get(
`${this.baseUrl}/api/v2/internal/users/${accountSequence}/kava-address`,
),
);
return response.data?.kavaAddress || null;
const body = response.data;
return body?.data?.kavaAddress || body?.kavaAddress || null;
} catch (error: any) {
this.logger.error(`Failed to get Kava address for ${accountSequence}: ${error.message}`);
return null;
@ -56,13 +59,15 @@ export class IdentityClient {
*/
async getUserInfo(accountSequence: string): Promise<UserInfo | null> {
try {
const response: AxiosResponse<UserInfo> = await firstValueFrom(
this.httpService.get<UserInfo>(
`${this.baseUrl}/api/v1/internal/user/${accountSequence}`,
// auth-service TransformInterceptor 会包装响应为 { success, data: {...}, timestamp }
const response: AxiosResponse<{ data?: UserInfo } & UserInfo> = await firstValueFrom(
this.httpService.get(
`${this.baseUrl}/api/v2/internal/users/${accountSequence}`,
),
);
return response.data;
const body = response.data;
return body?.data || body || null;
} catch (error: any) {
this.logger.error(`Failed to get user info for ${accountSequence}: ${error.message}`);
return null;