diff --git a/backend/services/auth-service/prisma/migrations/0002_add_synced_wallet_addresses/migration.sql b/backend/services/auth-service/prisma/migrations/0002_add_synced_wallet_addresses/migration.sql new file mode 100644 index 00000000..4fa2db6c --- /dev/null +++ b/backend/services/auth-service/prisma/migrations/0002_add_synced_wallet_addresses/migration.sql @@ -0,0 +1,27 @@ +-- CreateTable +CREATE TABLE "synced_wallet_addresses" ( + "id" BIGSERIAL NOT NULL, + "legacy_address_id" BIGINT NOT NULL, + "legacy_user_id" BIGINT NOT NULL, + "chain_type" TEXT NOT NULL, + "address" TEXT NOT NULL, + "public_key" TEXT NOT NULL, + "status" TEXT NOT NULL DEFAULT 'ACTIVE', + "legacy_bound_at" TIMESTAMP(3) NOT NULL, + "source_sequence_num" BIGINT NOT NULL, + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "synced_wallet_addresses_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "synced_wallet_addresses_legacy_address_id_key" ON "synced_wallet_addresses"("legacy_address_id"); + +-- CreateIndex +CREATE UNIQUE INDEX "synced_wallet_addresses_legacy_user_id_chain_type_key" ON "synced_wallet_addresses"("legacy_user_id", "chain_type"); + +-- CreateIndex +CREATE INDEX "synced_wallet_addresses_legacy_user_id_idx" ON "synced_wallet_addresses"("legacy_user_id"); + +-- CreateIndex +CREATE INDEX "synced_wallet_addresses_chain_type_address_idx" ON "synced_wallet_addresses"("chain_type", "address"); diff --git a/backend/services/auth-service/prisma/schema.prisma b/backend/services/auth-service/prisma/schema.prisma index a883a798..6aab3018 100644 --- a/backend/services/auth-service/prisma/schema.prisma +++ b/backend/services/auth-service/prisma/schema.prisma @@ -104,6 +104,33 @@ model SyncedLegacyUser { @@map("synced_legacy_users") } +// ============================================================================ +// CDC 同步的 1.0 钱包地址(只读) +// ============================================================================ + +model SyncedWalletAddress { + id BigInt @id @default(autoincrement()) + + // 1.0 钱包地址数据 + legacyAddressId BigInt @unique @map("legacy_address_id") // 1.0 的 wallet_addresses.address_id + legacyUserId BigInt @map("legacy_user_id") // 1.0 的 wallet_addresses.user_id + chainType String @map("chain_type") // KAVA, BSC 等 + address String // 钱包地址 + publicKey String @map("public_key") // MPC 公钥 + status String @default("ACTIVE") // ACTIVE, DELETED + + legacyBoundAt DateTime @map("legacy_bound_at") // 1.0 绑定时间 + + // CDC 元数据 + sourceSequenceNum BigInt @map("source_sequence_num") + syncedAt DateTime @default(now()) @map("synced_at") + + @@unique([legacyUserId, chainType]) + @@index([legacyUserId]) + @@index([chainType, address]) + @@map("synced_wallet_addresses") +} + // ============================================================================ // 刷新令牌 // ============================================================================ diff --git a/backend/services/auth-service/src/api/api.module.ts b/backend/services/auth-service/src/api/api.module.ts index 5313ad41..290601c1 100644 --- a/backend/services/auth-service/src/api/api.module.ts +++ b/backend/services/auth-service/src/api/api.module.ts @@ -9,6 +9,7 @@ import { UserController, HealthController, AdminController, + InternalController, } from './controllers'; import { ApplicationModule } from '@/application'; import { JwtAuthGuard } from '@/shared/guards/jwt-auth.guard'; @@ -35,6 +36,7 @@ import { JwtAuthGuard } from '@/shared/guards/jwt-auth.guard'; UserController, HealthController, AdminController, + InternalController, ], providers: [JwtAuthGuard], }) diff --git a/backend/services/auth-service/src/api/controllers/index.ts b/backend/services/auth-service/src/api/controllers/index.ts index 11d91974..4b6ec35d 100644 --- a/backend/services/auth-service/src/api/controllers/index.ts +++ b/backend/services/auth-service/src/api/controllers/index.ts @@ -5,3 +5,4 @@ export * from './kyc.controller'; export * from './user.controller'; export * from './health.controller'; export * from './admin.controller'; +export * from './internal.controller'; diff --git a/backend/services/auth-service/src/api/controllers/internal.controller.ts b/backend/services/auth-service/src/api/controllers/internal.controller.ts new file mode 100644 index 00000000..369f9e27 --- /dev/null +++ b/backend/services/auth-service/src/api/controllers/internal.controller.ts @@ -0,0 +1,50 @@ +import { Controller, Get, Param, NotFoundException, Logger } from '@nestjs/common'; +import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service'; + +/** + * 内部 API - 供 2.0 服务间调用,不需要 JWT 认证 + */ +@Controller('internal') +export class InternalController { + private readonly logger = new Logger(InternalController.name); + + constructor(private readonly prisma: PrismaService) {} + + /** + * 根据 accountSequence 获取用户的 Kava 地址 + * trading-service 创建卖单时调用 + */ + @Get('users/:accountSequence/kava-address') + async getUserKavaAddress( + @Param('accountSequence') accountSequence: string, + ): Promise<{ kavaAddress: string }> { + // 1. 通过 SyncedLegacyUser 查找 legacyId + const legacyUser = await this.prisma.syncedLegacyUser.findUnique({ + where: { accountSequence }, + select: { legacyId: true }, + }); + + if (!legacyUser) { + this.logger.warn(`[Internal] Legacy user not found: ${accountSequence}`); + throw new NotFoundException(`用户未找到: ${accountSequence}`); + } + + // 2. 通过 legacyUserId + chainType 查找 KAVA 钱包地址 + const walletAddress = await this.prisma.syncedWalletAddress.findUnique({ + where: { + legacyUserId_chainType: { + legacyUserId: legacyUser.legacyId, + chainType: 'KAVA', + }, + }, + select: { address: true, status: true }, + }); + + if (!walletAddress || walletAddress.status !== 'ACTIVE') { + this.logger.warn(`[Internal] Kava address not found for: ${accountSequence}`); + throw new NotFoundException(`未找到 Kava 钱包地址: ${accountSequence}`); + } + + return { kavaAddress: walletAddress.address }; + } +} diff --git a/backend/services/auth-service/src/infrastructure/infrastructure.module.ts b/backend/services/auth-service/src/infrastructure/infrastructure.module.ts index ca842176..bc0a80f6 100644 --- a/backend/services/auth-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/auth-service/src/infrastructure/infrastructure.module.ts @@ -7,7 +7,7 @@ import { PrismaRefreshTokenRepository, PrismaSmsVerificationRepository, } from './persistence/repositories'; -import { LegacyUserCdcConsumer } from './messaging/cdc'; +import { LegacyUserCdcConsumer, WalletAddressCdcConsumer } from './messaging/cdc'; import { KafkaModule, KafkaProducerService } from './kafka'; import { RedisService } from './redis'; import { @@ -24,6 +24,7 @@ import { ApplicationModule } from '@/application/application.module'; providers: [ // CDC LegacyUserCdcConsumer, + WalletAddressCdcConsumer, // Kafka Producer KafkaProducerService, diff --git a/backend/services/auth-service/src/infrastructure/messaging/cdc/index.ts b/backend/services/auth-service/src/infrastructure/messaging/cdc/index.ts index 2239ca9a..bd95ea03 100644 --- a/backend/services/auth-service/src/infrastructure/messaging/cdc/index.ts +++ b/backend/services/auth-service/src/infrastructure/messaging/cdc/index.ts @@ -1 +1,2 @@ export * from './legacy-user-cdc.consumer'; +export * from './wallet-address-cdc.consumer'; diff --git a/backend/services/auth-service/src/infrastructure/messaging/cdc/wallet-address-cdc.consumer.ts b/backend/services/auth-service/src/infrastructure/messaging/cdc/wallet-address-cdc.consumer.ts new file mode 100644 index 00000000..5edbf1a6 --- /dev/null +++ b/backend/services/auth-service/src/infrastructure/messaging/cdc/wallet-address-cdc.consumer.ts @@ -0,0 +1,243 @@ +import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; +import { Prisma, PrismaClient } from '@prisma/client'; +import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service'; + +/** Prisma 事务客户端类型 */ +type TransactionClient = Omit< + PrismaClient, + '$connect' | '$disconnect' | '$on' | '$transaction' | '$use' | '$extends' +>; + +/** + * ExtractNewRecordState 转换后的消息格式 + * 字段来自 identity-service 的 wallet_addresses 表 + Debezium 元数据 + */ +interface UnwrappedCdcWalletAddress { + // 1.0 identity-service wallet_addresses 表字段 + address_id: number; + user_id: number; + chain_type: string; + address: string; + public_key: string; + address_digest: string; + mpc_signature_r: string; + mpc_signature_s: string; + mpc_signature_v: number; + status: string; + bound_at: number; // timestamp in milliseconds + + // Debezium ExtractNewRecordState 添加的元数据字段 + __op: 'c' | 'u' | 'd' | 'r'; + __table: string; + __source_ts_ms: number; + __deleted?: string; +} + +/** + * CDC Consumer - 消费 1.0 钱包地址变更事件 + * 监听 Debezium 发送的 CDC 事件,同步到 synced_wallet_addresses 表 + * + * 实现事务性幂等消费(Transactional Idempotent Consumer)确保: + * - 每个 CDC 事件只处理一次(exactly-once 语义) + * - 幂等记录(processed_cdc_events)和业务逻辑在同一事务中执行 + * - 任何失败都会导致整个事务回滚 + */ +@Injectable() +export class WalletAddressCdcConsumer implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(WalletAddressCdcConsumer.name); + private kafka: Kafka; + private consumer: Consumer; + private isConnected = false; + private topic: string; + + constructor( + private readonly configService: ConfigService, + private readonly prisma: PrismaService, + ) { + const brokers = this.configService.get('KAFKA_BROKERS', 'localhost:9092').split(','); + + this.kafka = new Kafka({ + clientId: 'auth-service-cdc-wallet', + brokers, + }); + + this.consumer = this.kafka.consumer({ + groupId: this.configService.get('CDC_CONSUMER_GROUP', 'auth-service-cdc-group') + '-wallet', + }); + + this.topic = this.configService.get( + 'CDC_TOPIC_WALLET_ADDRESSES', + 'cdc.identity.public.wallet_addresses', + ); + } + + async onModuleInit() { + if (this.configService.get('CDC_ENABLED', 'true') !== 'true') { + this.logger.log('Wallet Address CDC Consumer is disabled'); + return; + } + + try { + await this.consumer.connect(); + this.isConnected = true; + + await this.consumer.subscribe({ topic: this.topic, fromBeginning: true }); + + await this.consumer.run({ + eachMessage: async (payload) => { + await this.handleMessage(payload); + }, + }); + + this.logger.log( + `Wallet Address CDC Consumer started, listening to topic: ${this.topic}`, + ); + } catch (error) { + this.logger.error('Failed to start Wallet Address CDC Consumer', error); + } + } + + async onModuleDestroy() { + if (this.isConnected) { + await this.consumer.disconnect(); + this.logger.log('Wallet Address CDC Consumer disconnected'); + } + } + + private async handleMessage(payload: EachMessagePayload) { + const { topic, partition, message } = payload; + + if (!message.value) return; + + const offset = BigInt(message.offset); + const idempotencyKey = `${topic}:${offset}`; + + try { + const cdcEvent: UnwrappedCdcWalletAddress = JSON.parse(message.value.toString()); + const op = cdcEvent.__op; + const tableName = cdcEvent.__table || 'wallet_addresses'; + + this.logger.log(`[CDC] Processing wallet address event: topic=${topic}, offset=${offset}, op=${op}`); + + await this.processWithIdempotency(topic, offset, tableName, op, cdcEvent); + + this.logger.log(`[CDC] Successfully processed wallet address event: ${idempotencyKey}`); + } catch (error: any) { + if (error.code === 'P2002') { + this.logger.debug(`[CDC] Skipping duplicate wallet address event: ${idempotencyKey}`); + return; + } + this.logger.error( + `[CDC] Failed to process wallet address message from ${topic}[${partition}], offset=${offset}`, + error, + ); + } + } + + /** + * 事务性幂等处理 + */ + private async processWithIdempotency( + topic: string, + offset: bigint, + tableName: string, + operation: string, + event: UnwrappedCdcWalletAddress, + ): Promise { + await this.prisma.$transaction(async (tx) => { + // 1. 尝试插入幂等记录 + try { + await tx.processedCdcEvent.create({ + data: { + sourceTopic: topic, + offset: offset, + tableName: tableName, + operation: operation, + }, + }); + } catch (error: any) { + if (error.code === 'P2002') { + this.logger.debug(`[CDC] Wallet address event already processed: ${topic}:${offset}`); + return; + } + throw error; + } + + // 2. 执行业务逻辑 + await this.processCdcEvent(event, offset, tx); + }, { + isolationLevel: Prisma.TransactionIsolationLevel.Serializable, + timeout: 30000, + }); + } + + private async processCdcEvent( + event: UnwrappedCdcWalletAddress, + sequenceNum: bigint, + tx: TransactionClient, + ): Promise { + const op = event.__op; + const isDeleted = event.__deleted === 'true'; + + if (isDeleted || op === 'd') { + await this.deleteWalletAddress(event.address_id, tx); + return; + } + + switch (op) { + case 'c': + case 'r': + case 'u': + await this.upsertWalletAddress(event, sequenceNum, tx); + break; + } + } + + private async upsertWalletAddress( + walletAddress: UnwrappedCdcWalletAddress, + sequenceNum: bigint, + tx: TransactionClient, + ): Promise { + await tx.syncedWalletAddress.upsert({ + where: { legacyAddressId: BigInt(walletAddress.address_id) }, + update: { + legacyUserId: BigInt(walletAddress.user_id), + chainType: walletAddress.chain_type, + address: walletAddress.address, + publicKey: walletAddress.public_key, + status: walletAddress.status, + sourceSequenceNum: sequenceNum, + syncedAt: new Date(), + }, + create: { + legacyAddressId: BigInt(walletAddress.address_id), + legacyUserId: BigInt(walletAddress.user_id), + chainType: walletAddress.chain_type, + address: walletAddress.address, + publicKey: walletAddress.public_key, + status: walletAddress.status, + legacyBoundAt: new Date(walletAddress.bound_at), + sourceSequenceNum: sequenceNum, + }, + }); + + this.logger.debug( + `[CDC] Synced wallet address: addressId=${walletAddress.address_id}, chain=${walletAddress.chain_type}`, + ); + } + + private async deleteWalletAddress(addressId: number, tx: TransactionClient): Promise { + try { + await tx.syncedWalletAddress.update({ + where: { legacyAddressId: BigInt(addressId) }, + data: { status: 'DELETED' }, + }); + + this.logger.debug(`[CDC] Marked wallet address as deleted: ${addressId}`); + } catch (error) { + this.logger.error(`[CDC] Failed to mark wallet address as deleted: ${addressId}`, error); + } + } +} diff --git a/backend/services/docker-compose.2.0.yml b/backend/services/docker-compose.2.0.yml index c2b7cffa..f2dacae7 100644 --- a/backend/services/docker-compose.2.0.yml +++ b/backend/services/docker-compose.2.0.yml @@ -112,6 +112,7 @@ services: KAFKA_BROKERS: kafka:29092 # 2.0 内部服务调用 MINING_SERVICE_URL: http://mining-service:3021 + AUTH_SERVICE_URL: http://auth-service:3024 # JWT 配置 (与 auth-service 共享密钥以验证 token) JWT_SECRET: ${JWT_SECRET:-your-jwt-secret-change-in-production} ports: @@ -188,6 +189,7 @@ services: KAFKA_BROKERS: kafka:29092 CDC_ENABLED: "true" CDC_TOPIC_USERS: ${CDC_TOPIC_USERS:-cdc.identity.public.user_accounts} + CDC_TOPIC_WALLET_ADDRESSES: ${CDC_TOPIC_WALLET_ADDRESSES:-cdc.identity.public.wallet_addresses} CDC_CONSUMER_GROUP: auth-service-cdc-group # JWT 配置 JWT_SECRET: ${JWT_SECRET:-your-jwt-secret-change-in-production} diff --git a/backend/services/trading-service/src/infrastructure/identity/identity.client.ts b/backend/services/trading-service/src/infrastructure/identity/identity.client.ts index 59e99123..0065b3a9 100644 --- a/backend/services/trading-service/src/infrastructure/identity/identity.client.ts +++ b/backend/services/trading-service/src/infrastructure/identity/identity.client.ts @@ -12,7 +12,7 @@ export interface UserInfo { } /** - * Identity 服务客户端 + * Auth 服务客户端(通过 auth-service 内部 API 获取用户信息) * 用于获取用户信息,包括 Kava 地址 */ @Injectable() @@ -25,25 +25,28 @@ export class IdentityClient { private readonly configService: ConfigService, ) { this.baseUrl = this.configService.get( - 'IDENTITY_SERVICE_URL', - 'http://localhost:3001', + 'AUTH_SERVICE_URL', + 'http://localhost:3024', ); - this.logger.log(`[INIT] IdentityClient initialized with URL: ${this.baseUrl}`); + this.logger.log(`[INIT] IdentityClient initialized with auth-service URL: ${this.baseUrl}`); } /** - * 获取用户的 Kava 地址 + * 获取用户的 Kava 地址(调用 auth-service 内部 API) * @param accountSequence 用户账户序列号 */ async getUserKavaAddress(accountSequence: string): Promise { try { - const response: AxiosResponse<{ kavaAddress?: string }> = await firstValueFrom( - this.httpService.get<{ kavaAddress?: string }>( - `${this.baseUrl}/api/v1/internal/user/${accountSequence}/kava-address`, - ), - ); + // auth-service TransformInterceptor 会包装响应为 { success, data: { kavaAddress }, timestamp } + const response: AxiosResponse<{ data?: { kavaAddress?: string }; kavaAddress?: string }> = + await firstValueFrom( + this.httpService.get( + `${this.baseUrl}/api/v2/internal/users/${accountSequence}/kava-address`, + ), + ); - return response.data?.kavaAddress || null; + const body = response.data; + return body?.data?.kavaAddress || body?.kavaAddress || null; } catch (error: any) { this.logger.error(`Failed to get Kava address for ${accountSequence}: ${error.message}`); return null; @@ -56,13 +59,15 @@ export class IdentityClient { */ async getUserInfo(accountSequence: string): Promise { try { - const response: AxiosResponse = await firstValueFrom( - this.httpService.get( - `${this.baseUrl}/api/v1/internal/user/${accountSequence}`, + // auth-service TransformInterceptor 会包装响应为 { success, data: {...}, timestamp } + const response: AxiosResponse<{ data?: UserInfo } & UserInfo> = await firstValueFrom( + this.httpService.get( + `${this.baseUrl}/api/v2/internal/users/${accountSequence}`, ), ); - return response.data; + const body = response.data; + return body?.data || body || null; } catch (error: any) { this.logger.error(`Failed to get user info for ${accountSequence}: ${error.message}`); return null;