From 178c484f04b216d424357322059a14e9c8b9ed56 Mon Sep 17 00:00:00 2001 From: hailin Date: Thu, 8 Jan 2026 04:22:37 -0800 Subject: [PATCH] =?UTF-8?q?feat(admin-service):=20=E6=B7=BB=E5=8A=A0=20Ben?= =?UTF-8?q?efitAssessmentRecord=20CDC=20=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 BenefitAssessmentQueryView schema 和 migration - 扩展 AuthorizationCdcConsumerService 处理 benefit_assessment_records 表 - 更新 Debezium authorization-connector 添加新表同步 CDC 同步字段: - authorization_id, user_id, account_sequence - role_type, region_code, region_name - assessment_month, month_index - monthly_target, cumulative_target - trees_completed, trees_required - benefit_action_taken, previous/new_benefit_status - new_valid_until, result, remarks, assessed_at 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../migration.sql | 42 +++++ .../admin-service/prisma/schema.prisma | 48 ++++++ .../authorization-cdc-consumer.service.ts | 145 +++++++++++++++++- .../debezium/authorization-connector.json | 2 +- 4 files changed, 235 insertions(+), 2 deletions(-) create mode 100644 backend/services/admin-service/prisma/migrations/20250108100000_add_benefit_assessment_query_view/migration.sql diff --git a/backend/services/admin-service/prisma/migrations/20250108100000_add_benefit_assessment_query_view/migration.sql b/backend/services/admin-service/prisma/migrations/20250108100000_add_benefit_assessment_query_view/migration.sql new file mode 100644 index 00000000..0d8c88df --- /dev/null +++ b/backend/services/admin-service/prisma/migrations/20250108100000_add_benefit_assessment_query_view/migration.sql @@ -0,0 +1,42 @@ +-- CreateTable +CREATE TABLE "benefit_assessment_query_view" ( + "id" TEXT NOT NULL, + "authorization_id" TEXT NOT NULL, + "user_id" TEXT NOT NULL, + "account_sequence" TEXT NOT NULL, + "role_type" VARCHAR(30) NOT NULL, + "region_code" TEXT NOT NULL, + "region_name" TEXT NOT NULL, + "assessment_month" TEXT NOT NULL, + "month_index" INTEGER NOT NULL, + "monthly_target" INTEGER NOT NULL, + "cumulative_target" INTEGER NOT NULL, + "trees_completed" INTEGER NOT NULL, + "trees_required" INTEGER NOT NULL, + "benefit_action_taken" VARCHAR(20) NOT NULL, + "previous_benefit_status" BOOLEAN NOT NULL, + "new_benefit_status" BOOLEAN NOT NULL, + "new_valid_until" TIMESTAMP(3), + "result" VARCHAR(20) NOT NULL DEFAULT 'NOT_ASSESSED', + "remarks" VARCHAR(500), + "assessed_at" TIMESTAMP(3) NOT NULL, + "created_at" TIMESTAMP(3) NOT NULL, + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "benefit_assessment_query_view_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "benefit_assessment_query_view_account_sequence_assessment_m_idx" ON "benefit_assessment_query_view"("account_sequence", "assessment_month"); + +-- CreateIndex +CREATE INDEX "benefit_assessment_query_view_user_id_assessment_month_idx" ON "benefit_assessment_query_view"("user_id", "assessment_month"); + +-- CreateIndex +CREATE INDEX "benefit_assessment_query_view_role_type_region_code_assessm_idx" ON "benefit_assessment_query_view"("role_type", "region_code", "assessment_month"); + +-- CreateIndex +CREATE INDEX "benefit_assessment_query_view_assessment_month_result_idx" ON "benefit_assessment_query_view"("assessment_month", "result"); + +-- CreateIndex +CREATE UNIQUE INDEX "benefit_assessment_query_view_authorization_id_assessment_m_key" ON "benefit_assessment_query_view"("authorization_id", "assessment_month"); diff --git a/backend/services/admin-service/prisma/schema.prisma b/backend/services/admin-service/prisma/schema.prisma index 927e6295..949c6ec5 100644 --- a/backend/services/admin-service/prisma/schema.prisma +++ b/backend/services/admin-service/prisma/schema.prisma @@ -851,6 +851,54 @@ model MonthlyAssessmentQueryView { @@map("monthly_assessment_query_view") } +/// 权益考核记录查询视图 - 通过 Debezium CDC 从 authorization-service 同步 +/// 独立于火柴人排名(MonthlyAssessment),专门记录权益有效性考核历史 +model BenefitAssessmentQueryView { + id String @id + authorizationId String @map("authorization_id") + userId String @map("user_id") + accountSequence String @map("account_sequence") + roleType String @map("role_type") @db.VarChar(30) + regionCode String @map("region_code") + regionName String @map("region_name") + + // 考核月份 + assessmentMonth String @map("assessment_month") + monthIndex Int @map("month_index") + + // 考核目标 + monthlyTarget Int @map("monthly_target") + cumulativeTarget Int @map("cumulative_target") + + // 完成情况 + treesCompleted Int @map("trees_completed") + treesRequired Int @map("trees_required") + + // 权益状态变化 + benefitActionTaken String @map("benefit_action_taken") @db.VarChar(20) + previousBenefitStatus Boolean @map("previous_benefit_status") + newBenefitStatus Boolean @map("new_benefit_status") + newValidUntil DateTime? @map("new_valid_until") + + // 考核结果 + result String @default("NOT_ASSESSED") @db.VarChar(20) + + // 备注 + remarks String? @map("remarks") @db.VarChar(500) + + // 时间戳 + assessedAt DateTime @map("assessed_at") + createdAt DateTime @map("created_at") + syncedAt DateTime @default(now()) @map("synced_at") + + @@unique([authorizationId, assessmentMonth]) + @@index([accountSequence, assessmentMonth]) + @@index([userId, assessmentMonth]) + @@index([roleType, regionCode, assessmentMonth]) + @@index([assessmentMonth, result]) + @@map("benefit_assessment_query_view") +} + /// 系统账户查询视图 - 通过 Debezium CDC 从 authorization-service 同步 model SystemAccountQueryView { id BigInt @id @map("account_id") diff --git a/backend/services/admin-service/src/infrastructure/kafka/authorization-cdc-consumer.service.ts b/backend/services/admin-service/src/infrastructure/kafka/authorization-cdc-consumer.service.ts index e4486b19..73fd5a87 100644 --- a/backend/services/admin-service/src/infrastructure/kafka/authorization-cdc-consumer.service.ts +++ b/backend/services/admin-service/src/infrastructure/kafka/authorization-cdc-consumer.service.ts @@ -100,7 +100,36 @@ interface CdcSystemAccountPayload { __deleted?: string; } -type CdcAuthorizationPayload = CdcAuthorizationRolePayload | CdcMonthlyAssessmentPayload | CdcSystemAccountLedgerPayload | CdcSystemAccountPayload; +// benefit_assessment_records 表 (权益考核记录) +interface CdcBenefitAssessmentRecordPayload { + id: string; + authorization_id: string; + user_id: string; + account_sequence: string; + role_type: string; + region_code: string; + region_name: string; + assessment_month: string; + month_index: number; + monthly_target: number; + cumulative_target: number; + trees_completed: number; + trees_required: number; + benefit_action_taken: string; + previous_benefit_status: boolean; + new_benefit_status: boolean; + new_valid_until?: string | null; + result: string; + remarks?: string | null; + assessed_at: string; + created_at: string; + __op: 'c' | 'u' | 'd' | 'r'; + __table: string; + __source_ts_ms: number; + __deleted?: string; +} + +type CdcAuthorizationPayload = CdcAuthorizationRolePayload | CdcMonthlyAssessmentPayload | CdcSystemAccountLedgerPayload | CdcSystemAccountPayload | CdcBenefitAssessmentRecordPayload; /** * Authorization CDC 消费者服务 @@ -125,6 +154,7 @@ export class AuthorizationCdcConsumerService implements OnModuleInit, OnModuleDe 'cdc.authorization.public.monthly_assessments', 'cdc.authorization.public.system_accounts', 'cdc.authorization.public.system_account_ledgers', + 'cdc.authorization.public.benefit_assessment_records', ]; private readonly consumerGroup: string; @@ -237,6 +267,9 @@ export class AuthorizationCdcConsumerService implements OnModuleInit, OnModuleDe case 'system_account_ledgers': await this.processSystemAccountLedgerEvent(data as CdcSystemAccountLedgerPayload); break; + case 'benefit_assessment_records': + await this.processBenefitAssessmentRecordEvent(data as CdcBenefitAssessmentRecordPayload); + break; default: this.logger.warn(`[Authorization-CDC] Unknown table: ${table}`); } @@ -623,6 +656,116 @@ export class AuthorizationCdcConsumerService implements OnModuleInit, OnModuleDe } } + // ==================== benefit_assessment_records 处理 ==================== + + private async processBenefitAssessmentRecordEvent(data: CdcBenefitAssessmentRecordPayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + if (operation === 'd' || isDeleted) { + await this.handleBenefitAssessmentRecordDelete(data); + } else if (operation === 'c' || operation === 'r') { + await this.handleBenefitAssessmentRecordCreateOrSnapshot(data); + } else if (operation === 'u') { + await this.handleBenefitAssessmentRecordUpdate(data); + } + } + + private async handleBenefitAssessmentRecordCreateOrSnapshot(data: CdcBenefitAssessmentRecordPayload): Promise { + await this.prisma.benefitAssessmentQueryView.upsert({ + where: { id: data.id }, + create: { + id: data.id, + authorizationId: data.authorization_id, + userId: data.user_id, + accountSequence: data.account_sequence, + roleType: data.role_type, + regionCode: data.region_code, + regionName: data.region_name, + assessmentMonth: data.assessment_month, + monthIndex: data.month_index, + monthlyTarget: data.monthly_target, + cumulativeTarget: data.cumulative_target, + treesCompleted: data.trees_completed, + treesRequired: data.trees_required, + benefitActionTaken: data.benefit_action_taken, + previousBenefitStatus: data.previous_benefit_status, + newBenefitStatus: data.new_benefit_status, + newValidUntil: data.new_valid_until ? new Date(data.new_valid_until) : null, + result: data.result, + remarks: data.remarks || null, + assessedAt: new Date(data.assessed_at), + createdAt: new Date(data.created_at), + syncedAt: new Date(), + }, + update: { + authorizationId: data.authorization_id, + userId: data.user_id, + accountSequence: data.account_sequence, + roleType: data.role_type, + regionCode: data.region_code, + regionName: data.region_name, + assessmentMonth: data.assessment_month, + monthIndex: data.month_index, + monthlyTarget: data.monthly_target, + cumulativeTarget: data.cumulative_target, + treesCompleted: data.trees_completed, + treesRequired: data.trees_required, + benefitActionTaken: data.benefit_action_taken, + previousBenefitStatus: data.previous_benefit_status, + newBenefitStatus: data.new_benefit_status, + newValidUntil: data.new_valid_until ? new Date(data.new_valid_until) : null, + result: data.result, + remarks: data.remarks || null, + assessedAt: new Date(data.assessed_at), + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Authorization-CDC] Created/Snapshot benefit assessment record: ${data.id} (${data.role_type} - ${data.assessment_month})`); + } + + private async handleBenefitAssessmentRecordUpdate(data: CdcBenefitAssessmentRecordPayload): Promise { + const exists = await this.prisma.benefitAssessmentQueryView.findUnique({ + where: { id: data.id }, + select: { id: true }, + }); + + if (!exists) { + await this.handleBenefitAssessmentRecordCreateOrSnapshot(data); + return; + } + + await this.prisma.benefitAssessmentQueryView.update({ + where: { id: data.id }, + data: { + treesCompleted: data.trees_completed, + treesRequired: data.trees_required, + benefitActionTaken: data.benefit_action_taken, + previousBenefitStatus: data.previous_benefit_status, + newBenefitStatus: data.new_benefit_status, + newValidUntil: data.new_valid_until ? new Date(data.new_valid_until) : null, + result: data.result, + remarks: data.remarks || null, + assessedAt: new Date(data.assessed_at), + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Authorization-CDC] Updated benefit assessment record: ${data.id}`); + } + + private async handleBenefitAssessmentRecordDelete(data: CdcBenefitAssessmentRecordPayload): Promise { + try { + await this.prisma.benefitAssessmentQueryView.delete({ + where: { id: data.id }, + }); + this.logger.log(`[Authorization-CDC] Deleted benefit assessment record: ${data.id}`); + } catch { + this.logger.warn(`[Authorization-CDC] Benefit assessment record not found for delete: ${data.id}`); + } + } + // ==================== Helper Methods ==================== private async isEventProcessed(eventId: string): Promise { diff --git a/backend/services/scripts/debezium/authorization-connector.json b/backend/services/scripts/debezium/authorization-connector.json index f7a86d02..781498d9 100644 --- a/backend/services/scripts/debezium/authorization-connector.json +++ b/backend/services/scripts/debezium/authorization-connector.json @@ -12,7 +12,7 @@ "plugin.name": "pgoutput", "publication.name": "authorization_cdc_publication", "slot.name": "authorization_cdc_slot", - "table.include.list": "public.authorization_roles,public.monthly_assessments,public.system_accounts,public.system_account_ledgers", + "table.include.list": "public.authorization_roles,public.monthly_assessments,public.system_accounts,public.system_account_ledgers,public.benefit_assessment_records", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.add.fields": "op,table,source.ts_ms",