fix(contribution-service): use real contributionPerTree from rate service
Previously, adoptions were synced with hardcoded contributionPerTree=1, resulting in contribution values like 0.7 instead of the expected 15831.9. Now the handler fetches the actual contribution rate from ContributionRateService based on the adoption date, storing values like: - Personal (70%): 22617 × 70% = 15831.9 - Team Level (0.5%): 22617 × 0.5% = 113.085 - Team Bonus (2.5%): 22617 × 2.5% = 565.425 Note: Historical data may need migration to apply the correct multiplier. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
f57b0f9c26
commit
141db46356
|
|
@ -2,6 +2,7 @@ import { Injectable, Logger } from '@nestjs/common';
|
||||||
import Decimal from 'decimal.js';
|
import Decimal from 'decimal.js';
|
||||||
import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service';
|
import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service';
|
||||||
import { ContributionCalculationService } from '../services/contribution-calculation.service';
|
import { ContributionCalculationService } from '../services/contribution-calculation.service';
|
||||||
|
import { ContributionRateService } from '../services/contribution-rate.service';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 认种同步结果,用于事务提交后的算力计算
|
* 认种同步结果,用于事务提交后的算力计算
|
||||||
|
|
@ -27,6 +28,7 @@ export class AdoptionSyncedHandler {
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly contributionCalculationService: ContributionCalculationService,
|
private readonly contributionCalculationService: ContributionCalculationService,
|
||||||
|
private readonly contributionRateService: ContributionRateService,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -40,13 +42,28 @@ export class AdoptionSyncedHandler {
|
||||||
this.logger.log(`[CDC] Adoption event received: op=${op}, seq=${event.sequenceNum}`);
|
this.logger.log(`[CDC] Adoption event received: op=${op}, seq=${event.sequenceNum}`);
|
||||||
this.logger.debug(`[CDC] Adoption event payload: ${JSON.stringify(after || before)}`);
|
this.logger.debug(`[CDC] Adoption event payload: ${JSON.stringify(after || before)}`);
|
||||||
|
|
||||||
|
// 获取认种日期,用于查询当日贡献值
|
||||||
|
const data = after || before;
|
||||||
|
const adoptionDate = data?.created_at || data?.createdAt || data?.paid_at || data?.paidAt;
|
||||||
|
|
||||||
|
// 在事务外获取当日每棵树的贡献值
|
||||||
|
let contributionPerTree = new Decimal('22617'); // 默认值
|
||||||
|
if (adoptionDate) {
|
||||||
|
try {
|
||||||
|
contributionPerTree = await this.contributionRateService.getContributionPerTree(new Date(adoptionDate));
|
||||||
|
this.logger.log(`[CDC] Got contributionPerTree for ${adoptionDate}: ${contributionPerTree.toString()}`);
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.warn(`[CDC] Failed to get contributionPerTree, using default 22617`, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
switch (op) {
|
switch (op) {
|
||||||
case 'c': // create
|
case 'c': // create
|
||||||
case 'r': // read (snapshot)
|
case 'r': // read (snapshot)
|
||||||
return await this.handleCreate(after, event.sequenceNum, tx);
|
return await this.handleCreate(after, event.sequenceNum, tx, contributionPerTree);
|
||||||
case 'u': // update
|
case 'u': // update
|
||||||
return await this.handleUpdate(after, before, event.sequenceNum, tx);
|
return await this.handleUpdate(after, before, event.sequenceNum, tx, contributionPerTree);
|
||||||
case 'd': // delete
|
case 'd': // delete
|
||||||
await this.handleDelete(before);
|
await this.handleDelete(before);
|
||||||
return null;
|
return null;
|
||||||
|
|
@ -78,7 +95,7 @@ export class AdoptionSyncedHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise<AdoptionSyncResult | null> {
|
private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient, contributionPerTree: Decimal): Promise<AdoptionSyncResult | null> {
|
||||||
if (!data) {
|
if (!data) {
|
||||||
this.logger.warn(`[CDC] Adoption create: empty data received`);
|
this.logger.warn(`[CDC] Adoption create: empty data received`);
|
||||||
return null;
|
return null;
|
||||||
|
|
@ -92,7 +109,7 @@ export class AdoptionSyncedHandler {
|
||||||
const selectedCity = data.selected_city || data.selectedCity || null;
|
const selectedCity = data.selected_city || data.selectedCity || null;
|
||||||
const status = data.status ?? null;
|
const status = data.status ?? null;
|
||||||
|
|
||||||
this.logger.log(`[CDC] Adoption create: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}, status=${status}`);
|
this.logger.log(`[CDC] Adoption create: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}, status=${status}, contributionPerTree=${contributionPerTree.toString()}`);
|
||||||
|
|
||||||
if (!orderId || !accountSequence) {
|
if (!orderId || !accountSequence) {
|
||||||
this.logger.warn(`[CDC] Invalid adoption data: missing order_id or account_sequence`, { data });
|
this.logger.warn(`[CDC] Invalid adoption data: missing order_id or account_sequence`, { data });
|
||||||
|
|
@ -101,7 +118,7 @@ export class AdoptionSyncedHandler {
|
||||||
|
|
||||||
const originalAdoptionId = BigInt(orderId);
|
const originalAdoptionId = BigInt(orderId);
|
||||||
|
|
||||||
// 100%同步数据
|
// 100%同步数据,使用真实的每棵树贡献值
|
||||||
await tx.syncedAdoption.upsert({
|
await tx.syncedAdoption.upsert({
|
||||||
where: { originalAdoptionId },
|
where: { originalAdoptionId },
|
||||||
create: {
|
create: {
|
||||||
|
|
@ -112,7 +129,7 @@ export class AdoptionSyncedHandler {
|
||||||
status,
|
status,
|
||||||
selectedProvince,
|
selectedProvince,
|
||||||
selectedCity,
|
selectedCity,
|
||||||
contributionPerTree: new Decimal('1'),
|
contributionPerTree,
|
||||||
sourceSequenceNum: sequenceNum,
|
sourceSequenceNum: sequenceNum,
|
||||||
syncedAt: new Date(),
|
syncedAt: new Date(),
|
||||||
},
|
},
|
||||||
|
|
@ -123,7 +140,7 @@ export class AdoptionSyncedHandler {
|
||||||
status,
|
status,
|
||||||
selectedProvince,
|
selectedProvince,
|
||||||
selectedCity,
|
selectedCity,
|
||||||
contributionPerTree: new Decimal('1'),
|
contributionPerTree,
|
||||||
sourceSequenceNum: sequenceNum,
|
sourceSequenceNum: sequenceNum,
|
||||||
syncedAt: new Date(),
|
syncedAt: new Date(),
|
||||||
},
|
},
|
||||||
|
|
@ -139,7 +156,7 @@ export class AdoptionSyncedHandler {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private async handleUpdate(after: any, before: any, sequenceNum: bigint, tx: TransactionClient): Promise<AdoptionSyncResult | null> {
|
private async handleUpdate(after: any, before: any, sequenceNum: bigint, tx: TransactionClient, contributionPerTree: Decimal): Promise<AdoptionSyncResult | null> {
|
||||||
if (!after) {
|
if (!after) {
|
||||||
this.logger.warn(`[CDC] Adoption update: empty after data received`);
|
this.logger.warn(`[CDC] Adoption update: empty after data received`);
|
||||||
return null;
|
return null;
|
||||||
|
|
@ -155,14 +172,14 @@ export class AdoptionSyncedHandler {
|
||||||
const newStatus = after.status ?? null;
|
const newStatus = after.status ?? null;
|
||||||
const oldStatus = before?.status ?? null;
|
const oldStatus = before?.status ?? null;
|
||||||
|
|
||||||
this.logger.log(`[CDC] Adoption update: orderId=${orderId}, status=${oldStatus} -> ${newStatus}`);
|
this.logger.log(`[CDC] Adoption update: orderId=${orderId}, status=${oldStatus} -> ${newStatus}, contributionPerTree=${contributionPerTree.toString()}`);
|
||||||
|
|
||||||
// 查询现有记录
|
// 查询现有记录
|
||||||
const existingAdoption = await tx.syncedAdoption.findUnique({
|
const existingAdoption = await tx.syncedAdoption.findUnique({
|
||||||
where: { originalAdoptionId },
|
where: { originalAdoptionId },
|
||||||
});
|
});
|
||||||
|
|
||||||
// 100%同步数据,不跳过任何更新
|
// 100%同步数据,使用真实的每棵树贡献值
|
||||||
await tx.syncedAdoption.upsert({
|
await tx.syncedAdoption.upsert({
|
||||||
where: { originalAdoptionId },
|
where: { originalAdoptionId },
|
||||||
create: {
|
create: {
|
||||||
|
|
@ -173,7 +190,7 @@ export class AdoptionSyncedHandler {
|
||||||
status: newStatus,
|
status: newStatus,
|
||||||
selectedProvince,
|
selectedProvince,
|
||||||
selectedCity,
|
selectedCity,
|
||||||
contributionPerTree: new Decimal('1'),
|
contributionPerTree,
|
||||||
sourceSequenceNum: sequenceNum,
|
sourceSequenceNum: sequenceNum,
|
||||||
syncedAt: new Date(),
|
syncedAt: new Date(),
|
||||||
},
|
},
|
||||||
|
|
@ -184,7 +201,7 @@ export class AdoptionSyncedHandler {
|
||||||
status: newStatus,
|
status: newStatus,
|
||||||
selectedProvince,
|
selectedProvince,
|
||||||
selectedCity,
|
selectedCity,
|
||||||
contributionPerTree: new Decimal('1'),
|
contributionPerTree,
|
||||||
sourceSequenceNum: sequenceNum,
|
sourceSequenceNum: sequenceNum,
|
||||||
syncedAt: new Date(),
|
syncedAt: new Date(),
|
||||||
},
|
},
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue