From 38ee808239e2811b2afea329f67236d3eb508213 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 3 Feb 2026 03:28:34 -0800 Subject: [PATCH] =?UTF-8?q?fix(wallet):=20=E6=B1=A0=E4=BD=99=E9=A2=9D?= =?UTF-8?q?=E4=B8=8D=E8=B6=B3=E6=97=B6=E8=B7=B3=E8=BF=87=20Kafka=20?= =?UTF-8?q?=E6=B6=88=E6=81=AF=EF=BC=8C=E9=81=BF=E5=85=8D=E6=97=A0=E9=99=90?= =?UTF-8?q?=E9=87=8D=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BurnConsumer 和 MiningDistributionConsumer 在池余额为 0 时抛出 Insufficient pool balance 错误导致 offset 不提交,造成无限重试。 现改为记录 warn 日志并标记事件为已处理(带 :skipped:insufficient_balance 标记)。 Co-Authored-By: Claude Opus 4.5 --- .../kafka/consumers/burn.consumer.ts | 16 ++++++++++++++++ .../consumers/mining-distribution.consumer.ts | 8 ++++++++ 2 files changed, 24 insertions(+) diff --git a/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/burn.consumer.ts b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/burn.consumer.ts index b42f0e30..cf73a3cd 100644 --- a/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/burn.consumer.ts +++ b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/burn.consumer.ts @@ -97,6 +97,14 @@ export class BurnConsumer implements OnModuleInit { `Minute burn processed: amount=${burnAmount.toFixed(8)}, minute=${payload.burnMinute}`, ); } catch (error) { + // 余额不足时不重试,记录警告并跳过(避免无限重试) + if (error instanceof Error && error.message.includes('Insufficient pool balance')) { + this.logger.warn( + `Insufficient SHARE_POOL_A balance for minute burn ${eventId}, skipping. ${error.message}`, + ); + await this.markEventProcessed(eventId, 'burn.minute-executed:skipped:insufficient_balance'); + return; + } this.logger.error( `Failed to process minute burn event ${eventId}`, error instanceof Error ? error.stack : error, @@ -145,6 +153,14 @@ export class BurnConsumer implements OnModuleInit { `Sell burn processed: amount=${burnAmount.toFixed(8)}, account=${payload.sourceAccountSeq}`, ); } catch (error) { + // 余额不足时不重试,记录警告并跳过(避免无限重试) + if (error instanceof Error && error.message.includes('Insufficient pool balance')) { + this.logger.warn( + `Insufficient SHARE_POOL_A balance for sell burn ${eventId}, skipping. ${error.message}`, + ); + await this.markEventProcessed(eventId, 'burn.executed:skipped:insufficient_balance'); + return; + } this.logger.error( `Failed to process sell burn event ${eventId}`, error instanceof Error ? error.stack : error, diff --git a/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/mining-distribution.consumer.ts b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/mining-distribution.consumer.ts index dd0ba92a..1c3e93ae 100644 --- a/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/mining-distribution.consumer.ts +++ b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/mining-distribution.consumer.ts @@ -63,6 +63,14 @@ export class MiningDistributionConsumer implements OnModuleInit { `Mining distribution processed: minute=${event.payload.miningMinute}, amount=${event.payload.totalDistributed}`, ); } catch (error) { + // 余额不足时不重试,记录警告并跳过(避免无限重试) + if (error instanceof Error && error.message.includes('Insufficient pool balance')) { + this.logger.warn( + `Insufficient SHARE_POOL_B balance for mining distribution ${eventId}, skipping. ${error.message}`, + ); + await this.markEventProcessed(eventId, 'mining.distribution:skipped:insufficient_balance'); + return; + } this.logger.error( `Failed to process mining distribution for minute ${event.payload.miningMinute}`, error instanceof Error ? error.stack : error,