fix(contribution-service): 修复CDC同步配置,使用正确的planting-service topic

- 修改CDC topic为cdc.planting.public.planting_orders
- 更新healthcheck使用api/v2
- 更新handler适配planting_orders表字段

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-11 09:18:37 -08:00
parent a14daae222
commit 05a8168a31
3 changed files with 37 additions and 28 deletions

View File

@ -6,9 +6,9 @@ import { ContributionCalculationService } from '../services/contribution-calcula
import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work'; import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work';
/** /**
* CDC * CDC
* * 1.0 planting-service同步过来的planting_orders数
* *
*/ */
@Injectable() @Injectable()
export class AdoptionSyncedHandler { export class AdoptionSyncedHandler {
@ -47,15 +47,21 @@ export class AdoptionSyncedHandler {
private async handleCreate(data: any, sequenceNum: bigint): Promise<void> { private async handleCreate(data: any, sequenceNum: bigint): Promise<void> {
if (!data) return; if (!data) return;
// planting_orders表字段: order_id, account_sequence, tree_count, created_at, status
const orderId = data.order_id || data.id;
const accountSequence = data.account_sequence || data.accountSequence;
const treeCount = data.tree_count || data.treeCount;
const createdAt = data.created_at || data.createdAt || data.paid_at || data.paidAt;
await this.unitOfWork.executeInTransaction(async () => { await this.unitOfWork.executeInTransaction(async () => {
// 保存同步的认种数据 // 保存同步的认种订单数据
const adoption = await this.syncedDataRepository.upsertSyncedAdoption({ const adoption = await this.syncedDataRepository.upsertSyncedAdoption({
originalAdoptionId: BigInt(data.id), originalAdoptionId: BigInt(orderId),
accountSequence: data.account_sequence || data.accountSequence, accountSequence: accountSequence,
treeCount: data.tree_count || data.treeCount, treeCount: treeCount,
adoptionDate: new Date(data.adoption_date || data.adoptionDate || data.created_at || data.createdAt), adoptionDate: new Date(createdAt),
status: data.status ?? null, status: data.status ?? null,
contributionPerTree: new Decimal(data.contribution_per_tree || data.contributionPerTree || '1'), contributionPerTree: new Decimal('1'), // 每棵树1算力
sourceSequenceNum: sequenceNum, sourceSequenceNum: sequenceNum,
}); });
@ -64,14 +70,15 @@ export class AdoptionSyncedHandler {
}); });
this.logger.log( this.logger.log(
`Adoption synced and contribution calculated: ${data.id}, account: ${data.account_sequence || data.accountSequence}`, `Planting order synced and contribution calculated: ${orderId}, account: ${accountSequence}`,
); );
} }
private async handleUpdate(after: any, before: any, sequenceNum: bigint): Promise<void> { private async handleUpdate(after: any, before: any, sequenceNum: bigint): Promise<void> {
if (!after) return; if (!after) return;
const originalAdoptionId = BigInt(after.id); const orderId = after.order_id || after.id;
const originalAdoptionId = BigInt(orderId);
// 检查是否已经处理过 // 检查是否已经处理过
const existingAdoption = await this.syncedDataRepository.findSyncedAdoptionByOriginalId(originalAdoptionId); const existingAdoption = await this.syncedDataRepository.findSyncedAdoptionByOriginalId(originalAdoptionId);
@ -81,21 +88,25 @@ export class AdoptionSyncedHandler {
const newTreeCount = after.tree_count || after.treeCount; const newTreeCount = after.tree_count || after.treeCount;
if (existingAdoption.treeCount !== newTreeCount) { if (existingAdoption.treeCount !== newTreeCount) {
this.logger.warn( this.logger.warn(
`Adoption tree count changed after processing: ${originalAdoptionId}. This requires special handling.`, `Planting order tree count changed after processing: ${originalAdoptionId}. This requires special handling.`,
); );
// TODO: 实现树数量变化的处理逻辑 // TODO: 实现树数量变化的处理逻辑
} }
return; return;
} }
const accountSequence = after.account_sequence || after.accountSequence;
const treeCount = after.tree_count || after.treeCount;
const createdAt = after.created_at || after.createdAt || after.paid_at || after.paidAt;
await this.unitOfWork.executeInTransaction(async () => { await this.unitOfWork.executeInTransaction(async () => {
const adoption = await this.syncedDataRepository.upsertSyncedAdoption({ const adoption = await this.syncedDataRepository.upsertSyncedAdoption({
originalAdoptionId: originalAdoptionId, originalAdoptionId: originalAdoptionId,
accountSequence: after.account_sequence || after.accountSequence, accountSequence: accountSequence,
treeCount: after.tree_count || after.treeCount, treeCount: treeCount,
adoptionDate: new Date(after.adoption_date || after.adoptionDate || after.created_at || after.createdAt), adoptionDate: new Date(createdAt),
status: after.status ?? null, status: after.status ?? null,
contributionPerTree: new Decimal(after.contribution_per_tree || after.contributionPerTree || '1'), contributionPerTree: new Decimal('1'),
sourceSequenceNum: sequenceNum, sourceSequenceNum: sequenceNum,
}); });
@ -104,7 +115,7 @@ export class AdoptionSyncedHandler {
} }
}); });
this.logger.debug(`Adoption updated: ${originalAdoptionId}`); this.logger.debug(`Planting order updated: ${originalAdoptionId}`);
} }
private async handleDelete(data: any): Promise<void> { private async handleDelete(data: any): Promise<void> {

View File

@ -79,14 +79,12 @@ export class CDCConsumerService implements OnModuleInit {
await this.consumer.connect(); await this.consumer.connect();
this.logger.log('CDC consumer connected'); this.logger.log('CDC consumer connected');
// 订阅 Debezium CDC topics // 订阅 Debezium CDC topics (从1.0 planting-service同步)
const topics = [ const topics = [
// 用户表 // 认种订单表 (planting_orders)
this.configService.get<string>('CDC_TOPIC_USERS', 'dbserver1.public.users'), this.configService.get<string>('CDC_TOPIC_ADOPTIONS', 'cdc.planting.public.planting_orders'),
// 认种表 // 资金分配表 (fund_allocations)
this.configService.get<string>('CDC_TOPIC_ADOPTIONS', 'dbserver1.public.adoptions'), this.configService.get<string>('CDC_TOPIC_PAYMENTS', 'cdc.planting.public.fund_allocations'),
// 引荐表
this.configService.get<string>('CDC_TOPIC_REFERRALS', 'dbserver1.public.referrals'),
]; ];
await this.consumer.subscribe({ await this.consumer.subscribe({

View File

@ -37,15 +37,15 @@ services:
REDIS_PORT: 6379 REDIS_PORT: 6379
REDIS_PASSWORD: ${REDIS_PASSWORD:-} REDIS_PASSWORD: ${REDIS_PASSWORD:-}
REDIS_DB: 10 REDIS_DB: 10
# Kafka - 消费 CDC 事件 # Kafka - 消费 CDC 事件 (从1.0 planting-service同步认种数据)
KAFKA_BROKERS: kafka:29092 KAFKA_BROKERS: kafka:29092
CDC_TOPIC_ADOPTIONS: ${CDC_TOPIC_ADOPTIONS:-dbserver1.public.adoptions} CDC_TOPIC_ADOPTIONS: ${CDC_TOPIC_ADOPTIONS:-cdc.planting.public.planting_orders}
CDC_TOPIC_PAYMENTS: ${CDC_TOPIC_PAYMENTS:-dbserver1.public.payment_records} CDC_TOPIC_PAYMENTS: ${CDC_TOPIC_PAYMENTS:-cdc.planting.public.fund_allocations}
CDC_CONSUMER_GROUP: contribution-service-cdc-group CDC_CONSUMER_GROUP: contribution-service-cdc-group
ports: ports:
- "3020:3020" - "3020:3020"
healthcheck: healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3020/api/v1/health"] test: ["CMD", "curl", "-f", "http://localhost:3020/api/v2/health"]
interval: 30s interval: 30s
timeout: 10s timeout: 10s
retries: 3 retries: 3