/*
 * rwadurian/backend/services/planting-service/src/infrastructure/kafka/event-publisher.service.ts
 *
 * Repository-viewer metadata captured together with the file: 372 lines, 11 KiB, TypeScript.
 *
 * Viewer warning: this file contains Unicode characters that might be confused with
 * other characters. If this is intentional, the warning can be safely ignored.
 */

import { Injectable, Inject, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { ConfigService } from '@nestjs/config';
import { DomainEvent } from '../../domain/events/domain-event.interface';
/**
 * Domain-event type -> Kafka topic lookup table.
 *
 * Consumers:
 * - reward-service: listens to contract.signed, contract.expired
 * - authorization-service: listens to planting-events
 * - referral-service: listens to planting.planting.created
 *
 * Note: planting.order.paid is no longer used; reward distribution is now
 * triggered by the outcome of contract signing instead.
 */
const EVENT_TOPIC_MAP: Record<string, string> = {
  // --- Planting order lifecycle ---
  'PlantingOrderCreated': 'planting.order.created',
  'ProvinceCityConfirmed': 'planting.order.province-city-confirmed',
  'PlantingOrderPaid': 'planting.order.paid',
  'FundsAllocated': 'planting.order.funds-allocated',

  // --- Pool / mining ---
  'PoolInjected': 'planting.pool.injected',
  'MiningEnabled': 'planting.mining.enabled',

  // --- Contract signing events ---
  'ContractSigned': 'contract.signed',
  'ContractExpired': 'contract.expired',
};
/**
 * Payload for the contract signing events.
 *
 * Passed as `data` to `publishContractSigned` and `publishContractExpired`,
 * which serialize it unchanged into the Kafka message body.
 */
export interface ContractSigningEventData {
// Order number; the publish methods in this file use it as the Kafka message key.
orderNo: string;
// Identifier of the user the order belongs to.
userId: string;
// NOTE(review): presumably the user's account sequence number — confirm with the producer of this data.
accountSequence: string;
// Number of trees in the order.
treeCount: number;
// NOTE(review): currency and unit are not visible in this file — confirm.
totalAmount: number;
provinceCode: string;
cityCode: string;
// Supplied for contract.signed events.
// NOTE(review): presumably an ISO-8601 timestamp string — confirm against the caller.
signedAt?: string;
// Supplied for contract.expired events.
// NOTE(review): presumably an ISO-8601 timestamp string — confirm against the caller.
expiredAt?: string;
// [2026-02-26] Price increase driven by headquarters operating-cost pressure
// (the markup amount added per tree); attributed to headquarters (S0000000001).
priceSupplement?: number;
}
/**
 * Publishes planting-service events to Kafka through the injected
 * `ClientKafka` registered under the `'KAFKA_SERVICE'` token.
 *
 * Connection handling:
 * - `onModuleInit` tries to connect; a failure is logged and the service keeps
 *   running without Kafka (every publish call is then skipped with a warning).
 * - `onModuleDestroy` closes the client if a connection had been established.
 *
 * Delivery handling:
 * - Every publish method waits for the Observable returned by
 *   `ClientKafka.emit()` to complete, so a delivery failure reported through
 *   that Observable is logged instead of being silently dropped.
 * - `publish`, `publishAll`, `publishContractSigned` and
 *   `publishContractExpired` re-throw delivery errors; the other publish
 *   methods are best-effort and only log them.
 */
@Injectable()
export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(EventPublisherService.name);

  /** True only between a successful `connect()` and the start of `close()`. */
  private isConnected = false;

  constructor(
    @Inject('KAFKA_SERVICE')
    private readonly kafkaClient: ClientKafka,
    private readonly configService: ConfigService,
  ) {
    const brokers = this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092');
    this.logger.log(`[INIT] EventPublisherService created, brokers: ${brokers}`);
  }

  /**
   * Connects the Kafka client when the module starts.
   * A connection failure is logged but never thrown.
   */
  async onModuleInit(): Promise<void> {
    this.logger.log('[KAFKA] Attempting to connect to Kafka...');
    try {
      await this.kafkaClient.connect();
      this.isConnected = true;
      this.logger.log('[KAFKA] Successfully connected to Kafka broker');
    } catch (error) {
      this.logger.error('[KAFKA] Failed to connect to Kafka:', error);
      // Do not throw: let the service start anyway (a development
      // environment may not have Kafka available).
      this.logger.warn('[KAFKA] Service will start without Kafka connection');
    }
  }

  /**
   * Closes the Kafka client on shutdown if it had been connected.
   */
  async onModuleDestroy(): Promise<void> {
    if (!this.isConnected) {
      return;
    }
    this.logger.log('[KAFKA] Disconnecting from Kafka...');
    // Clear the flag before closing so publish calls made during or after
    // shutdown are skipped instead of using a closing/closed client.
    this.isConnected = false;
    await this.kafkaClient.close();
    this.logger.log('[KAFKA] Disconnected from Kafka');
  }

  /**
   * Publishes a single domain event to the topic mapped from its type.
   *
   * @param event Domain event to publish.
   * @throws Re-throws any error raised while delivering the event.
   */
  async publish(event: DomainEvent): Promise<void> {
    const topic = this.getTopicForEvent(event);
    const message = this.formatMessage(event);

    this.logger.debug(
      [
        '[PUBLISH] Preparing to publish event:',
        `- Type: ${event.type}`,
        `- Topic: ${topic}`,
        `- AggregateId: ${event.aggregateId}`,
        `- Data: ${JSON.stringify(event.data)}`,
      ].join('\n'),
    );

    await this.deliver(topic, message, {
      skipped: `[PUBLISH] Kafka not connected, skipping event ${event.type}`,
      delivered: `[PUBLISH] ✓ Event ${event.type} published to topic ${topic}`,
      failed: `[PUBLISH] ✗ Failed to publish event ${event.type}:`,
      rethrow: true,
    });
  }

  /**
   * Publishes a batch of domain events one after another, in array order.
   * Stops at (and re-throws) the first delivery error, like `publish`.
   *
   * @param events Domain events to publish.
   */
  async publishAll(events: readonly DomainEvent[]): Promise<void> {
    this.logger.log(`[PUBLISH] Publishing ${events.length} events...`);
    for (const event of events) {
      // Sequential on purpose: keeps the publish order equal to the array order.
      await this.publish(event);
    }
    this.logger.log(`[PUBLISH] Finished publishing ${events.length} events`);
  }

  /**
   * Publishes to the generic `planting-events` topic (consumed by
   * authorization-service). The event type is translated with
   * `mapEventTypeToPlantingEventType`. Best-effort: delivery errors are
   * logged and not thrown.
   *
   * @param event Domain event to forward.
   */
  async publishToPlantingEvents(event: DomainEvent): Promise<void> {
    const eventType = this.mapEventTypeToPlantingEventType(event.type);
    const message = {
      key: event.aggregateId,
      value: JSON.stringify({
        eventType,
        payload: event.data,
        occurredAt: event.occurredAt.toISOString(),
      }),
    };

    this.logger.debug(
      [
        '[PUBLISH] Preparing planting-events message:',
        `- Original Type: ${event.type}`,
        `- Mapped Type: ${eventType}`,
        `- Payload: ${JSON.stringify(event.data)}`,
      ].join('\n'),
    );

    await this.deliver('planting-events', message, {
      skipped: `[PUBLISH] Kafka not connected, skipping planting-events for ${event.type}`,
      delivered: `[PUBLISH] ✓ Event ${event.type} published to topic planting-events`,
      failed: `[PUBLISH] ✗ Failed to publish to planting-events:`,
      rethrow: false,
    });
  }

  /**
   * Publishes a raw-format message to the `planting-events` topic.
   *
   * Unlike `publishToPlantingEvents`, this method does not map the event type;
   * the `eventType` argument is used as-is. It is used after a pre-planting
   * merge has been signed to notify wallet-service, triggering
   * markUserAsPlanted + settleUserPendingRewards.
   *
   * The wallet-service consumer matches on: eventType === 'PlantingOrderPaid'.
   *
   * Best-effort: delivery errors are logged and not thrown.
   *
   * @param key Kafka message key.
   * @param eventType Event type written verbatim into the message body.
   * @param payload Event payload written verbatim into the message body.
   */
  async publishRawToPlantingEvents(
    key: string,
    eventType: string,
    payload: Record<string, unknown>,
  ): Promise<void> {
    const message = {
      key,
      value: JSON.stringify({
        eventType,
        payload,
        occurredAt: new Date().toISOString(),
      }),
    };

    await this.deliver('planting-events', message, {
      skipped: `[PUBLISH] Kafka not connected, skipping raw planting-events for ${eventType}`,
      delivered: `[PUBLISH] ✓ Raw event ${eventType} published to topic planting-events`,
      failed: `[PUBLISH] ✗ Failed to publish raw to planting-events:`,
      rethrow: false,
    });
  }

  /**
   * Publishes the planting-payment event in the format consumed by
   * reward-service (topic `planting.order.paid`). Best-effort: delivery
   * errors are logged and not thrown.
   *
   * NOTE(review): the topic-map documentation in this file says
   * planting.order.paid is no longer used — confirm whether this method still
   * has callers.
   *
   * @param params Order details; `orderId` is used as the Kafka message key.
   */
  async publishPlantingOrderPaid(params: {
    orderId: string;
    userId: string;
    treeCount: number;
    provinceCode: string;
    cityCode: string;
    paidAt: Date;
  }): Promise<void> {
    const topic = 'planting.order.paid';
    const message = {
      key: params.orderId,
      value: JSON.stringify({
        orderId: params.orderId,
        userId: params.userId,
        treeCount: params.treeCount,
        provinceCode: params.provinceCode,
        cityCode: params.cityCode,
        paidAt: params.paidAt.toISOString(),
      }),
    };

    this.logger.debug(
      [
        '[PUBLISH] Publishing PlantingOrderPaid for reward-service:',
        `- OrderId: ${params.orderId}`,
        `- UserId: ${params.userId}`,
        `- TreeCount: ${params.treeCount}`,
        `- Province: ${params.provinceCode}`,
        `- City: ${params.cityCode}`,
      ].join('\n'),
    );

    await this.deliver(topic, message, {
      skipped: `[PUBLISH] Kafka not connected, skipping planting.order.paid`,
      delivered: `[PUBLISH] ✓ PlantingOrderPaid published for order ${params.orderId}`,
      failed: `[PUBLISH] ✗ Failed to publish PlantingOrderPaid:`,
      rethrow: false,
    });
  }

  /**
   * Publishes the planting-created event in the format consumed by
   * referral-service (topic `planting.planting.created`). Best-effort:
   * delivery errors are logged and not thrown.
   *
   * @param params Planting details; `userId` is used as the Kafka message key.
   */
  async publishPlantingCreated(params: {
    userId: string;
    treeCount: number;
    provinceCode: string;
    cityCode: string;
  }): Promise<void> {
    const topic = 'planting.planting.created';
    const message = {
      key: params.userId,
      value: JSON.stringify({
        eventName: 'planting.created',
        data: {
          userId: params.userId,
          treeCount: params.treeCount,
          provinceCode: params.provinceCode,
          cityCode: params.cityCode,
        },
      }),
    };

    this.logger.debug(
      [
        '[PUBLISH] Publishing PlantingCreated for referral-service:',
        `- UserId: ${params.userId}`,
        `- TreeCount: ${params.treeCount}`,
      ].join('\n'),
    );

    await this.deliver(topic, message, {
      skipped: `[PUBLISH] Kafka not connected, skipping planting.planting.created`,
      delivered: `[PUBLISH] ✓ PlantingCreated published for user ${params.userId}`,
      failed: `[PUBLISH] ✗ Failed to publish PlantingCreated:`,
      rethrow: false,
    });
  }

  /**
   * Publishes the contract-signed event (consumed by reward-service).
   * Triggers the normal reward distribution (system fees + user benefits).
   *
   * @param data Contract signing payload; `orderNo` is used as the message key.
   * @throws Re-throws any error raised while delivering the event.
   */
  async publishContractSigned(data: ContractSigningEventData): Promise<void> {
    const topic = 'contract.signed';
    const message = {
      key: data.orderNo,
      value: JSON.stringify({
        eventName: 'contract.signed',
        data,
      }),
    };

    this.logger.debug(
      [
        '[PUBLISH] Publishing ContractSigned for reward-service:',
        `- OrderNo: ${data.orderNo}`,
        `- UserId: ${data.userId}`,
        `- TreeCount: ${data.treeCount}`,
        `- SignedAt: ${data.signedAt}`,
      ].join('\n'),
    );

    await this.deliver(topic, message, {
      skipped: `[PUBLISH] Kafka not connected, skipping contract.signed`,
      delivered: `[PUBLISH] ✓ ContractSigned published for order ${data.orderNo}`,
      failed: `[PUBLISH] ✗ Failed to publish ContractSigned:`,
      rethrow: true,
    });
  }

  /**
   * Publishes the contract-expired (signing timed out) event, consumed by
   * reward-service. Triggers the system-account reward distribution (system
   * fees as usual + user benefits go to the system default account).
   *
   * @param data Contract signing payload; `orderNo` is used as the message key.
   * @throws Re-throws any error raised while delivering the event.
   */
  async publishContractExpired(data: ContractSigningEventData): Promise<void> {
    const topic = 'contract.expired';
    const message = {
      key: data.orderNo,
      value: JSON.stringify({
        eventName: 'contract.expired',
        data,
      }),
    };

    this.logger.debug(
      [
        '[PUBLISH] Publishing ContractExpired for reward-service:',
        `- OrderNo: ${data.orderNo}`,
        `- UserId: ${data.userId}`,
        `- TreeCount: ${data.treeCount}`,
        `- ExpiredAt: ${data.expiredAt}`,
      ].join('\n'),
    );

    await this.deliver(topic, message, {
      skipped: `[PUBLISH] Kafka not connected, skipping contract.expired`,
      delivered: `[PUBLISH] ✓ ContractExpired published for order ${data.orderNo}`,
      failed: `[PUBLISH] ✗ Failed to publish ContractExpired:`,
      rethrow: true,
    });
  }

  /**
   * Shared publish routine: skip when disconnected, otherwise emit, wait for
   * the outcome and log it.
   *
   * @param topic Kafka topic to emit to.
   * @param message Message (key + serialized value) handed to the client.
   * @param outcome Log lines for each outcome, plus whether a delivery error
   *   is re-thrown (`true`) or only logged (`false`).
   */
  private async deliver(
    topic: string,
    message: { key: unknown; value: string },
    outcome: { skipped: string; delivered: string; failed: string; rethrow: boolean },
  ): Promise<void> {
    if (!this.isConnected) {
      this.logger.warn(outcome.skipped);
      return;
    }

    try {
      await this.awaitEmit(topic, message);
      this.logger.log(outcome.delivered);
    } catch (error) {
      this.logger.error(outcome.failed, error);
      if (outcome.rethrow) {
        throw error;
      }
    }
  }

  /**
   * Emits a message and resolves once the returned Observable completes.
   *
   * `emit()` reports failures through the Observable rather than by throwing,
   * so the stream has to be subscribed to for errors to be seen. A synchronous
   * throw from `emit()` is turned into a rejection by the Promise executor.
   */
  private awaitEmit(topic: string, message: { key: unknown; value: string }): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      this.kafkaClient.emit(topic, message).subscribe({
        error: (err: unknown) => reject(err),
        complete: () => resolve(),
      });
    });
  }

  /**
   * Resolves the Kafka topic for a domain event, falling back to
   * `planting.events` for event types missing from `EVENT_TOPIC_MAP`.
   */
  private getTopicForEvent(event: DomainEvent): string {
    // Own-property check: a type such as 'constructor' or 'toString' must not
    // resolve to a member inherited from Object.prototype.
    const mapped = Object.prototype.hasOwnProperty.call(EVENT_TOPIC_MAP, event.type)
      ? EVENT_TOPIC_MAP[event.type]
      : undefined;
    const topic = mapped || 'planting.events';
    this.logger.debug(`[TOPIC] Mapped event type ${event.type} to topic ${topic}`);
    return topic;
  }

  /**
   * Builds the Kafka message for a domain event: the aggregate id as key and a
   * JSON envelope as value. `eventId` is derived from the aggregate id and the
   * event's occurrence time in milliseconds.
   */
  private formatMessage(event: DomainEvent) {
    return {
      key: event.aggregateId,
      value: JSON.stringify({
        eventId: `${event.aggregateId}-${event.occurredAt.getTime()}`,
        eventType: event.type,
        aggregateId: event.aggregateId,
        aggregateType: event.aggregateType,
        occurredAt: event.occurredAt.toISOString(),
        data: event.data,
      }),
    };
  }

  /**
   * Maps a domain event type to the event-type format expected by
   * authorization-service.
   */
  private mapEventTypeToPlantingEventType(eventType: string): string {
    switch (eventType) {
      // Both order events are reported under the same event type.
      case 'PlantingOrderCreated':
      case 'PlantingOrderPaid':
        return 'planting.tree.planted';
      default:
        return `planting.${eventType.toLowerCase()}`;
    }
  }
}