feat(admin-service): 添加 BenefitAssessmentRecord CDC 同步

- 新增 BenefitAssessmentQueryView schema 和 migration
- 扩展 AuthorizationCdcConsumerService 处理 benefit_assessment_records 表
- 更新 Debezium authorization-connector 添加新表同步

CDC 同步字段:
- authorization_id, user_id, account_sequence
- role_type, region_code, region_name
- assessment_month, month_index
- monthly_target, cumulative_target
- trees_completed, trees_required
- benefit_action_taken, previous/new_benefit_status
- new_valid_until, result, remarks, assessed_at

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-08 04:22:37 -08:00
parent 8f5b4df3d1
commit 178c484f04
4 changed files with 235 additions and 2 deletions

View File

@ -0,0 +1,42 @@
-- CreateTable
CREATE TABLE "benefit_assessment_query_view" (
"id" TEXT NOT NULL,
"authorization_id" TEXT NOT NULL,
"user_id" TEXT NOT NULL,
"account_sequence" TEXT NOT NULL,
"role_type" VARCHAR(30) NOT NULL,
"region_code" TEXT NOT NULL,
"region_name" TEXT NOT NULL,
"assessment_month" TEXT NOT NULL,
"month_index" INTEGER NOT NULL,
"monthly_target" INTEGER NOT NULL,
"cumulative_target" INTEGER NOT NULL,
"trees_completed" INTEGER NOT NULL,
"trees_required" INTEGER NOT NULL,
"benefit_action_taken" VARCHAR(20) NOT NULL,
"previous_benefit_status" BOOLEAN NOT NULL,
"new_benefit_status" BOOLEAN NOT NULL,
"new_valid_until" TIMESTAMP(3),
"result" VARCHAR(20) NOT NULL DEFAULT 'NOT_ASSESSED',
"remarks" VARCHAR(500),
"assessed_at" TIMESTAMP(3) NOT NULL,
"created_at" TIMESTAMP(3) NOT NULL,
"synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "benefit_assessment_query_view_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "benefit_assessment_query_view_account_sequence_assessment_m_idx" ON "benefit_assessment_query_view"("account_sequence", "assessment_month");
-- CreateIndex
CREATE INDEX "benefit_assessment_query_view_user_id_assessment_month_idx" ON "benefit_assessment_query_view"("user_id", "assessment_month");
-- CreateIndex
CREATE INDEX "benefit_assessment_query_view_role_type_region_code_assessm_idx" ON "benefit_assessment_query_view"("role_type", "region_code", "assessment_month");
-- CreateIndex
CREATE INDEX "benefit_assessment_query_view_assessment_month_result_idx" ON "benefit_assessment_query_view"("assessment_month", "result");
-- CreateIndex
CREATE UNIQUE INDEX "benefit_assessment_query_view_authorization_id_assessment_m_key" ON "benefit_assessment_query_view"("authorization_id", "assessment_month");

View File

@ -851,6 +851,54 @@ model MonthlyAssessmentQueryView {
@@map("monthly_assessment_query_view")
}
/// 权益考核记录查询视图 - 通过 Debezium CDC 从 authorization-service 同步
/// 独立于火柴人排名MonthlyAssessment专门记录权益有效性考核历史
model BenefitAssessmentQueryView {
id String @id
authorizationId String @map("authorization_id")
userId String @map("user_id")
accountSequence String @map("account_sequence")
roleType String @map("role_type") @db.VarChar(30)
regionCode String @map("region_code")
regionName String @map("region_name")
// 考核月份
assessmentMonth String @map("assessment_month")
monthIndex Int @map("month_index")
// 考核目标
monthlyTarget Int @map("monthly_target")
cumulativeTarget Int @map("cumulative_target")
// 完成情况
treesCompleted Int @map("trees_completed")
treesRequired Int @map("trees_required")
// 权益状态变化
benefitActionTaken String @map("benefit_action_taken") @db.VarChar(20)
previousBenefitStatus Boolean @map("previous_benefit_status")
newBenefitStatus Boolean @map("new_benefit_status")
newValidUntil DateTime? @map("new_valid_until")
// 考核结果
result String @default("NOT_ASSESSED") @db.VarChar(20)
// 备注
remarks String? @map("remarks") @db.VarChar(500)
// 时间戳
assessedAt DateTime @map("assessed_at")
createdAt DateTime @map("created_at")
syncedAt DateTime @default(now()) @map("synced_at")
@@unique([authorizationId, assessmentMonth])
@@index([accountSequence, assessmentMonth])
@@index([userId, assessmentMonth])
@@index([roleType, regionCode, assessmentMonth])
@@index([assessmentMonth, result])
@@map("benefit_assessment_query_view")
}
/// 系统账户查询视图 - 通过 Debezium CDC 从 authorization-service 同步
model SystemAccountQueryView {
id BigInt @id @map("account_id")

View File

@ -100,7 +100,36 @@ interface CdcSystemAccountPayload {
__deleted?: string;
}
type CdcAuthorizationPayload = CdcAuthorizationRolePayload | CdcMonthlyAssessmentPayload | CdcSystemAccountLedgerPayload | CdcSystemAccountPayload;
// benefit_assessment_records 表 (权益考核记录)
interface CdcBenefitAssessmentRecordPayload {
id: string;
authorization_id: string;
user_id: string;
account_sequence: string;
role_type: string;
region_code: string;
region_name: string;
assessment_month: string;
month_index: number;
monthly_target: number;
cumulative_target: number;
trees_completed: number;
trees_required: number;
benefit_action_taken: string;
previous_benefit_status: boolean;
new_benefit_status: boolean;
new_valid_until?: string | null;
result: string;
remarks?: string | null;
assessed_at: string;
created_at: string;
__op: 'c' | 'u' | 'd' | 'r';
__table: string;
__source_ts_ms: number;
__deleted?: string;
}
type CdcAuthorizationPayload = CdcAuthorizationRolePayload | CdcMonthlyAssessmentPayload | CdcSystemAccountLedgerPayload | CdcSystemAccountPayload | CdcBenefitAssessmentRecordPayload;
/**
* Authorization CDC
@ -125,6 +154,7 @@ export class AuthorizationCdcConsumerService implements OnModuleInit, OnModuleDe
'cdc.authorization.public.monthly_assessments',
'cdc.authorization.public.system_accounts',
'cdc.authorization.public.system_account_ledgers',
'cdc.authorization.public.benefit_assessment_records',
];
private readonly consumerGroup: string;
@ -237,6 +267,9 @@ export class AuthorizationCdcConsumerService implements OnModuleInit, OnModuleDe
case 'system_account_ledgers':
await this.processSystemAccountLedgerEvent(data as CdcSystemAccountLedgerPayload);
break;
case 'benefit_assessment_records':
await this.processBenefitAssessmentRecordEvent(data as CdcBenefitAssessmentRecordPayload);
break;
default:
this.logger.warn(`[Authorization-CDC] Unknown table: ${table}`);
}
@ -623,6 +656,116 @@ export class AuthorizationCdcConsumerService implements OnModuleInit, OnModuleDe
}
}
// ==================== benefit_assessment_records 处理 ====================
private async processBenefitAssessmentRecordEvent(data: CdcBenefitAssessmentRecordPayload): Promise<void> {
const operation = data.__op;
const isDeleted = data.__deleted === 'true';
if (operation === 'd' || isDeleted) {
await this.handleBenefitAssessmentRecordDelete(data);
} else if (operation === 'c' || operation === 'r') {
await this.handleBenefitAssessmentRecordCreateOrSnapshot(data);
} else if (operation === 'u') {
await this.handleBenefitAssessmentRecordUpdate(data);
}
}
private async handleBenefitAssessmentRecordCreateOrSnapshot(data: CdcBenefitAssessmentRecordPayload): Promise<void> {
await this.prisma.benefitAssessmentQueryView.upsert({
where: { id: data.id },
create: {
id: data.id,
authorizationId: data.authorization_id,
userId: data.user_id,
accountSequence: data.account_sequence,
roleType: data.role_type,
regionCode: data.region_code,
regionName: data.region_name,
assessmentMonth: data.assessment_month,
monthIndex: data.month_index,
monthlyTarget: data.monthly_target,
cumulativeTarget: data.cumulative_target,
treesCompleted: data.trees_completed,
treesRequired: data.trees_required,
benefitActionTaken: data.benefit_action_taken,
previousBenefitStatus: data.previous_benefit_status,
newBenefitStatus: data.new_benefit_status,
newValidUntil: data.new_valid_until ? new Date(data.new_valid_until) : null,
result: data.result,
remarks: data.remarks || null,
assessedAt: new Date(data.assessed_at),
createdAt: new Date(data.created_at),
syncedAt: new Date(),
},
update: {
authorizationId: data.authorization_id,
userId: data.user_id,
accountSequence: data.account_sequence,
roleType: data.role_type,
regionCode: data.region_code,
regionName: data.region_name,
assessmentMonth: data.assessment_month,
monthIndex: data.month_index,
monthlyTarget: data.monthly_target,
cumulativeTarget: data.cumulative_target,
treesCompleted: data.trees_completed,
treesRequired: data.trees_required,
benefitActionTaken: data.benefit_action_taken,
previousBenefitStatus: data.previous_benefit_status,
newBenefitStatus: data.new_benefit_status,
newValidUntil: data.new_valid_until ? new Date(data.new_valid_until) : null,
result: data.result,
remarks: data.remarks || null,
assessedAt: new Date(data.assessed_at),
syncedAt: new Date(),
},
});
this.logger.log(`[Authorization-CDC] Created/Snapshot benefit assessment record: ${data.id} (${data.role_type} - ${data.assessment_month})`);
}
private async handleBenefitAssessmentRecordUpdate(data: CdcBenefitAssessmentRecordPayload): Promise<void> {
const exists = await this.prisma.benefitAssessmentQueryView.findUnique({
where: { id: data.id },
select: { id: true },
});
if (!exists) {
await this.handleBenefitAssessmentRecordCreateOrSnapshot(data);
return;
}
await this.prisma.benefitAssessmentQueryView.update({
where: { id: data.id },
data: {
treesCompleted: data.trees_completed,
treesRequired: data.trees_required,
benefitActionTaken: data.benefit_action_taken,
previousBenefitStatus: data.previous_benefit_status,
newBenefitStatus: data.new_benefit_status,
newValidUntil: data.new_valid_until ? new Date(data.new_valid_until) : null,
result: data.result,
remarks: data.remarks || null,
assessedAt: new Date(data.assessed_at),
syncedAt: new Date(),
},
});
this.logger.log(`[Authorization-CDC] Updated benefit assessment record: ${data.id}`);
}
private async handleBenefitAssessmentRecordDelete(data: CdcBenefitAssessmentRecordPayload): Promise<void> {
try {
await this.prisma.benefitAssessmentQueryView.delete({
where: { id: data.id },
});
this.logger.log(`[Authorization-CDC] Deleted benefit assessment record: ${data.id}`);
} catch {
this.logger.warn(`[Authorization-CDC] Benefit assessment record not found for delete: ${data.id}`);
}
}
// ==================== Helper Methods ====================
private async isEventProcessed(eventId: string): Promise<boolean> {

View File

@ -12,7 +12,7 @@
"plugin.name": "pgoutput",
"publication.name": "authorization_cdc_publication",
"slot.name": "authorization_cdc_slot",
"table.include.list": "public.authorization_roles,public.monthly_assessments,public.system_accounts,public.system_account_ledgers",
"table.include.list": "public.authorization_roles,public.monthly_assessments,public.system_accounts,public.system_account_ledgers,public.benefit_assessment_records",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",