Revert "fix(mining-admin): 修复 CDC 事件缺少 eventId 的问题"

This reverts commit fff56e8baa.
This commit is contained in:
hailin 2026-01-20 22:38:42 -08:00
parent fff56e8baa
commit 0a199ae3b5
2 changed files with 8 additions and 28 deletions

View File

@ -317,35 +317,23 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
* Debezium outbox 线
*/
private normalizeServiceEvent(data: any): Omit<ServiceEvent, 'sequenceNum' | 'sourceTopic'> {
// 尝试从多种可能的字段名获取事件 ID
// - contribution-service 的 outbox 表主键是 outbox_id
// - mining-wallet-service 等其他服务的 outbox 表主键是 id
// - 直接发布的事件可能使用 eventId 或 id
let eventId = data.outbox_id ?? data.id ?? data.eventId;
// 如果没有找到事件 ID使用 aggregateId + timestamp 生成一个
// 这可以确保幂等性仍然有效(相同的 aggregateId 在同一毫秒内只处理一次)
if (!eventId) {
const aggregateId = data.aggregateId ?? data.aggregate_id ?? 'unknown';
const timestamp = data.created_at ?? data.createdAt ?? Date.now();
eventId = `${aggregateId}-${typeof timestamp === 'string' ? new Date(timestamp).getTime() : timestamp}`;
this.logger.warn(`Event missing id, generated fallback: ${eventId}, eventType: ${data.eventType ?? data.event_type}`);
}
// 如果已经是驼峰格式,确保 id 字段存在
// 如果已经是驼峰格式,直接返回
if (data.eventType && data.aggregateType) {
return {
...data,
id: String(eventId),
};
return data;
}
// Debezium outbox 格式转换
// 原始格式:{ event_type, aggregate_type, aggregate_id, payload (JSON string), ... }
// 注意:不同服务的 outbox 表主键列名可能不同:
// - contribution-service: outbox_id (mapped from id in Prisma)
// - 其他服务可能使用: id
const payload = typeof data.payload === 'string'
? JSON.parse(data.payload)
: data.payload;
// 优先使用 outbox_idcontribution-service回退到 id
const eventId = data.outbox_id ?? data.id;
return {
id: String(eventId),
eventType: data.event_type,

View File

@ -51,14 +51,6 @@ export class CdcSyncService implements OnModuleInit {
*/
private withIdempotency(handler: TransactionalServiceEventHandler): ServiceEventHandler {
return async (event: ServiceEvent) => {
// 验证事件 ID 存在
if (!event.id) {
this.logger.error(
`Event missing id, skipping: eventType=${event.eventType}, topic=${event.sourceTopic}`,
);
return;
}
const idempotencyKey = `${event.sourceTopic}:${event.id}`;
try {