feat: add detailed debug logging for MPC Kafka event flow
- Add comprehensive [INIT], [CONNECT], [PUBLISH], [RECEIVE], [HANDLE] logs to identity-service and mpc-service Kafka services - Add KeygenStarted event for tracking keygen progress - Add MpcKeygenCompletedHandler to process keygen completion events - Fix topic routing for MPC events between services 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
20d82906f6
commit
23043d5d79
|
|
@ -8,6 +8,7 @@ import { RecoverByPhoneHandler } from './commands/recover-by-phone/recover-by-ph
|
|||
import { BindPhoneHandler } from './commands/bind-phone/bind-phone.handler';
|
||||
import { GetMyProfileHandler } from './queries/get-my-profile/get-my-profile.handler';
|
||||
import { GetMyDevicesHandler } from './queries/get-my-devices/get-my-devices.handler';
|
||||
import { MpcKeygenCompletedHandler } from './event-handlers/mpc-keygen-completed.handler';
|
||||
import { DomainModule } from '@/domain/domain.module';
|
||||
import { InfrastructureModule } from '@/infrastructure/infrastructure.module';
|
||||
|
||||
|
|
@ -23,6 +24,8 @@ import { InfrastructureModule } from '@/infrastructure/infrastructure.module';
|
|||
BindPhoneHandler,
|
||||
GetMyProfileHandler,
|
||||
GetMyDevicesHandler,
|
||||
// MPC Event Handlers
|
||||
MpcKeygenCompletedHandler,
|
||||
],
|
||||
exports: [
|
||||
UserApplicationService,
|
||||
|
|
|
|||
|
|
@ -0,0 +1 @@
|
|||
export * from './mpc-keygen-completed.handler';
|
||||
|
|
@ -0,0 +1,265 @@
|
|||
/**
|
||||
* MPC Keygen Event Handler
|
||||
*
|
||||
* Handles keygen events from mpc-service:
|
||||
* - KeygenStarted: Updates status in Redis
|
||||
* - KeygenCompleted: Derives wallet addresses and saves to user account
|
||||
* - SessionFailed: Logs error and updates status
|
||||
*/
|
||||
|
||||
import { Injectable, Inject, Logger, OnModuleInit } from '@nestjs/common';
|
||||
import { keccak256 } from 'ethers';
|
||||
import { UserAccountRepository, USER_ACCOUNT_REPOSITORY } from '@/domain/repositories/user-account.repository.interface';
|
||||
import { WalletAddress } from '@/domain/entities/wallet-address.entity';
|
||||
import { ChainType, UserId } from '@/domain/value-objects';
|
||||
import { RedisService } from '@/infrastructure/redis/redis.service';
|
||||
import {
|
||||
MpcEventConsumerService,
|
||||
KeygenStartedPayload,
|
||||
KeygenCompletedPayload,
|
||||
SessionFailedPayload,
|
||||
} from '@/infrastructure/kafka/mpc-event-consumer.service';
|
||||
|
||||
// Redis key prefix for keygen status
|
||||
const KEYGEN_STATUS_PREFIX = 'keygen:status:';
|
||||
const KEYGEN_STATUS_TTL = 60 * 60 * 24; // 24 hours
|
||||
|
||||
export type KeygenStatus = 'pending' | 'generating' | 'completed' | 'failed';
|
||||
|
||||
export interface KeygenStatusData {
|
||||
status: KeygenStatus;
|
||||
userId: string;
|
||||
mpcSessionId?: string;
|
||||
publicKey?: string;
|
||||
walletAddress?: string;
|
||||
errorMessage?: string;
|
||||
updatedAt: string;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class MpcKeygenCompletedHandler implements OnModuleInit {
|
||||
private readonly logger = new Logger(MpcKeygenCompletedHandler.name);
|
||||
|
||||
constructor(
|
||||
@Inject(USER_ACCOUNT_REPOSITORY)
|
||||
private readonly userRepository: UserAccountRepository,
|
||||
private readonly redisService: RedisService,
|
||||
private readonly mpcEventConsumer: MpcEventConsumerService,
|
||||
) {}
|
||||
|
||||
async onModuleInit() {
|
||||
// 注册事件处理器
|
||||
this.mpcEventConsumer.onKeygenStarted(this.handleKeygenStarted.bind(this));
|
||||
this.mpcEventConsumer.onKeygenCompleted(this.handleKeygenCompleted.bind(this));
|
||||
this.mpcEventConsumer.onSessionFailed(this.handleSessionFailed.bind(this));
|
||||
this.logger.log('Registered MPC event handlers');
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 keygen 开始事件
|
||||
*
|
||||
* 更新 Redis 中的状态为 "generating"
|
||||
*/
|
||||
private async handleKeygenStarted(payload: KeygenStartedPayload): Promise<void> {
|
||||
const { userId, mpcSessionId } = payload;
|
||||
this.logger.log(`Keygen started: userId=${userId}, mpcSessionId=${mpcSessionId}`);
|
||||
|
||||
try {
|
||||
const statusData: KeygenStatusData = {
|
||||
status: 'generating',
|
||||
userId,
|
||||
mpcSessionId,
|
||||
updatedAt: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await this.redisService.set(
|
||||
`${KEYGEN_STATUS_PREFIX}${userId}`,
|
||||
JSON.stringify(statusData),
|
||||
KEYGEN_STATUS_TTL,
|
||||
);
|
||||
|
||||
this.logger.log(`Keygen status updated to 'generating' for user: ${userId}`);
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to update keygen status: ${error}`, error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 keygen 完成事件
|
||||
*
|
||||
* 从 mpc-service 收到公钥后:
|
||||
* 1. 解析用户信息
|
||||
* 2. 从公钥派生各链钱包地址
|
||||
* 3. 保存钱包地址到用户账户
|
||||
* 4. 更新 Redis 状态为 completed
|
||||
*/
|
||||
private async handleKeygenCompleted(payload: KeygenCompletedPayload): Promise<void> {
|
||||
const { publicKey, extraPayload } = payload;
|
||||
|
||||
if (!extraPayload?.userId) {
|
||||
this.logger.warn('KeygenCompleted event missing userId, skipping');
|
||||
return;
|
||||
}
|
||||
|
||||
const { userId, username } = extraPayload;
|
||||
this.logger.log(`Processing keygen completed: userId=${userId}, username=${username}`);
|
||||
|
||||
try {
|
||||
// 1. 查找用户账户
|
||||
const account = await this.userRepository.findById(UserId.create(userId));
|
||||
if (!account) {
|
||||
this.logger.error(`User not found: ${userId}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 从公钥派生以太坊地址 (各链通用 EVM 地址)
|
||||
const walletAddress = this.deriveAddressFromPublicKey(publicKey);
|
||||
this.logger.log(`Derived wallet address: ${walletAddress}`);
|
||||
|
||||
// 3. 创建三条链的钱包地址
|
||||
const wallets: WalletAddress[] = [
|
||||
WalletAddress.create({ userId: account.userId, chainType: ChainType.KAVA, address: walletAddress }),
|
||||
WalletAddress.create({ userId: account.userId, chainType: ChainType.DST, address: walletAddress }),
|
||||
WalletAddress.create({ userId: account.userId, chainType: ChainType.BSC, address: walletAddress }),
|
||||
];
|
||||
|
||||
// 4. 保存钱包地址到用户账户
|
||||
await this.userRepository.saveWallets(account.userId, wallets);
|
||||
|
||||
// 5. 更新 Redis 状态为 completed
|
||||
const statusData: KeygenStatusData = {
|
||||
status: 'completed',
|
||||
userId,
|
||||
publicKey,
|
||||
walletAddress,
|
||||
updatedAt: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await this.redisService.set(
|
||||
`${KEYGEN_STATUS_PREFIX}${userId}`,
|
||||
JSON.stringify(statusData),
|
||||
KEYGEN_STATUS_TTL,
|
||||
);
|
||||
|
||||
this.logger.log(`Wallet addresses saved for user: ${userId}, address: ${walletAddress}`);
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to process keygen completed: ${error}`, error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 session 失败事件
|
||||
*
|
||||
* 当 keygen 失败时:
|
||||
* 1. 记录错误日志
|
||||
* 2. 更新 Redis 状态为 failed
|
||||
*/
|
||||
private async handleSessionFailed(payload: SessionFailedPayload): Promise<void> {
|
||||
const { sessionType, errorMessage, extraPayload } = payload;
|
||||
|
||||
// 只处理 keygen 失败
|
||||
if (sessionType !== 'keygen' && sessionType !== 'KEYGEN') {
|
||||
return;
|
||||
}
|
||||
|
||||
const userId = extraPayload?.userId || 'unknown';
|
||||
this.logger.error(`Keygen failed for user ${userId}: ${errorMessage}`);
|
||||
|
||||
try {
|
||||
// 更新 Redis 状态为 failed
|
||||
const statusData: KeygenStatusData = {
|
||||
status: 'failed',
|
||||
userId,
|
||||
errorMessage,
|
||||
updatedAt: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await this.redisService.set(
|
||||
`${KEYGEN_STATUS_PREFIX}${userId}`,
|
||||
JSON.stringify(statusData),
|
||||
KEYGEN_STATUS_TTL,
|
||||
);
|
||||
|
||||
this.logger.log(`Keygen status updated to 'failed' for user: ${userId}`);
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to update keygen failed status: ${error}`, error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从压缩公钥派生以太坊地址
|
||||
*
|
||||
* @param compressedPubKey 33字节压缩公钥 (hex string)
|
||||
* @returns 以太坊地址 (0x...)
|
||||
*/
|
||||
private deriveAddressFromPublicKey(compressedPubKey: string): string {
|
||||
// 移除 0x 前缀(如果有)
|
||||
const pubKeyHex = compressedPubKey.startsWith('0x')
|
||||
? compressedPubKey.slice(2)
|
||||
: compressedPubKey;
|
||||
|
||||
// 如果是压缩公钥 (33 bytes = 66 hex chars),需要解压
|
||||
let uncompressedPubKey: string;
|
||||
if (pubKeyHex.length === 66) {
|
||||
// 压缩公钥,需要解压
|
||||
uncompressedPubKey = this.decompressPublicKey(pubKeyHex);
|
||||
} else if (pubKeyHex.length === 128 || pubKeyHex.length === 130) {
|
||||
// 未压缩公钥 (带或不带 04 前缀)
|
||||
uncompressedPubKey = pubKeyHex.length === 130 ? pubKeyHex.slice(2) : pubKeyHex;
|
||||
} else {
|
||||
throw new Error(`Invalid public key length: ${pubKeyHex.length}`);
|
||||
}
|
||||
|
||||
// 对未压缩公钥进行 keccak256 哈希
|
||||
const hash = keccak256('0x' + uncompressedPubKey);
|
||||
// 取最后 20 字节作为地址
|
||||
return '0x' + hash.slice(-40);
|
||||
}
|
||||
|
||||
/**
|
||||
* 解压 secp256k1 压缩公钥
|
||||
*/
|
||||
private decompressPublicKey(compressedHex: string): string {
|
||||
const prefix = parseInt(compressedHex.slice(0, 2), 16);
|
||||
const xHex = compressedHex.slice(2);
|
||||
const x = BigInt('0x' + xHex);
|
||||
|
||||
// secp256k1 curve parameters
|
||||
const p = BigInt('0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F');
|
||||
const a = BigInt(0);
|
||||
const b = BigInt(7);
|
||||
|
||||
// Calculate y^2 = x^3 + ax + b (mod p)
|
||||
const ySquared = (x ** 3n + a * x + b) % p;
|
||||
|
||||
// Calculate modular square root
|
||||
const y = this.modPow(ySquared, (p + 1n) / 4n, p);
|
||||
|
||||
// Choose correct y based on prefix (02 = even, 03 = odd)
|
||||
const isEven = y % 2n === 0n;
|
||||
const needEven = prefix === 0x02;
|
||||
const finalY = isEven === needEven ? y : p - y;
|
||||
|
||||
// Format as 64-char hex strings
|
||||
const xStr = x.toString(16).padStart(64, '0');
|
||||
const yStr = finalY.toString(16).padStart(64, '0');
|
||||
|
||||
return xStr + yStr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Modular exponentiation
|
||||
*/
|
||||
private modPow(base: bigint, exp: bigint, mod: bigint): bigint {
|
||||
let result = 1n;
|
||||
base = base % mod;
|
||||
while (exp > 0n) {
|
||||
if (exp % 2n === 1n) {
|
||||
result = (result * base) % mod;
|
||||
}
|
||||
exp = exp / 2n;
|
||||
base = (base * base) % mod;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
@ -172,3 +172,34 @@ export class UserAccountDeactivatedEvent extends DomainEvent {
|
|||
return 'UserAccountDeactivated';
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* MPC 密钥生成请求事件
|
||||
* 用户创建账户后发布此事件,触发 MPC 服务生成钱包地址
|
||||
*
|
||||
* payload 格式需要与 mpc-service 的 KeygenRequestedPayload 匹配:
|
||||
* - sessionId: 唯一会话ID
|
||||
* - userId: 用户ID
|
||||
* - username: 用户名 (用于 mpc-system 标识)
|
||||
* - threshold: 签名阈值 (默认 2)
|
||||
* - totalParties: 总参与方数 (默认 3)
|
||||
* - requireDelegate: 是否需要委托分片 (默认 true)
|
||||
*/
|
||||
export class MpcKeygenRequestedEvent extends DomainEvent {
|
||||
constructor(
|
||||
public readonly payload: {
|
||||
sessionId: string;
|
||||
userId: string;
|
||||
username: string;
|
||||
threshold: number;
|
||||
totalParties: number;
|
||||
requireDelegate: boolean;
|
||||
},
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
get eventType(): string {
|
||||
return 'MpcKeygenRequested';
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
||||
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { Kafka, Producer, Consumer, logLevel } from 'kafkajs';
|
||||
import { DomainEvent } from '@/domain/events';
|
||||
|
|
@ -13,7 +13,7 @@ export interface DomainEventMessage {
|
|||
payload: any;
|
||||
}
|
||||
|
||||
// 定义主题常量
|
||||
// 定义主题常量 - identity-service 发布的事件
|
||||
export const IDENTITY_TOPICS = {
|
||||
USER_ACCOUNT_CREATED: 'identity.UserAccountCreated',
|
||||
USER_ACCOUNT_AUTO_CREATED: 'identity.UserAccountAutoCreated',
|
||||
|
|
@ -30,28 +30,49 @@ export const IDENTITY_TOPICS = {
|
|||
USER_ACCOUNT_FROZEN: 'identity.UserAccountFrozen',
|
||||
ACCOUNT_FROZEN: 'identity.AccountFrozen',
|
||||
USER_ACCOUNT_DEACTIVATED: 'identity.UserAccountDeactivated',
|
||||
// MPC 请求发送到 mpc.* topic,让 mpc-service 消费
|
||||
MPC_KEYGEN_REQUESTED: 'mpc.KeygenRequested',
|
||||
MPC_SIGNING_REQUESTED: 'mpc.SigningRequested',
|
||||
} as const;
|
||||
|
||||
// 定义 identity-service 需要消费的 MPC 事件主题
|
||||
export const MPC_CONSUME_TOPICS = {
|
||||
KEYGEN_COMPLETED: 'mpc.KeygenCompleted',
|
||||
SESSION_FAILED: 'mpc.SessionFailed',
|
||||
} as const;
|
||||
|
||||
@Injectable()
|
||||
export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(EventPublisherService.name);
|
||||
private kafka: Kafka;
|
||||
private producer: Producer;
|
||||
|
||||
constructor(private readonly configService: ConfigService) {
|
||||
const brokers = (this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092')).split(',');
|
||||
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID', 'identity-service');
|
||||
|
||||
this.logger.log(`[INIT] Kafka EventPublisher initializing...`);
|
||||
this.logger.log(`[INIT] ClientId: ${clientId}`);
|
||||
this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`);
|
||||
|
||||
this.kafka = new Kafka({
|
||||
clientId: this.configService.get<string>('KAFKA_CLIENT_ID', 'identity-service'),
|
||||
brokers: (this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092')).split(','),
|
||||
clientId,
|
||||
brokers,
|
||||
logLevel: logLevel.WARN,
|
||||
});
|
||||
this.producer = this.kafka.producer();
|
||||
}
|
||||
|
||||
async onModuleInit() {
|
||||
this.logger.log(`[CONNECT] Connecting Kafka producer...`);
|
||||
await this.producer.connect();
|
||||
this.logger.log(`[CONNECT] Kafka producer connected successfully`);
|
||||
}
|
||||
|
||||
async onModuleDestroy() {
|
||||
this.logger.log(`[DISCONNECT] Disconnecting Kafka producer...`);
|
||||
await this.producer.disconnect();
|
||||
this.logger.log(`[DISCONNECT] Kafka producer disconnected`);
|
||||
}
|
||||
|
||||
async publish(event: DomainEvent): Promise<void>;
|
||||
|
|
@ -61,6 +82,10 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
|
|||
// 直接发布到指定 topic (用于重试场景)
|
||||
const topic = eventOrTopic;
|
||||
const msg = message!;
|
||||
|
||||
this.logger.log(`[PUBLISH] Publishing to topic: ${topic}`);
|
||||
this.logger.debug(`[PUBLISH] Message: ${JSON.stringify(msg)}`);
|
||||
|
||||
await this.producer.send({
|
||||
topic,
|
||||
messages: [
|
||||
|
|
@ -70,28 +95,60 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
|
|||
},
|
||||
],
|
||||
});
|
||||
|
||||
this.logger.log(`[PUBLISH] Successfully published eventId=${msg.eventId} to ${topic}`);
|
||||
} else {
|
||||
// 从领域事件发布
|
||||
const event = eventOrTopic;
|
||||
const topic = this.getTopicForEvent(event);
|
||||
const payload = (event as any).payload;
|
||||
|
||||
this.logger.log(`[PUBLISH] Publishing event: type=${event.eventType}, topic=${topic}`);
|
||||
this.logger.log(`[PUBLISH] EventId: ${event.eventId}`);
|
||||
this.logger.debug(`[PUBLISH] Payload: ${JSON.stringify(payload)}`);
|
||||
|
||||
const messageValue = {
|
||||
eventId: event.eventId,
|
||||
eventType: event.eventType,
|
||||
occurredAt: event.occurredAt.toISOString(),
|
||||
aggregateId: (event as any).aggregateId || '',
|
||||
aggregateType: (event as any).aggregateType || 'UserAccount',
|
||||
payload,
|
||||
};
|
||||
|
||||
await this.producer.send({
|
||||
topic: `identity.${event.eventType}`,
|
||||
topic,
|
||||
messages: [
|
||||
{
|
||||
key: event.eventId,
|
||||
value: JSON.stringify({
|
||||
eventId: event.eventId,
|
||||
eventType: event.eventType,
|
||||
occurredAt: event.occurredAt.toISOString(),
|
||||
aggregateId: (event as any).aggregateId || '',
|
||||
aggregateType: (event as any).aggregateType || 'UserAccount',
|
||||
payload: (event as any).payload,
|
||||
}),
|
||||
value: JSON.stringify(messageValue),
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
this.logger.log(`[PUBLISH] Successfully published ${event.eventType} to ${topic}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据事件类型获取对应的 Kafka topic
|
||||
* MPC 相关事件发送到 mpc.* topic,其他事件发送到 identity.* topic
|
||||
*/
|
||||
private getTopicForEvent(event: DomainEvent): string {
|
||||
const eventType = event.eventType;
|
||||
|
||||
// MPC 相关事件使用 mpc.* 前缀
|
||||
if (eventType === 'MpcKeygenRequested') {
|
||||
return IDENTITY_TOPICS.MPC_KEYGEN_REQUESTED;
|
||||
}
|
||||
if (eventType === 'MpcSigningRequested') {
|
||||
return IDENTITY_TOPICS.MPC_SIGNING_REQUESTED;
|
||||
}
|
||||
|
||||
// 其他事件使用 identity.* 前缀
|
||||
return `identity.${eventType}`;
|
||||
}
|
||||
|
||||
async publishAll(events: DomainEvent[]): Promise<void> {
|
||||
for (const event of events) {
|
||||
await this.publish(event);
|
||||
|
|
|
|||
|
|
@ -9,13 +9,21 @@ import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/commo
|
|||
import { ConfigService } from '@nestjs/config';
|
||||
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
|
||||
|
||||
// MPC Event Topics
|
||||
// MPC Event Topics (events from mpc-service)
|
||||
export const MPC_TOPICS = {
|
||||
KEYGEN_STARTED: 'mpc.KeygenStarted',
|
||||
KEYGEN_COMPLETED: 'mpc.KeygenCompleted',
|
||||
SIGNING_COMPLETED: 'mpc.SigningCompleted',
|
||||
SESSION_FAILED: 'mpc.SessionFailed',
|
||||
} as const;
|
||||
|
||||
export interface KeygenStartedPayload {
|
||||
sessionId: string;
|
||||
userId: string;
|
||||
username: string;
|
||||
mpcSessionId: string;
|
||||
}
|
||||
|
||||
export interface KeygenCompletedPayload {
|
||||
sessionId: string;
|
||||
partyId: string;
|
||||
|
|
@ -68,6 +76,7 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy {
|
|||
private consumer: Consumer;
|
||||
private isConnected = false;
|
||||
|
||||
private keygenStartedHandler?: MpcEventHandler<KeygenStartedPayload>;
|
||||
private keygenCompletedHandler?: MpcEventHandler<KeygenCompletedPayload>;
|
||||
private signingCompletedHandler?: MpcEventHandler<SigningCompletedPayload>;
|
||||
private sessionFailedHandler?: MpcEventHandler<SessionFailedPayload>;
|
||||
|
|
@ -79,6 +88,12 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy {
|
|||
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'identity-service';
|
||||
const groupId = 'identity-service-mpc-events';
|
||||
|
||||
this.logger.log(`[INIT] MPC Event Consumer initializing...`);
|
||||
this.logger.log(`[INIT] ClientId: ${clientId}`);
|
||||
this.logger.log(`[INIT] GroupId: ${groupId}`);
|
||||
this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`);
|
||||
this.logger.log(`[INIT] Topics to subscribe: ${Object.values(MPC_TOPICS).join(', ')}`);
|
||||
|
||||
this.kafka = new Kafka({
|
||||
clientId,
|
||||
brokers,
|
||||
|
|
@ -96,18 +111,19 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy {
|
|||
});
|
||||
|
||||
try {
|
||||
this.logger.log(`[CONNECT] Connecting MPC Event consumer...`);
|
||||
await this.consumer.connect();
|
||||
this.isConnected = true;
|
||||
this.logger.log('MPC Event Kafka consumer connected');
|
||||
this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`);
|
||||
|
||||
// Subscribe to MPC topics
|
||||
await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false });
|
||||
this.logger.log(`Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`);
|
||||
this.logger.log(`[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`);
|
||||
|
||||
// Start consuming
|
||||
await this.startConsuming();
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to connect MPC Event Kafka consumer', error);
|
||||
this.logger.error(`[ERROR] Failed to connect MPC Event Kafka consumer`, error);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -118,6 +134,13 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register handler for keygen started events
|
||||
*/
|
||||
onKeygenStarted(handler: MpcEventHandler<KeygenStartedPayload>): void {
|
||||
this.keygenStartedHandler = handler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Register handler for keygen completed events
|
||||
*/
|
||||
|
|
@ -142,46 +165,77 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy {
|
|||
private async startConsuming(): Promise<void> {
|
||||
await this.consumer.run({
|
||||
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
|
||||
const offset = message.offset;
|
||||
this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`);
|
||||
|
||||
try {
|
||||
const value = message.value?.toString();
|
||||
if (!value) {
|
||||
this.logger.warn('Empty message received');
|
||||
this.logger.warn(`[RECEIVE] Empty message received on ${topic}`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.log(`[RECEIVE] Raw message value: ${value.substring(0, 500)}...`);
|
||||
|
||||
const parsed = JSON.parse(value);
|
||||
const payload = parsed.payload || parsed;
|
||||
|
||||
this.logger.debug(`Received MPC event from ${topic}: ${JSON.stringify(payload)}`);
|
||||
this.logger.log(`[RECEIVE] Parsed event: eventType=${parsed.eventType || 'unknown'}`);
|
||||
this.logger.log(`[RECEIVE] Payload keys: ${Object.keys(payload).join(', ')}`);
|
||||
|
||||
switch (topic) {
|
||||
case MPC_TOPICS.KEYGEN_STARTED:
|
||||
this.logger.log(`[HANDLE] Processing KeygenStarted event`);
|
||||
if (this.keygenStartedHandler) {
|
||||
await this.keygenStartedHandler(payload as KeygenStartedPayload);
|
||||
this.logger.log(`[HANDLE] KeygenStarted handler completed`);
|
||||
} else {
|
||||
this.logger.warn(`[HANDLE] No handler registered for KeygenStarted`);
|
||||
}
|
||||
break;
|
||||
|
||||
case MPC_TOPICS.KEYGEN_COMPLETED:
|
||||
this.logger.log(`[HANDLE] Processing KeygenCompleted event`);
|
||||
this.logger.log(`[HANDLE] publicKey: ${(payload as KeygenCompletedPayload).publicKey?.substring(0, 20)}...`);
|
||||
if (this.keygenCompletedHandler) {
|
||||
await this.keygenCompletedHandler(payload as KeygenCompletedPayload);
|
||||
this.logger.log(`[HANDLE] KeygenCompleted handler completed`);
|
||||
} else {
|
||||
this.logger.warn(`[HANDLE] No handler registered for KeygenCompleted`);
|
||||
}
|
||||
break;
|
||||
|
||||
case MPC_TOPICS.SIGNING_COMPLETED:
|
||||
this.logger.log(`[HANDLE] Processing SigningCompleted event`);
|
||||
if (this.signingCompletedHandler) {
|
||||
await this.signingCompletedHandler(payload as SigningCompletedPayload);
|
||||
this.logger.log(`[HANDLE] SigningCompleted handler completed`);
|
||||
} else {
|
||||
this.logger.warn(`[HANDLE] No handler registered for SigningCompleted`);
|
||||
}
|
||||
break;
|
||||
|
||||
case MPC_TOPICS.SESSION_FAILED:
|
||||
this.logger.log(`[HANDLE] Processing SessionFailed event`);
|
||||
this.logger.log(`[HANDLE] sessionType: ${(payload as SessionFailedPayload).sessionType}`);
|
||||
this.logger.log(`[HANDLE] errorMessage: ${(payload as SessionFailedPayload).errorMessage}`);
|
||||
if (this.sessionFailedHandler) {
|
||||
await this.sessionFailedHandler(payload as SessionFailedPayload);
|
||||
this.logger.log(`[HANDLE] SessionFailed handler completed`);
|
||||
} else {
|
||||
this.logger.warn(`[HANDLE] No handler registered for SessionFailed`);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
this.logger.warn(`Unknown MPC topic: ${topic}`);
|
||||
this.logger.warn(`[RECEIVE] Unknown MPC topic: ${topic}`);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`Error processing MPC event from ${topic}`, error);
|
||||
this.logger.error(`[ERROR] Error processing MPC event from ${topic}`, error);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.log('Started consuming MPC events');
|
||||
this.logger.log(`[START] Started consuming MPC events`);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1 @@
|
|||
export * from './random-identity.util';
|
||||
|
|
@ -0,0 +1,158 @@
|
|||
/**
|
||||
* 随机用户名和头像生成器
|
||||
*/
|
||||
|
||||
// 形容词词库 (水果/美食主题)
|
||||
const ADJECTIVES = [
|
||||
'快乐', '阳光', '活泼', '可爱', '勇敢', '聪明', '温暖', '甜蜜',
|
||||
'闪亮', '酷炫', '神秘', '优雅', '热情', '淘气', '呆萌', '霸气',
|
||||
'清新', '软糯', '香甜', '金色', '紫色', '粉色', '蓝色', '绿色',
|
||||
];
|
||||
|
||||
// 名词词库 (水果主题 - 榴莲相关)
|
||||
const NOUNS = [
|
||||
'榴莲', '芒果', '椰子', '菠萝', '龙眼', '荔枝', '山竹', '木瓜',
|
||||
'百香果', '火龙果', '杨桃', '莲雾', '番石榴', '释迦', '红毛丹',
|
||||
'勇士', '战士', '骑士', '猎手', '侠客', '英雄', '达人', '玩家',
|
||||
];
|
||||
|
||||
// 生成随机用户名: 形容词 + 名词 + 随机数字
|
||||
export function generateRandomUsername(): string {
|
||||
const adjective = ADJECTIVES[Math.floor(Math.random() * ADJECTIVES.length)];
|
||||
const noun = NOUNS[Math.floor(Math.random() * NOUNS.length)];
|
||||
const number = Math.floor(Math.random() * 90000) + 10000; // 5位数字
|
||||
return `${adjective}${noun}_${number}`;
|
||||
}
|
||||
|
||||
// 预定义的柔和配色方案
|
||||
const COLOR_PALETTES = [
|
||||
{ bg: '#FFE4E1', primary: '#FF6B6B', secondary: '#4ECDC4' }, // 粉红+红+青
|
||||
{ bg: '#E8F5E9', primary: '#66BB6A', secondary: '#FFA726' }, // 浅绿+绿+橙
|
||||
{ bg: '#E3F2FD', primary: '#42A5F5', secondary: '#AB47BC' }, // 浅蓝+蓝+紫
|
||||
{ bg: '#FFF3E0', primary: '#FF7043', secondary: '#26A69A' }, // 浅橙+橙+青
|
||||
{ bg: '#F3E5F5', primary: '#AB47BC', secondary: '#42A5F5' }, // 浅紫+紫+蓝
|
||||
{ bg: '#FFFDE7', primary: '#FFCA28', secondary: '#EC407A' }, // 浅黄+黄+粉
|
||||
{ bg: '#E0F7FA', primary: '#26C6DA', secondary: '#7E57C2' }, // 浅青+青+紫
|
||||
{ bg: '#FCE4EC', primary: '#EC407A', secondary: '#66BB6A' }, // 浅粉+粉+绿
|
||||
];
|
||||
|
||||
// 榴莲形状变体
|
||||
const DURIAN_SHAPES = [
|
||||
// 经典榴莲
|
||||
(color: string) => `
|
||||
<ellipse cx="50" cy="55" rx="25" ry="20" fill="${color}"/>
|
||||
<path d="M25 55 Q15 45, 20 35 Q25 40, 30 35 Q35 40, 40 35 Q45 40, 50 35 Q55 40, 60 35 Q65 40, 70 35 Q75 40, 80 35 Q85 45, 75 55" fill="${color}"/>
|
||||
<path d="M35 25 Q40 15, 50 20 Q60 15, 65 25" stroke="${color}" stroke-width="3" fill="none"/>
|
||||
`,
|
||||
// 圆润榴莲
|
||||
(color: string) => `
|
||||
<circle cx="50" cy="50" r="22" fill="${color}"/>
|
||||
<circle cx="42" cy="42" r="5" fill="#FFFFFFAA"/>
|
||||
<path d="M30 40 Q25 30, 35 25" stroke="${color}" stroke-width="4" fill="none"/>
|
||||
<path d="M70 40 Q75 30, 65 25" stroke="${color}" stroke-width="4" fill="none"/>
|
||||
<path d="M50 30 Q50 20, 50 15" stroke="${color}" stroke-width="4" fill="none"/>
|
||||
`,
|
||||
// 可爱榴莲
|
||||
(color: string) => `
|
||||
<ellipse cx="50" cy="52" rx="22" ry="18" fill="${color}"/>
|
||||
<polygon points="35,35 40,20 45,35" fill="${color}"/>
|
||||
<polygon points="50,32 55,17 60,32" fill="${color}"/>
|
||||
<polygon points="65,35 70,20 75,35" fill="${color}"/>
|
||||
`,
|
||||
];
|
||||
|
||||
// 表情变体
|
||||
const FACE_EXPRESSIONS = [
|
||||
// 开心
|
||||
(x: number, y: number) => `
|
||||
<circle cx="${x - 8}" cy="${y}" r="3" fill="#333"/>
|
||||
<circle cx="${x + 8}" cy="${y}" r="3" fill="#333"/>
|
||||
<path d="M${x - 6} ${y + 8} Q${x} ${y + 14}, ${x + 6} ${y + 8}" stroke="#333" stroke-width="2" fill="none"/>
|
||||
`,
|
||||
// 眨眼
|
||||
(x: number, y: number) => `
|
||||
<circle cx="${x - 8}" cy="${y}" r="3" fill="#333"/>
|
||||
<line x1="${x + 5}" y1="${y}" x2="${x + 11}" y2="${y}" stroke="#333" stroke-width="2"/>
|
||||
<path d="M${x - 6} ${y + 8} Q${x} ${y + 14}, ${x + 6} ${y + 8}" stroke="#333" stroke-width="2" fill="none"/>
|
||||
`,
|
||||
// 惊讶
|
||||
(x: number, y: number) => `
|
||||
<circle cx="${x - 8}" cy="${y}" r="4" fill="#333"/>
|
||||
<circle cx="${x + 8}" cy="${y}" r="4" fill="#333"/>
|
||||
<ellipse cx="${x}" cy="${y + 10}" rx="4" ry="5" fill="#333"/>
|
||||
`,
|
||||
// 酷
|
||||
(x: number, y: number) => `
|
||||
<rect x="${x - 14}" y="${y - 4}" width="12" height="6" rx="2" fill="#333"/>
|
||||
<rect x="${x + 2}" y="${y - 4}" width="12" height="6" rx="2" fill="#333"/>
|
||||
<path d="M${x - 4} ${y + 8} L${x + 4} ${y + 8}" stroke="#333" stroke-width="2"/>
|
||||
`,
|
||||
// 害羞
|
||||
(x: number, y: number) => `
|
||||
<line x1="${x - 10}" y1="${y}" x2="${x - 5}" y2="${y}" stroke="#333" stroke-width="2"/>
|
||||
<line x1="${x + 5}" y1="${y}" x2="${x + 10}" y2="${y}" stroke="#333" stroke-width="2"/>
|
||||
<ellipse cx="${x - 12}" cy="${y + 5}" rx="4" ry="2" fill="#FFB6C1"/>
|
||||
<ellipse cx="${x + 12}" cy="${y + 5}" rx="4" ry="2" fill="#FFB6C1"/>
|
||||
<path d="M${x - 4} ${y + 10} Q${x} ${y + 13}, ${x + 4} ${y + 10}" stroke="#333" stroke-width="2" fill="none"/>
|
||||
`,
|
||||
];
|
||||
|
||||
// 装饰元素
|
||||
const DECORATIONS = [
|
||||
// 星星
|
||||
(color: string) => `
|
||||
<polygon points="15,15 17,20 22,20 18,24 20,29 15,26 10,29 12,24 8,20 13,20" fill="${color}"/>
|
||||
`,
|
||||
// 爱心
|
||||
(color: string) => `
|
||||
<path d="M85 20 C82 15, 75 15, 75 22 C75 15, 68 15, 65 20 C65 27, 75 35, 75 35 C75 35, 85 27, 85 20" fill="${color}"/>
|
||||
`,
|
||||
// 音符
|
||||
(color: string) => `
|
||||
<circle cx="18" cy="78" r="4" fill="${color}"/>
|
||||
<line x1="22" y1="78" x2="22" y2="65" stroke="${color}" stroke-width="2"/>
|
||||
<path d="M22 65 Q28 62, 28 68" stroke="${color}" stroke-width="2" fill="none"/>
|
||||
`,
|
||||
// 闪光
|
||||
(color: string) => `
|
||||
<line x1="80" y1="70" x2="85" y2="75" stroke="${color}" stroke-width="2"/>
|
||||
<line x1="85" y1="70" x2="80" y2="75" stroke="${color}" stroke-width="2"/>
|
||||
<line x1="78" y1="80" x2="82" y2="84" stroke="${color}" stroke-width="1.5"/>
|
||||
<line x1="82" y1="80" x2="78" y2="84" stroke="${color}" stroke-width="1.5"/>
|
||||
`,
|
||||
// 无装饰
|
||||
() => '',
|
||||
];
|
||||
|
||||
/**
|
||||
* 生成随机SVG头像 (榴莲主题)
|
||||
*/
|
||||
export function generateRandomAvatarSvg(): string {
|
||||
// 随机选择配色
|
||||
const palette = COLOR_PALETTES[Math.floor(Math.random() * COLOR_PALETTES.length)];
|
||||
// 随机选择榴莲形状
|
||||
const shape = DURIAN_SHAPES[Math.floor(Math.random() * DURIAN_SHAPES.length)];
|
||||
// 随机选择表情
|
||||
const face = FACE_EXPRESSIONS[Math.floor(Math.random() * FACE_EXPRESSIONS.length)];
|
||||
// 随机选择装饰 (50%概率有装饰)
|
||||
const decoration = Math.random() > 0.5
|
||||
? DECORATIONS[Math.floor(Math.random() * (DECORATIONS.length - 1))]
|
||||
: DECORATIONS[DECORATIONS.length - 1];
|
||||
|
||||
return `<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 100 100" width="100" height="100">
|
||||
<rect width="100" height="100" fill="${palette.bg}"/>
|
||||
${shape(palette.primary)}
|
||||
${face(50, 52)}
|
||||
${decoration(palette.secondary)}
|
||||
</svg>`;
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成完整的随机身份
|
||||
*/
|
||||
export function generateRandomIdentity(): { username: string; avatarSvg: string } {
|
||||
return {
|
||||
username: generateRandomUsername(),
|
||||
avatarSvg: generateRandomAvatarSvg(),
|
||||
};
|
||||
}
|
||||
|
|
@ -13,6 +13,7 @@ import {
|
|||
MPC_CONSUME_TOPICS,
|
||||
KeygenRequestedPayload,
|
||||
} from '../../infrastructure/messaging/kafka/event-consumer.service';
|
||||
import { KeygenStartedEvent } from '../../domain/events/keygen-started.event';
|
||||
import { KeygenCompletedEvent } from '../../domain/events/keygen-completed.event';
|
||||
import { SessionFailedEvent } from '../../domain/events/session-failed.event';
|
||||
import { SessionType } from '../../domain/enums';
|
||||
|
|
@ -28,18 +29,24 @@ export class KeygenRequestedHandler implements OnModuleInit {
|
|||
) {}
|
||||
|
||||
async onModuleInit() {
|
||||
this.logger.log(`[INIT] KeygenRequestedHandler initializing...`);
|
||||
await this.eventConsumer.subscribe(
|
||||
MPC_CONSUME_TOPICS.KEYGEN_REQUESTED,
|
||||
this.handleMessage.bind(this),
|
||||
);
|
||||
this.logger.log(`Subscribed to ${MPC_CONSUME_TOPICS.KEYGEN_REQUESTED}`);
|
||||
this.logger.log(`[INIT] Subscribed to ${MPC_CONSUME_TOPICS.KEYGEN_REQUESTED}`);
|
||||
}
|
||||
|
||||
private async handleMessage(topic: string, payload: Record<string, unknown>): Promise<void> {
|
||||
this.logger.log(`[HANDLE] Received keygen request from topic: ${topic}`);
|
||||
this.logger.log(`[HANDLE] Payload: ${JSON.stringify(payload)}`);
|
||||
|
||||
const data = payload as unknown as KeygenRequestedPayload;
|
||||
const { sessionId, userId, username, threshold, totalParties, requireDelegate } = data;
|
||||
|
||||
this.logger.log(`Processing keygen request: userId=${userId}, username=${username}, sessionId=${sessionId}`);
|
||||
this.logger.log(`[HANDLE] Parsed request: sessionId=${sessionId}`);
|
||||
this.logger.log(`[HANDLE] userId=${userId}, username=${username}`);
|
||||
this.logger.log(`[HANDLE] threshold=${threshold}, totalParties=${totalParties}, requireDelegate=${requireDelegate}`);
|
||||
|
||||
try {
|
||||
// Step 1: Create keygen session via mpc-system
|
||||
|
|
@ -53,6 +60,11 @@ export class KeygenRequestedHandler implements OnModuleInit {
|
|||
const mpcSessionId = createResult.sessionId;
|
||||
this.logger.log(`Keygen session created in mpc-system: ${mpcSessionId}`);
|
||||
|
||||
// Step 1.5: Publish KeygenStarted event to notify identity-service
|
||||
const startedEvent = new KeygenStartedEvent(sessionId, userId, username, mpcSessionId);
|
||||
await this.eventPublisher.publish(startedEvent);
|
||||
this.logger.log(`Published KeygenStarted event: userId=${userId}, mpcSessionId=${mpcSessionId}`);
|
||||
|
||||
// Step 2: Poll for completion (with max retries)
|
||||
const result = await this.pollKeygenCompletion(mpcSessionId, 150, 2000);
|
||||
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ export { ShareRevokedEvent } from './share-revoked.event';
|
|||
export { ShareUsedEvent } from './share-used.event';
|
||||
|
||||
// Session Events
|
||||
export { KeygenStartedEvent } from './keygen-started.event';
|
||||
export { KeygenCompletedEvent } from './keygen-completed.event';
|
||||
export { SigningCompletedEvent } from './signing-completed.event';
|
||||
export { SessionFailedEvent } from './session-failed.event';
|
||||
|
|
@ -29,6 +30,7 @@ import { ShareCreatedEvent } from './share-created.event';
|
|||
import { ShareRotatedEvent } from './share-rotated.event';
|
||||
import { ShareRevokedEvent } from './share-revoked.event';
|
||||
import { ShareUsedEvent } from './share-used.event';
|
||||
import { KeygenStartedEvent } from './keygen-started.event';
|
||||
import { KeygenCompletedEvent } from './keygen-completed.event';
|
||||
import { SigningCompletedEvent } from './signing-completed.event';
|
||||
import { SessionFailedEvent } from './session-failed.event';
|
||||
|
|
@ -41,6 +43,7 @@ export type MPCDomainEvent =
|
|||
| ShareRotatedEvent
|
||||
| ShareRevokedEvent
|
||||
| ShareUsedEvent
|
||||
| KeygenStartedEvent
|
||||
| KeygenCompletedEvent
|
||||
| SigningCompletedEvent
|
||||
| SessionFailedEvent
|
||||
|
|
@ -54,6 +57,7 @@ export const MPC_TOPICS = {
|
|||
SHARE_ROTATED: 'mpc.ShareRotated',
|
||||
SHARE_REVOKED: 'mpc.ShareRevoked',
|
||||
SHARE_USED: 'mpc.ShareUsed',
|
||||
KEYGEN_STARTED: 'mpc.KeygenStarted',
|
||||
KEYGEN_COMPLETED: 'mpc.KeygenCompleted',
|
||||
SIGNING_COMPLETED: 'mpc.SigningCompleted',
|
||||
SESSION_FAILED: 'mpc.SessionFailed',
|
||||
|
|
|
|||
|
|
@ -0,0 +1,40 @@
|
|||
/**
|
||||
* KeygenStarted Event
|
||||
*
|
||||
* Emitted when a keygen session starts processing.
|
||||
* Used to notify identity-service that keygen is in progress.
|
||||
*/
|
||||
|
||||
import { DomainEvent } from './domain-event.base';
|
||||
|
||||
export class KeygenStartedEvent extends DomainEvent {
|
||||
constructor(
|
||||
public readonly sessionId: string,
|
||||
public readonly userId: string,
|
||||
public readonly username: string,
|
||||
public readonly mpcSessionId: string,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
get eventType(): string {
|
||||
return 'KeygenStarted';
|
||||
}
|
||||
|
||||
get aggregateId(): string {
|
||||
return this.sessionId;
|
||||
}
|
||||
|
||||
get aggregateType(): string {
|
||||
return 'PartySession';
|
||||
}
|
||||
|
||||
get payload(): Record<string, unknown> {
|
||||
return {
|
||||
sessionId: this.sessionId,
|
||||
userId: this.userId,
|
||||
username: this.username,
|
||||
mpcSessionId: this.mpcSessionId,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
@ -49,6 +49,11 @@ export class EventConsumerService implements OnModuleInit, OnModuleDestroy {
|
|||
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'mpc-service';
|
||||
const groupId = this.configService.get<string>('KAFKA_GROUP_ID') || 'mpc-service-group';
|
||||
|
||||
this.logger.log(`[INIT] MPC Event Consumer initializing...`);
|
||||
this.logger.log(`[INIT] ClientId: ${clientId}`);
|
||||
this.logger.log(`[INIT] GroupId: ${groupId}`);
|
||||
this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`);
|
||||
|
||||
this.kafka = new Kafka({
|
||||
clientId,
|
||||
brokers,
|
||||
|
|
@ -66,11 +71,12 @@ export class EventConsumerService implements OnModuleInit, OnModuleDestroy {
|
|||
});
|
||||
|
||||
try {
|
||||
this.logger.log(`[CONNECT] Connecting Kafka consumer...`);
|
||||
await this.consumer.connect();
|
||||
this.isConnected = true;
|
||||
this.logger.log('Kafka consumer connected');
|
||||
this.logger.log(`[CONNECT] Kafka consumer connected successfully`);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to connect Kafka consumer', error);
|
||||
this.logger.error(`[ERROR] Failed to connect Kafka consumer`, error);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -86,17 +92,18 @@ export class EventConsumerService implements OnModuleInit, OnModuleDestroy {
|
|||
*/
|
||||
async subscribe(topic: string, handler: MessageHandler): Promise<void> {
|
||||
if (!this.isConnected) {
|
||||
this.logger.warn('Kafka not connected, cannot subscribe');
|
||||
this.logger.warn(`[SUBSCRIBE] Kafka not connected, cannot subscribe to ${topic}`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.handlers.set(topic, handler);
|
||||
this.logger.log(`[SUBSCRIBE] Registering handler for topic: ${topic}`);
|
||||
|
||||
try {
|
||||
await this.consumer.subscribe({ topic, fromBeginning: false });
|
||||
this.logger.log(`Subscribed to topic: ${topic}`);
|
||||
this.logger.log(`[SUBSCRIBE] Successfully subscribed to topic: ${topic}`);
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to subscribe to topic: ${topic}`, error);
|
||||
this.logger.error(`[ERROR] Failed to subscribe to topic: ${topic}`, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
@ -106,35 +113,48 @@ export class EventConsumerService implements OnModuleInit, OnModuleDestroy {
|
|||
*/
|
||||
async startConsuming(): Promise<void> {
|
||||
if (!this.isConnected) {
|
||||
this.logger.warn('Kafka not connected, cannot start consuming');
|
||||
this.logger.warn(`[START] Kafka not connected, cannot start consuming`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.log(`[START] Starting message consumption...`);
|
||||
this.logger.log(`[START] Registered handlers for topics: ${Array.from(this.handlers.keys()).join(', ')}`);
|
||||
|
||||
await this.consumer.run({
|
||||
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
|
||||
const offset = message.offset;
|
||||
this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`);
|
||||
|
||||
const handler = this.handlers.get(topic);
|
||||
if (!handler) {
|
||||
this.logger.warn(`No handler for topic: ${topic}`);
|
||||
this.logger.warn(`[RECEIVE] No handler registered for topic: ${topic}`);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const value = message.value?.toString();
|
||||
if (!value) {
|
||||
this.logger.warn('Empty message received');
|
||||
this.logger.warn(`[RECEIVE] Empty message received on ${topic}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const parsed = JSON.parse(value);
|
||||
this.logger.debug(`Received message from ${topic}: ${JSON.stringify(parsed)}`);
|
||||
this.logger.log(`[RECEIVE] Raw message value: ${value.substring(0, 500)}...`);
|
||||
|
||||
await handler(topic, parsed.payload || parsed);
|
||||
const parsed = JSON.parse(value);
|
||||
const payload = parsed.payload || parsed;
|
||||
|
||||
this.logger.log(`[RECEIVE] Parsed event: eventType=${parsed.eventType || 'unknown'}`);
|
||||
this.logger.log(`[RECEIVE] Payload keys: ${Object.keys(payload).join(', ')}`);
|
||||
|
||||
this.logger.log(`[HANDLE] Invoking handler for ${topic}...`);
|
||||
await handler(topic, payload);
|
||||
this.logger.log(`[HANDLE] Handler completed for ${topic}`);
|
||||
} catch (error) {
|
||||
this.logger.error(`Error processing message from ${topic}`, error);
|
||||
this.logger.error(`[ERROR] Error processing message from ${topic}`, error);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.log('Started consuming messages');
|
||||
this.logger.log(`[START] Started consuming messages successfully`);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,10 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
|
|||
const brokers = this.configService.get<string>('KAFKA_BROKERS')?.split(',') || ['localhost:9092'];
|
||||
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'mpc-party-service';
|
||||
|
||||
this.logger.log(`[INIT] MPC Event Publisher initializing...`);
|
||||
this.logger.log(`[INIT] ClientId: ${clientId}`);
|
||||
this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`);
|
||||
|
||||
this.kafka = new Kafka({
|
||||
clientId,
|
||||
brokers,
|
||||
|
|
@ -38,11 +42,12 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
|
|||
});
|
||||
|
||||
try {
|
||||
this.logger.log(`[CONNECT] Connecting Kafka producer...`);
|
||||
await this.producer.connect();
|
||||
this.isConnected = true;
|
||||
this.logger.log('Kafka producer connected');
|
||||
this.logger.log(`[CONNECT] Kafka producer connected successfully`);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to connect Kafka producer', error);
|
||||
this.logger.error(`[ERROR] Failed to connect Kafka producer`, error);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -58,21 +63,30 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
|
|||
*/
|
||||
async publish(event: DomainEvent): Promise<void> {
|
||||
if (!this.isConnected) {
|
||||
this.logger.warn('Kafka not connected, skipping event publish');
|
||||
this.logger.warn(`[PUBLISH] Kafka not connected, skipping event publish for ${event.eventType}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const topic = this.getTopicForEvent(event);
|
||||
|
||||
this.logger.log(`[PUBLISH] Publishing event: type=${event.eventType}, topic=${topic}`);
|
||||
this.logger.log(`[PUBLISH] EventId: ${event.eventId}`);
|
||||
this.logger.log(`[PUBLISH] AggregateId: ${event.aggregateId}`);
|
||||
|
||||
const messageValue = {
|
||||
eventId: event.eventId,
|
||||
eventType: event.eventType,
|
||||
occurredAt: event.occurredAt.toISOString(),
|
||||
aggregateId: event.aggregateId,
|
||||
aggregateType: event.aggregateType,
|
||||
payload: event.payload,
|
||||
};
|
||||
|
||||
this.logger.log(`[PUBLISH] Payload keys: ${Object.keys(event.payload).join(', ')}`);
|
||||
|
||||
const message = {
|
||||
key: event.eventId,
|
||||
value: JSON.stringify({
|
||||
eventId: event.eventId,
|
||||
eventType: event.eventType,
|
||||
occurredAt: event.occurredAt.toISOString(),
|
||||
aggregateId: event.aggregateId,
|
||||
aggregateType: event.aggregateType,
|
||||
payload: event.payload,
|
||||
}),
|
||||
value: JSON.stringify(messageValue),
|
||||
headers: {
|
||||
eventType: event.eventType,
|
||||
aggregateType: event.aggregateType,
|
||||
|
|
@ -85,9 +99,9 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
|
|||
topic,
|
||||
messages: [message],
|
||||
});
|
||||
this.logger.debug(`Published event: ${event.eventType} to ${topic}`);
|
||||
this.logger.log(`[PUBLISH] Successfully published ${event.eventType} to ${topic}`);
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to publish event: ${event.eventType}`, error);
|
||||
this.logger.error(`[ERROR] Failed to publish event: ${event.eventType}`, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
@ -105,23 +119,28 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
|
|||
* Publish with retry logic
|
||||
*/
|
||||
async publishWithRetry(event: DomainEvent, maxRetries = 3): Promise<void> {
|
||||
this.logger.log(`[RETRY] Publishing with retry: ${event.eventType}, maxRetries=${maxRetries}`);
|
||||
let lastError: Error | undefined;
|
||||
|
||||
for (let attempt = 1; attempt <= maxRetries; attempt++) {
|
||||
try {
|
||||
this.logger.log(`[RETRY] Attempt ${attempt}/${maxRetries} for ${event.eventType}`);
|
||||
await this.publish(event);
|
||||
this.logger.log(`[RETRY] Successfully published on attempt ${attempt}`);
|
||||
return;
|
||||
} catch (error) {
|
||||
lastError = error instanceof Error ? error : new Error(String(error));
|
||||
this.logger.warn(`Publish attempt ${attempt} failed: ${lastError.message}`);
|
||||
this.logger.warn(`[RETRY] Attempt ${attempt} failed: ${lastError.message}`);
|
||||
|
||||
if (attempt < maxRetries) {
|
||||
const delay = Math.pow(2, attempt) * 100; // Exponential backoff
|
||||
this.logger.log(`[RETRY] Waiting ${delay}ms before next attempt...`);
|
||||
await new Promise(resolve => setTimeout(resolve, delay));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.error(`[RETRY] All ${maxRetries} attempts failed for ${event.eventType}`);
|
||||
throw lastError;
|
||||
}
|
||||
|
||||
|
|
@ -131,6 +150,7 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
|
|||
ShareRotated: MPC_TOPICS.SHARE_ROTATED,
|
||||
ShareRevoked: MPC_TOPICS.SHARE_REVOKED,
|
||||
ShareUsed: MPC_TOPICS.SHARE_USED,
|
||||
KeygenStarted: MPC_TOPICS.KEYGEN_STARTED,
|
||||
KeygenCompleted: MPC_TOPICS.KEYGEN_COMPLETED,
|
||||
SigningCompleted: MPC_TOPICS.SIGNING_COMPLETED,
|
||||
SessionFailed: MPC_TOPICS.SESSION_FAILED,
|
||||
|
|
|
|||
Loading…
Reference in New Issue