fix(wallet): 池余额不足时跳过 Kafka 消息,避免无限重试

BurnConsumer 和 MiningDistributionConsumer 在池余额为 0 时抛出
Insufficient pool balance 错误导致 offset 不提交,造成无限重试。
现改为记录 warn 日志并标记事件为已处理(带 :skipped:insufficient_balance 标记)。

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-03 03:28:34 -08:00
parent 8cfd107a92
commit 38ee808239
2 changed files with 24 additions and 0 deletions

View File

@ -97,6 +97,14 @@ export class BurnConsumer implements OnModuleInit {
`Minute burn processed: amount=${burnAmount.toFixed(8)}, minute=${payload.burnMinute}`,
);
} catch (error) {
// 余额不足时不重试,记录警告并跳过(避免无限重试)
if (error instanceof Error && error.message.includes('Insufficient pool balance')) {
this.logger.warn(
`Insufficient SHARE_POOL_A balance for minute burn ${eventId}, skipping. ${error.message}`,
);
await this.markEventProcessed(eventId, 'burn.minute-executed:skipped:insufficient_balance');
return;
}
this.logger.error(
`Failed to process minute burn event ${eventId}`,
error instanceof Error ? error.stack : error,
@ -145,6 +153,14 @@ export class BurnConsumer implements OnModuleInit {
`Sell burn processed: amount=${burnAmount.toFixed(8)}, account=${payload.sourceAccountSeq}`,
);
} catch (error) {
// 余额不足时不重试,记录警告并跳过(避免无限重试)
if (error instanceof Error && error.message.includes('Insufficient pool balance')) {
this.logger.warn(
`Insufficient SHARE_POOL_A balance for sell burn ${eventId}, skipping. ${error.message}`,
);
await this.markEventProcessed(eventId, 'burn.executed:skipped:insufficient_balance');
return;
}
this.logger.error(
`Failed to process sell burn event ${eventId}`,
error instanceof Error ? error.stack : error,

View File

@ -63,6 +63,14 @@ export class MiningDistributionConsumer implements OnModuleInit {
`Mining distribution processed: minute=${event.payload.miningMinute}, amount=${event.payload.totalDistributed}`,
);
} catch (error) {
// 余额不足时不重试,记录警告并跳过(避免无限重试)
if (error instanceof Error && error.message.includes('Insufficient pool balance')) {
this.logger.warn(
`Insufficient SHARE_POOL_B balance for mining distribution ${eventId}, skipping. ${error.message}`,
);
await this.markEventProcessed(eventId, 'mining.distribution:skipped:insufficient_balance');
return;
}
this.logger.error(
`Failed to process mining distribution for minute ${event.payload.miningMinute}`,
error instanceof Error ? error.stack : error,