diff --git a/backend/services/admin-service/prisma/migrations/20250107120000_add_cdc_query_views/migration.sql b/backend/services/admin-service/prisma/migrations/20250107120000_add_cdc_query_views/migration.sql new file mode 100644 index 00000000..f20c0ef8 --- /dev/null +++ b/backend/services/admin-service/prisma/migrations/20250107120000_add_cdc_query_views/migration.sql @@ -0,0 +1,354 @@ +-- ============================================================================= +-- CDC Query Views Migration +-- 创建用于 Debezium CDC 数据同步的查询视图表 +-- ============================================================================= + +-- ----------------------------------------------------------------------------- +-- Wallet Query Views (钱包查询视图) +-- ----------------------------------------------------------------------------- + +-- 钱包账户查询视图 +CREATE TABLE "wallet_account_query_view" ( + "wallet_id" BIGINT NOT NULL, + "account_sequence" VARCHAR(20) NOT NULL, + "user_id" BIGINT NOT NULL, + "usdt_available" DECIMAL(20,8) NOT NULL DEFAULT 0, + "usdt_frozen" DECIMAL(20,8) NOT NULL DEFAULT 0, + "dst_available" DECIMAL(20,8) NOT NULL DEFAULT 0, + "dst_frozen" DECIMAL(20,8) NOT NULL DEFAULT 0, + "bnb_available" DECIMAL(20,8) NOT NULL DEFAULT 0, + "bnb_frozen" DECIMAL(20,8) NOT NULL DEFAULT 0, + "og_available" DECIMAL(20,8) NOT NULL DEFAULT 0, + "og_frozen" DECIMAL(20,8) NOT NULL DEFAULT 0, + "rwad_available" DECIMAL(20,8) NOT NULL DEFAULT 0, + "rwad_frozen" DECIMAL(20,8) NOT NULL DEFAULT 0, + "hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0, + "pending_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0, + "pending_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0, + "settleable_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0, + "settleable_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0, + "settled_total_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0, + "settled_total_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0, + "expired_total_usdt" DECIMAL(20,8) NOT NULL DEFAULT 0, + "expired_total_hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0, + "status" VARCHAR(20) NOT NULL DEFAULT 'ACTIVE', + "has_planted" BOOLEAN NOT NULL DEFAULT false, + "created_at" TIMESTAMP(3) NOT NULL, + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "wallet_account_query_view_pkey" PRIMARY KEY ("wallet_id") +); + +CREATE UNIQUE INDEX "wallet_account_query_view_account_sequence_key" ON "wallet_account_query_view"("account_sequence"); +CREATE UNIQUE INDEX "wallet_account_query_view_user_id_key" ON "wallet_account_query_view"("user_id"); +CREATE INDEX "wallet_account_query_view_account_sequence_idx" ON "wallet_account_query_view"("account_sequence"); +CREATE INDEX "wallet_account_query_view_usdt_available_idx" ON "wallet_account_query_view"("usdt_available" DESC); +CREATE INDEX "wallet_account_query_view_hashpower_idx" ON "wallet_account_query_view"("hashpower" DESC); +CREATE INDEX "wallet_account_query_view_status_idx" ON "wallet_account_query_view"("status"); + +-- 提现订单查询视图 +CREATE TABLE "withdrawal_order_query_view" ( + "order_id" BIGINT NOT NULL, + "order_no" VARCHAR(50) NOT NULL, + "account_sequence" VARCHAR(20) NOT NULL, + "user_id" BIGINT NOT NULL, + "amount" DECIMAL(20,8) NOT NULL, + "fee" DECIMAL(20,8) NOT NULL, + "chain_type" VARCHAR(20) NOT NULL, + "to_address" VARCHAR(100) NOT NULL, + "tx_hash" VARCHAR(100), + "is_internal_transfer" BOOLEAN NOT NULL DEFAULT false, + "to_account_sequence" VARCHAR(20), + "to_user_id" BIGINT, + "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', + "error_message" VARCHAR(500), + "frozen_at" TIMESTAMP(3), + "broadcasted_at" TIMESTAMP(3), + "confirmed_at" TIMESTAMP(3), + "created_at" TIMESTAMP(3) NOT NULL, + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "withdrawal_order_query_view_pkey" PRIMARY KEY ("order_id") +); + +CREATE UNIQUE INDEX "withdrawal_order_query_view_order_no_key" ON "withdrawal_order_query_view"("order_no"); +CREATE INDEX "withdrawal_order_query_view_account_sequence_idx" ON "withdrawal_order_query_view"("account_sequence"); +CREATE INDEX "withdrawal_order_query_view_user_id_idx" ON "withdrawal_order_query_view"("user_id"); +CREATE INDEX "withdrawal_order_query_view_status_idx" ON "withdrawal_order_query_view"("status"); +CREATE INDEX "withdrawal_order_query_view_chain_type_idx" ON "withdrawal_order_query_view"("chain_type"); +CREATE INDEX "withdrawal_order_query_view_tx_hash_idx" ON "withdrawal_order_query_view"("tx_hash"); +CREATE INDEX "withdrawal_order_query_view_created_at_idx" ON "withdrawal_order_query_view"("created_at"); + +-- 法币提现订单查询视图 +CREATE TABLE "fiat_withdrawal_order_query_view" ( + "order_id" BIGINT NOT NULL, + "order_no" VARCHAR(50) NOT NULL, + "account_sequence" VARCHAR(20) NOT NULL, + "user_id" BIGINT NOT NULL, + "amount" DECIMAL(20,8) NOT NULL, + "fee" DECIMAL(20,8) NOT NULL, + "payment_method" VARCHAR(20) NOT NULL, + "bank_name" VARCHAR(100), + "bank_card_no_masked" VARCHAR(50), + "card_holder_name" VARCHAR(100), + "alipay_account_masked" VARCHAR(100), + "wechat_account_masked" VARCHAR(100), + "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', + "error_message" VARCHAR(500), + "reviewed_by" VARCHAR(100), + "reviewed_at" TIMESTAMP(3), + "review_remark" VARCHAR(500), + "paid_by" VARCHAR(100), + "paid_at" TIMESTAMP(3), + "frozen_at" TIMESTAMP(3), + "completed_at" TIMESTAMP(3), + "created_at" TIMESTAMP(3) NOT NULL, + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "fiat_withdrawal_order_query_view_pkey" PRIMARY KEY ("order_id") +); + +CREATE UNIQUE INDEX "fiat_withdrawal_order_query_view_order_no_key" ON "fiat_withdrawal_order_query_view"("order_no"); +CREATE INDEX "fiat_withdrawal_order_query_view_account_sequence_idx" ON "fiat_withdrawal_order_query_view"("account_sequence"); +CREATE INDEX "fiat_withdrawal_order_query_view_user_id_idx" ON "fiat_withdrawal_order_query_view"("user_id"); +CREATE INDEX "fiat_withdrawal_order_query_view_status_idx" ON "fiat_withdrawal_order_query_view"("status"); +CREATE INDEX "fiat_withdrawal_order_query_view_payment_method_idx" ON "fiat_withdrawal_order_query_view"("payment_method"); +CREATE INDEX "fiat_withdrawal_order_query_view_reviewed_by_idx" ON "fiat_withdrawal_order_query_view"("reviewed_by"); +CREATE INDEX "fiat_withdrawal_order_query_view_created_at_idx" ON "fiat_withdrawal_order_query_view"("created_at"); + +-- ----------------------------------------------------------------------------- +-- Planting Query Views (认种查询视图) +-- ----------------------------------------------------------------------------- + +-- 认种订单查询视图 +CREATE TABLE "planting_order_query_view" ( + "order_id" BIGINT NOT NULL, + "order_no" VARCHAR(50) NOT NULL, + "user_id" BIGINT NOT NULL, + "account_sequence" VARCHAR(20) NOT NULL, + "tree_count" INTEGER NOT NULL, + "total_amount" DECIMAL(20,8) NOT NULL, + "selected_province" VARCHAR(10), + "selected_city" VARCHAR(10), + "status" VARCHAR(30) NOT NULL DEFAULT 'CREATED', + "pool_injection_batch_id" BIGINT, + "created_at" TIMESTAMP(3) NOT NULL, + "paid_at" TIMESTAMP(3), + "fund_allocated_at" TIMESTAMP(3), + "mining_enabled_at" TIMESTAMP(3), + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "planting_order_query_view_pkey" PRIMARY KEY ("order_id") +); + +CREATE UNIQUE INDEX "planting_order_query_view_order_no_key" ON "planting_order_query_view"("order_no"); +CREATE INDEX "planting_order_query_view_user_id_idx" ON "planting_order_query_view"("user_id"); +CREATE INDEX "planting_order_query_view_account_sequence_idx" ON "planting_order_query_view"("account_sequence"); +CREATE INDEX "planting_order_query_view_status_idx" ON "planting_order_query_view"("status"); +CREATE INDEX "planting_order_query_view_province_city_idx" ON "planting_order_query_view"("selected_province", "selected_city"); +CREATE INDEX "planting_order_query_view_created_at_idx" ON "planting_order_query_view"("created_at"); +CREATE INDEX "planting_order_query_view_paid_at_idx" ON "planting_order_query_view"("paid_at"); + +-- 用户持仓查询视图 +CREATE TABLE "planting_position_query_view" ( + "position_id" BIGINT NOT NULL, + "user_id" BIGINT NOT NULL, + "total_tree_count" INTEGER NOT NULL DEFAULT 0, + "effective_tree_count" INTEGER NOT NULL DEFAULT 0, + "pending_tree_count" INTEGER NOT NULL DEFAULT 0, + "first_mining_start_at" TIMESTAMP(3), + "created_at" TIMESTAMP(3) NOT NULL, + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "planting_position_query_view_pkey" PRIMARY KEY ("position_id") +); + +CREATE UNIQUE INDEX "planting_position_query_view_user_id_key" ON "planting_position_query_view"("user_id"); +CREATE INDEX "planting_position_query_view_total_tree_count_idx" ON "planting_position_query_view"("total_tree_count"); + +-- 合同签署任务查询视图 +CREATE TABLE "contract_signing_task_query_view" ( + "task_id" BIGINT NOT NULL, + "order_no" VARCHAR(50) NOT NULL, + "contract_no" VARCHAR(30) NOT NULL, + "user_id" BIGINT NOT NULL, + "account_sequence" VARCHAR(20) NOT NULL, + "contract_version" VARCHAR(20) NOT NULL, + "tree_count" INTEGER NOT NULL, + "total_amount" DECIMAL(20,8) NOT NULL, + "province_code" VARCHAR(10) NOT NULL, + "province_name" VARCHAR(50) NOT NULL, + "city_code" VARCHAR(10) NOT NULL, + "city_name" VARCHAR(50) NOT NULL, + "status" VARCHAR(30) NOT NULL DEFAULT 'PENDING', + "expires_at" TIMESTAMP(3) NOT NULL, + "signed_at" TIMESTAMP(3), + "created_at" TIMESTAMP(3) NOT NULL, + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "contract_signing_task_query_view_pkey" PRIMARY KEY ("task_id") +); + +CREATE UNIQUE INDEX "contract_signing_task_query_view_order_no_key" ON "contract_signing_task_query_view"("order_no"); +CREATE UNIQUE INDEX "contract_signing_task_query_view_contract_no_key" ON "contract_signing_task_query_view"("contract_no"); +CREATE INDEX "contract_signing_task_query_view_user_id_idx" ON "contract_signing_task_query_view"("user_id"); +CREATE INDEX "contract_signing_task_query_view_status_idx" ON "contract_signing_task_query_view"("status"); +CREATE INDEX "contract_signing_task_query_view_expires_at_idx" ON "contract_signing_task_query_view"("expires_at"); + +-- ----------------------------------------------------------------------------- +-- Authorization Query Views (授权查询视图) +-- ----------------------------------------------------------------------------- + +-- 授权角色查询视图 +CREATE TABLE "authorization_role_query_view" ( + "id" TEXT NOT NULL, + "user_id" TEXT NOT NULL, + "account_sequence" TEXT NOT NULL, + "role_type" VARCHAR(30) NOT NULL, + "region_code" TEXT NOT NULL, + "region_name" TEXT NOT NULL, + "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', + "display_title" TEXT NOT NULL, + "authorized_at" TIMESTAMP(3), + "authorized_by" TEXT, + "revoked_at" TIMESTAMP(3), + "revoked_by" TEXT, + "deleted_at" TIMESTAMP(3), + "initial_target_tree_count" INTEGER NOT NULL, + "monthly_target_type" VARCHAR(20) NOT NULL, + "benefit_active" BOOLEAN NOT NULL DEFAULT false, + "benefit_activated_at" TIMESTAMP(3), + "last_assessment_month" TEXT, + "monthly_trees_added" INTEGER NOT NULL DEFAULT 0, + "created_at" TIMESTAMP(3) NOT NULL, + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "authorization_role_query_view_pkey" PRIMARY KEY ("id") +); + +CREATE INDEX "authorization_role_query_view_account_sequence_idx" ON "authorization_role_query_view"("account_sequence"); +CREATE INDEX "authorization_role_query_view_user_id_idx" ON "authorization_role_query_view"("user_id"); +CREATE INDEX "authorization_role_query_view_role_type_region_code_idx" ON "authorization_role_query_view"("role_type", "region_code"); +CREATE INDEX "authorization_role_query_view_status_idx" ON "authorization_role_query_view"("status"); +CREATE INDEX "authorization_role_query_view_deleted_at_idx" ON "authorization_role_query_view"("deleted_at"); + +-- 月度考核查询视图 +CREATE TABLE "monthly_assessment_query_view" ( + "id" TEXT NOT NULL, + "authorization_id" TEXT NOT NULL, + "user_id" TEXT NOT NULL, + "account_sequence" TEXT NOT NULL, + "role_type" VARCHAR(30) NOT NULL, + "region_code" TEXT NOT NULL, + "assessment_month" TEXT NOT NULL, + "month_index" INTEGER NOT NULL, + "monthly_target" INTEGER NOT NULL, + "cumulative_target" INTEGER NOT NULL, + "monthly_completed" INTEGER NOT NULL DEFAULT 0, + "cumulative_completed" INTEGER NOT NULL DEFAULT 0, + "completed_at" TIMESTAMP(3), + "result" VARCHAR(20) NOT NULL DEFAULT 'NOT_ASSESSED', + "ranking_in_region" INTEGER, + "is_first_place" BOOLEAN NOT NULL DEFAULT false, + "is_bypassed" BOOLEAN NOT NULL DEFAULT false, + "assessed_at" TIMESTAMP(3), + "created_at" TIMESTAMP(3) NOT NULL, + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "monthly_assessment_query_view_pkey" PRIMARY KEY ("id") +); + +CREATE INDEX "monthly_assessment_query_view_account_month_idx" ON "monthly_assessment_query_view"("account_sequence", "assessment_month"); +CREATE INDEX "monthly_assessment_query_view_user_month_idx" ON "monthly_assessment_query_view"("user_id", "assessment_month"); +CREATE INDEX "monthly_assessment_query_view_role_region_month_idx" ON "monthly_assessment_query_view"("role_type", "region_code", "assessment_month"); +CREATE INDEX "monthly_assessment_query_view_month_result_idx" ON "monthly_assessment_query_view"("assessment_month", "result"); + +-- 系统账户查询视图 +CREATE TABLE "system_account_query_view" ( + "account_id" BIGINT NOT NULL, + "account_type" VARCHAR(30) NOT NULL, + "region_code" VARCHAR(10), + "region_name" VARCHAR(50), + "wallet_address" VARCHAR(42), + "usdt_balance" DECIMAL(20,8) NOT NULL DEFAULT 0, + "hashpower" DECIMAL(20,8) NOT NULL DEFAULT 0, + "total_received" DECIMAL(20,8) NOT NULL DEFAULT 0, + "total_transferred" DECIMAL(20,8) NOT NULL DEFAULT 0, + "status" VARCHAR(20) NOT NULL DEFAULT 'ACTIVE', + "created_at" TIMESTAMP(3) NOT NULL, + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "system_account_query_view_pkey" PRIMARY KEY ("account_id") +); + +CREATE INDEX "system_account_query_view_account_type_idx" ON "system_account_query_view"("account_type"); +CREATE INDEX "system_account_query_view_wallet_address_idx" ON "system_account_query_view"("wallet_address"); + +-- ----------------------------------------------------------------------------- +-- Ledger Views (分类账视图) +-- ----------------------------------------------------------------------------- + +-- 钱包流水分类账 +CREATE TABLE "wallet_ledger_entry_view" ( + "entry_id" BIGINT NOT NULL, + "account_sequence" VARCHAR(20) NOT NULL, + "user_id" BIGINT NOT NULL, + "entry_type" VARCHAR(50) NOT NULL, + "amount" DECIMAL(20,8) NOT NULL, + "asset_type" VARCHAR(20) NOT NULL, + "balance_after" DECIMAL(20,8), + "ref_order_id" VARCHAR(100), + "ref_tx_hash" VARCHAR(100), + "memo" TEXT, + "created_at" TIMESTAMP(3) NOT NULL, + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "wallet_ledger_entry_view_pkey" PRIMARY KEY ("entry_id") +); + +CREATE INDEX "wallet_ledger_entry_view_account_created_idx" ON "wallet_ledger_entry_view"("account_sequence", "created_at" DESC); +CREATE INDEX "wallet_ledger_entry_view_user_created_idx" ON "wallet_ledger_entry_view"("user_id", "created_at" DESC); +CREATE INDEX "wallet_ledger_entry_view_entry_type_idx" ON "wallet_ledger_entry_view"("entry_type"); +CREATE INDEX "wallet_ledger_entry_view_asset_type_idx" ON "wallet_ledger_entry_view"("asset_type"); +CREATE INDEX "wallet_ledger_entry_view_ref_order_id_idx" ON "wallet_ledger_entry_view"("ref_order_id"); +CREATE INDEX "wallet_ledger_entry_view_ref_tx_hash_idx" ON "wallet_ledger_entry_view"("ref_tx_hash"); +CREATE INDEX "wallet_ledger_entry_view_created_at_idx" ON "wallet_ledger_entry_view"("created_at"); + +-- 认种资金分配分类账 +CREATE TABLE "fund_allocation_view" ( + "allocation_id" BIGINT NOT NULL, + "order_id" BIGINT NOT NULL, + "target_type" VARCHAR(50) NOT NULL, + "amount" DECIMAL(20,8) NOT NULL, + "target_account_id" VARCHAR(100), + "created_at" TIMESTAMP(3) NOT NULL, + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "fund_allocation_view_pkey" PRIMARY KEY ("allocation_id") +); + +CREATE INDEX "fund_allocation_view_order_id_idx" ON "fund_allocation_view"("order_id"); +CREATE INDEX "fund_allocation_view_target_idx" ON "fund_allocation_view"("target_type", "target_account_id"); +CREATE INDEX "fund_allocation_view_created_at_idx" ON "fund_allocation_view"("created_at"); + +-- 系统账户流水分类账 +CREATE TABLE "system_account_ledger_view" ( + "ledger_id" BIGINT NOT NULL, + "account_id" BIGINT NOT NULL, + "entry_type" VARCHAR(30) NOT NULL, + "amount" DECIMAL(20,8) NOT NULL, + "balance_after" DECIMAL(20,8) NOT NULL, + "source_order_id" BIGINT, + "source_reward_id" BIGINT, + "tx_hash" VARCHAR(66), + "memo" VARCHAR(500), + "created_at" TIMESTAMP(3) NOT NULL, + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "system_account_ledger_view_pkey" PRIMARY KEY ("ledger_id") +); + +CREATE INDEX "system_account_ledger_view_account_created_idx" ON "system_account_ledger_view"("account_id", "created_at" DESC); +CREATE INDEX "system_account_ledger_view_source_order_idx" ON "system_account_ledger_view"("source_order_id"); +CREATE INDEX "system_account_ledger_view_tx_hash_idx" ON "system_account_ledger_view"("tx_hash"); +CREATE INDEX "system_account_ledger_view_created_at_idx" ON "system_account_ledger_view"("created_at"); diff --git a/backend/services/admin-service/prisma/schema.prisma b/backend/services/admin-service/prisma/schema.prisma index 4f995e35..927e6295 100644 --- a/backend/services/admin-service/prisma/schema.prisma +++ b/backend/services/admin-service/prisma/schema.prisma @@ -68,7 +68,7 @@ model Notification { // 关联 readRecords NotificationRead[] - targetTags NotificationTagTarget[] // BY_TAG 时使用 + targetTags NotificationTagTarget[] // BY_TAG 时使用 targetUsers NotificationUserTarget[] // SPECIFIC 时使用 @@index([isEnabled, publishedAt]) @@ -110,8 +110,8 @@ enum NotificationPriority { /// 目标用户类型 enum TargetType { - ALL // 所有用户 - BY_TAG // 按标签匹配 + ALL // 所有用户 + BY_TAG // 按标签匹配 SPECIFIC // 指定用户列表 } @@ -180,28 +180,28 @@ model TagCategory { /// 标签类型 enum TagType { - MANUAL // 手动打标 (管理员操作) - AUTO // 自动打标 (规则驱动) + MANUAL // 手动打标 (管理员操作) + AUTO // 自动打标 (规则驱动) COMPUTED // 计算型 (实时计算,不存储关联) - SYSTEM // 系统内置 (不可删除) + SYSTEM // 系统内置 (不可删除) } /// 标签值类型 enum TagValueType { BOOLEAN // 布尔型: 有/无 - ENUM // 枚举型: 高/中/低 - NUMBER // 数值型: 0-100分 - STRING // 字符串型 + ENUM // 枚举型: 高/中/低 + NUMBER // 数值型: 0-100分 + STRING // 字符串型 } /// 用户标签定义 model UserTag { - id String @id @default(uuid()) - categoryId String? @map("category_id") - code String @unique @db.VarChar(50) // "vip", "new_user", "whale" - name String @db.VarChar(100) // "VIP用户", "新用户", "大客户" - description String? @db.Text - color String? @db.VarChar(20) // "#FF5722" + id String @id @default(uuid()) + categoryId String? @map("category_id") + code String @unique @db.VarChar(50) // "vip", "new_user", "whale" + name String @db.VarChar(100) // "VIP用户", "新用户", "大客户" + description String? @db.Text + color String? @db.VarChar(20) // "#FF5722" type TagType @default(MANUAL) // 标签类型 valueType TagValueType @default(BOOLEAN) @map("value_type") // 标签值类型 @@ -211,7 +211,7 @@ model UserTag { enumValues Json? @map("enum_values") // 关联的自动规则 (type=AUTO 时使用) - ruleId String? @unique @map("rule_id") + ruleId String? @unique @map("rule_id") rule UserClassificationRule? @relation(fields: [ruleId], references: [id], onDelete: SetNull) // 广告相关 @@ -242,9 +242,9 @@ model UserTag { /// 用户-标签关联 model UserTagAssignment { - id String @id @default(uuid()) - accountSequence String @map("account_sequence") @db.VarChar(12) - tagId String @map("tag_id") + id String @id @default(uuid()) + accountSequence String @map("account_sequence") @db.VarChar(12) + tagId String @map("tag_id") // 标签值 (根据 valueType) // BOOLEAN: null (存在即为true) @@ -344,16 +344,16 @@ model UserFeature { /// 人群包用途 enum SegmentUsageType { - GENERAL // 通用 + GENERAL // 通用 NOTIFICATION // 通知定向 - ADVERTISING // 广告定向 - ANALYTICS // 数据分析 + ADVERTISING // 广告定向 + ANALYTICS // 数据分析 } /// 人群包 - 多条件组合的用户群 model AudienceSegment { - id String @id @default(uuid()) - name String @db.VarChar(100) // "高价值活跃用户" + id String @id @default(uuid()) + name String @db.VarChar(100) // "高价值活跃用户" description String? @db.Text // 定向条件 (JSON) @@ -476,29 +476,29 @@ model UserQueryView { /// 推荐关系查询视图 - 通过 Debezium CDC 从 referral-service 同步 /// 用于 admin-web 查看用户推荐关系,避免跨服务调用 model ReferralQueryView { - id BigInt @id @map("relationship_id") - userId BigInt @unique @map("user_id") - accountSequence String @unique @map("account_sequence") @db.VarChar(12) + id BigInt @id @map("relationship_id") + userId BigInt @unique @map("user_id") + accountSequence String @unique @map("account_sequence") @db.VarChar(12) // 推荐人信息 - referrerId BigInt? @map("referrer_id") - rootUserId BigInt? @map("root_user_id") + referrerId BigInt? @map("referrer_id") + rootUserId BigInt? @map("root_user_id") // 推荐码 - myReferralCode String @unique @map("my_referral_code") @db.VarChar(20) - usedReferralCode String? @map("used_referral_code") @db.VarChar(20) + myReferralCode String @unique @map("my_referral_code") @db.VarChar(20) + usedReferralCode String? @map("used_referral_code") @db.VarChar(20) // 推荐链信息 - ancestorPath BigInt[] @map("ancestor_path") - depth Int @default(0) @map("depth") + ancestorPath BigInt[] @map("ancestor_path") + depth Int @default(0) @map("depth") // 直推统计 - directReferralCount Int @default(0) @map("direct_referral_count") - activeDirectCount Int @default(0) @map("active_direct_count") + directReferralCount Int @default(0) @map("direct_referral_count") + activeDirectCount Int @default(0) @map("active_direct_count") // 时间戳 - createdAt DateTime @map("created_at") - syncedAt DateTime @default(now()) @map("synced_at") + createdAt DateTime @map("created_at") + syncedAt DateTime @default(now()) @map("synced_at") @@index([referrerId]) @@index([accountSequence]) @@ -509,6 +509,474 @@ model ReferralQueryView { @@map("referral_query_view") } +// ============================================================================= +// Wallet Query Views (钱包查询视图 - 通过 CDC 同步) +// ============================================================================= + +/// 钱包账户查询视图 - 通过 Debezium CDC 从 wallet-service 同步 +model WalletAccountQueryView { + id BigInt @id @map("wallet_id") + accountSequence String @unique @map("account_sequence") @db.VarChar(20) + userId BigInt @unique @map("user_id") + + // USDT 余额 + usdtAvailable Decimal @default(0) @map("usdt_available") @db.Decimal(20, 8) + usdtFrozen Decimal @default(0) @map("usdt_frozen") @db.Decimal(20, 8) + + // DST 余额 + dstAvailable Decimal @default(0) @map("dst_available") @db.Decimal(20, 8) + dstFrozen Decimal @default(0) @map("dst_frozen") @db.Decimal(20, 8) + + // BNB 余额 + bnbAvailable Decimal @default(0) @map("bnb_available") @db.Decimal(20, 8) + bnbFrozen Decimal @default(0) @map("bnb_frozen") @db.Decimal(20, 8) + + // OG 余额 + ogAvailable Decimal @default(0) @map("og_available") @db.Decimal(20, 8) + ogFrozen Decimal @default(0) @map("og_frozen") @db.Decimal(20, 8) + + // RWAD 余额 + rwadAvailable Decimal @default(0) @map("rwad_available") @db.Decimal(20, 8) + rwadFrozen Decimal @default(0) @map("rwad_frozen") @db.Decimal(20, 8) + + // 算力 + hashpower Decimal @default(0) @map("hashpower") @db.Decimal(20, 8) + + // 待领取/可结算/已结算收益 + pendingUsdt Decimal @default(0) @map("pending_usdt") @db.Decimal(20, 8) + pendingHashpower Decimal @default(0) @map("pending_hashpower") @db.Decimal(20, 8) + settleableUsdt Decimal @default(0) @map("settleable_usdt") @db.Decimal(20, 8) + settleableHashpower Decimal @default(0) @map("settleable_hashpower") @db.Decimal(20, 8) + settledTotalUsdt Decimal @default(0) @map("settled_total_usdt") @db.Decimal(20, 8) + settledTotalHashpower Decimal @default(0) @map("settled_total_hashpower") @db.Decimal(20, 8) + expiredTotalUsdt Decimal @default(0) @map("expired_total_usdt") @db.Decimal(20, 8) + expiredTotalHashpower Decimal @default(0) @map("expired_total_hashpower") @db.Decimal(20, 8) + + // 状态 + status String @default("ACTIVE") @map("status") @db.VarChar(20) + hasPlanted Boolean @default(false) @map("has_planted") + + // 时间戳 + createdAt DateTime @map("created_at") + syncedAt DateTime @default(now()) @map("synced_at") + + @@index([accountSequence]) + @@index([usdtAvailable(sort: Desc)]) + @@index([hashpower(sort: Desc)]) + @@index([status]) + @@map("wallet_account_query_view") +} + +/// 提现订单查询视图 - 通过 Debezium CDC 从 wallet-service 同步 +model WithdrawalOrderQueryView { + id BigInt @id @map("order_id") + orderNo String @unique @map("order_no") @db.VarChar(50) + accountSequence String @map("account_sequence") @db.VarChar(20) + userId BigInt @map("user_id") + + // 提现信息 + amount Decimal @map("amount") @db.Decimal(20, 8) + fee Decimal @map("fee") @db.Decimal(20, 8) + chainType String @map("chain_type") @db.VarChar(20) + toAddress String @map("to_address") @db.VarChar(100) + + // 交易信息 + txHash String? @map("tx_hash") @db.VarChar(100) + + // 内部转账标识 + isInternalTransfer Boolean @default(false) @map("is_internal_transfer") + toAccountSequence String? @map("to_account_sequence") @db.VarChar(20) + toUserId BigInt? @map("to_user_id") + + // 状态 + status String @default("PENDING") @map("status") @db.VarChar(20) + errorMessage String? @map("error_message") @db.VarChar(500) + + // 时间戳 + frozenAt DateTime? @map("frozen_at") + broadcastedAt DateTime? @map("broadcasted_at") + confirmedAt DateTime? @map("confirmed_at") + createdAt DateTime @map("created_at") + syncedAt DateTime @default(now()) @map("synced_at") + + @@index([accountSequence]) + @@index([userId]) + @@index([status]) + @@index([chainType]) + @@index([txHash]) + @@index([createdAt]) + @@map("withdrawal_order_query_view") +} + +/// 法币提现订单查询视图 - 通过 Debezium CDC 从 wallet-service 同步 +model FiatWithdrawalOrderQueryView { + id BigInt @id @map("order_id") + orderNo String @unique @map("order_no") @db.VarChar(50) + accountSequence String @map("account_sequence") @db.VarChar(20) + userId BigInt @map("user_id") + + // 金额 + amount Decimal @map("amount") @db.Decimal(20, 8) + fee Decimal @map("fee") @db.Decimal(20, 8) + + // 收款方式 + paymentMethod String @map("payment_method") @db.VarChar(20) + + // 银行卡信息 (脱敏) + bankName String? @map("bank_name") @db.VarChar(100) + bankCardNoMasked String? @map("bank_card_no_masked") @db.VarChar(50) + cardHolderName String? @map("card_holder_name") @db.VarChar(100) + + // 支付宝/微信 (脱敏) + alipayAccountMasked String? @map("alipay_account_masked") @db.VarChar(100) + wechatAccountMasked String? @map("wechat_account_masked") @db.VarChar(100) + + // 状态 + status String @default("PENDING") @map("status") @db.VarChar(20) + errorMessage String? @map("error_message") @db.VarChar(500) + + // 审核信息 + reviewedBy String? @map("reviewed_by") @db.VarChar(100) + reviewedAt DateTime? @map("reviewed_at") + reviewRemark String? @map("review_remark") @db.VarChar(500) + + // 打款信息 + paidBy String? @map("paid_by") @db.VarChar(100) + paidAt DateTime? @map("paid_at") + + // 时间戳 + frozenAt DateTime? @map("frozen_at") + completedAt DateTime? @map("completed_at") + createdAt DateTime @map("created_at") + syncedAt DateTime @default(now()) @map("synced_at") + + @@index([accountSequence]) + @@index([userId]) + @@index([status]) + @@index([paymentMethod]) + @@index([reviewedBy]) + @@index([createdAt]) + @@map("fiat_withdrawal_order_query_view") +} + +// ============================================================================= +// Planting Query Views (认种查询视图 - 通过 CDC 同步) +// ============================================================================= + +/// 认种订单查询视图 - 通过 Debezium CDC 从 planting-service 同步 +model PlantingOrderQueryView { + id BigInt @id @map("order_id") + orderNo String @unique @map("order_no") @db.VarChar(50) + userId BigInt @map("user_id") + accountSequence String @map("account_sequence") @db.VarChar(20) + + // 认种信息 + treeCount Int @map("tree_count") + totalAmount Decimal @map("total_amount") @db.Decimal(20, 8) + + // 省市选择 + selectedProvince String? @map("selected_province") @db.VarChar(10) + selectedCity String? @map("selected_city") @db.VarChar(10) + + // 订单状态 + status String @default("CREATED") @map("status") @db.VarChar(30) + + // 底池信息 + poolInjectionBatchId BigInt? @map("pool_injection_batch_id") + + // 时间戳 + createdAt DateTime @map("created_at") + paidAt DateTime? @map("paid_at") + fundAllocatedAt DateTime? @map("fund_allocated_at") + miningEnabledAt DateTime? @map("mining_enabled_at") + syncedAt DateTime @default(now()) @map("synced_at") + + @@index([userId]) + @@index([accountSequence]) + @@index([status]) + @@index([selectedProvince, selectedCity]) + @@index([createdAt]) + @@index([paidAt]) + @@map("planting_order_query_view") +} + +/// 用户持仓查询视图 - 通过 Debezium CDC 从 planting-service 同步 +model PlantingPositionQueryView { + id BigInt @id @map("position_id") + userId BigInt @unique @map("user_id") + + // 持仓统计 + totalTreeCount Int @default(0) @map("total_tree_count") + effectiveTreeCount Int @default(0) @map("effective_tree_count") + pendingTreeCount Int @default(0) @map("pending_tree_count") + + // 挖矿状态 + firstMiningStartAt DateTime? @map("first_mining_start_at") + + // 时间戳 + createdAt DateTime @map("created_at") + syncedAt DateTime @default(now()) @map("synced_at") + + @@index([totalTreeCount]) + @@map("planting_position_query_view") +} + +/// 合同签署任务查询视图 - 通过 Debezium CDC 从 planting-service 同步 +model ContractSigningTaskQueryView { + id BigInt @id @map("task_id") + + // 关联信息 + orderNo String @unique @map("order_no") @db.VarChar(50) + contractNo String @unique @map("contract_no") @db.VarChar(30) + userId BigInt @map("user_id") + accountSequence String @map("account_sequence") @db.VarChar(20) + + // 合同信息 + contractVersion String @map("contract_version") @db.VarChar(20) + + // 订单信息快照 + treeCount Int @map("tree_count") + totalAmount Decimal @map("total_amount") @db.Decimal(20, 8) + provinceCode String @map("province_code") @db.VarChar(10) + provinceName String @map("province_name") @db.VarChar(50) + cityCode String @map("city_code") @db.VarChar(10) + cityName String @map("city_name") @db.VarChar(50) + + // 签署状态 + status String @default("PENDING") @map("status") @db.VarChar(30) + expiresAt DateTime @map("expires_at") + + // 签署时间戳 + signedAt DateTime? @map("signed_at") + + // 时间戳 + createdAt DateTime @map("created_at") + syncedAt DateTime @default(now()) @map("synced_at") + + @@index([userId]) + @@index([status]) + @@index([expiresAt]) + @@map("contract_signing_task_query_view") +} + +// ============================================================================= +// Authorization Query Views (授权查询视图 - 通过 CDC 同步) +// ============================================================================= + +/// 授权角色查询视图 - 通过 Debezium CDC 从 authorization-service 同步 +model AuthorizationRoleQueryView { + id String @id + userId String @map("user_id") + accountSequence String @map("account_sequence") + roleType String @map("role_type") @db.VarChar(30) + regionCode String @map("region_code") + regionName String @map("region_name") + status String @default("PENDING") @db.VarChar(20) + displayTitle String @map("display_title") + + // 授权信息 + authorizedAt DateTime? @map("authorized_at") + authorizedBy String? @map("authorized_by") + revokedAt DateTime? @map("revoked_at") + revokedBy String? @map("revoked_by") + + // 软删除 + deletedAt DateTime? @map("deleted_at") + + // 考核配置 + initialTargetTreeCount Int @map("initial_target_tree_count") + monthlyTargetType String @map("monthly_target_type") @db.VarChar(20) + + // 权益状态 + benefitActive Boolean @default(false) @map("benefit_active") + benefitActivatedAt DateTime? @map("benefit_activated_at") + + // 月度考核追踪 + lastAssessmentMonth String? @map("last_assessment_month") + monthlyTreesAdded Int @default(0) @map("monthly_trees_added") + + // 时间戳 + createdAt DateTime @map("created_at") + syncedAt DateTime @default(now()) @map("synced_at") + + @@index([accountSequence]) + @@index([userId]) + @@index([roleType, regionCode]) + @@index([status]) + @@index([deletedAt]) + @@map("authorization_role_query_view") +} + +/// 月度考核查询视图 - 通过 Debezium CDC 从 authorization-service 同步 +model MonthlyAssessmentQueryView { + id String @id + authorizationId String @map("authorization_id") + userId String @map("user_id") + accountSequence String @map("account_sequence") + roleType String @map("role_type") @db.VarChar(30) + regionCode String @map("region_code") + + // 考核月份 + assessmentMonth String @map("assessment_month") + monthIndex Int @map("month_index") + + // 考核目标 + monthlyTarget Int @map("monthly_target") + cumulativeTarget Int @map("cumulative_target") + + // 完成情况 + monthlyCompleted Int @default(0) @map("monthly_completed") + cumulativeCompleted Int @default(0) @map("cumulative_completed") + completedAt DateTime? @map("completed_at") + + // 考核结果 + result String @default("NOT_ASSESSED") @db.VarChar(20) + + // 排名 + rankingInRegion Int? @map("ranking_in_region") + isFirstPlace Boolean @default(false) @map("is_first_place") + + // 豁免 + isBypassed Boolean @default(false) @map("is_bypassed") + + // 时间戳 + assessedAt DateTime? @map("assessed_at") + createdAt DateTime @map("created_at") + syncedAt DateTime @default(now()) @map("synced_at") + + @@index([accountSequence, assessmentMonth]) + @@index([userId, assessmentMonth]) + @@index([roleType, regionCode, assessmentMonth]) + @@index([assessmentMonth, result]) + @@map("monthly_assessment_query_view") +} + +/// 系统账户查询视图 - 通过 Debezium CDC 从 authorization-service 同步 +model SystemAccountQueryView { + id BigInt @id @map("account_id") + accountType String @map("account_type") @db.VarChar(30) + + // 区域信息 + regionCode String? @map("region_code") @db.VarChar(10) + regionName String? @map("region_name") @db.VarChar(50) + + // 钱包地址 + walletAddress String? @map("wallet_address") @db.VarChar(42) + + // 余额 + usdtBalance Decimal @default(0) @map("usdt_balance") @db.Decimal(20, 8) + hashpower Decimal @default(0) @map("hashpower") @db.Decimal(20, 8) + + // 累计统计 + totalReceived Decimal @default(0) @map("total_received") @db.Decimal(20, 8) + totalTransferred Decimal @default(0) @map("total_transferred") @db.Decimal(20, 8) + + // 状态 + status String @default("ACTIVE") @map("status") @db.VarChar(20) + + // 时间戳 + createdAt DateTime @map("created_at") + syncedAt DateTime @default(now()) @map("synced_at") + + @@index([accountType]) + @@index([walletAddress]) + @@map("system_account_query_view") +} + +// ============================================================================= +// Ledger Views (分类账视图 - 通过 CDC 同步) +// ============================================================================= + +/// 钱包流水分类账 - 通过 Debezium CDC 从 wallet-service 同步 +/// 记录所有用户钱包的资金变动(充值/提现/收益/扣款等) +model WalletLedgerEntryView { + id BigInt @id @map("entry_id") + accountSequence String @map("account_sequence") @db.VarChar(20) + userId BigInt @map("user_id") + + // 流水类型 + entryType String @map("entry_type") @db.VarChar(50) + + // 金额变动 (正数入账, 负数支出) + amount Decimal @map("amount") @db.Decimal(20, 8) + assetType String @map("asset_type") @db.VarChar(20) + + // 余额快照 (操作后余额) + balanceAfter Decimal? @map("balance_after") @db.Decimal(20, 8) + + // 关联引用 + refOrderId String? @map("ref_order_id") @db.VarChar(100) + refTxHash String? @map("ref_tx_hash") @db.VarChar(100) + + // 备注 + memo String? @map("memo") @db.Text + + // 时间戳 + createdAt DateTime @map("created_at") + syncedAt DateTime @default(now()) @map("synced_at") + + @@index([accountSequence, createdAt(sort: Desc)]) + @@index([userId, createdAt(sort: Desc)]) + @@index([entryType]) + @@index([assetType]) + @@index([refOrderId]) + @@index([refTxHash]) + @@index([createdAt]) + @@map("wallet_ledger_entry_view") +} + +/// 认种资金分配分类账 - 通过 Debezium CDC 从 planting-service 同步 +/// 记录每笔认种订单的资金分配明细 +model FundAllocationView { + id BigInt @id @map("allocation_id") + orderId BigInt @map("order_id") + + // 分配信息 + targetType String @map("target_type") @db.VarChar(50) + amount Decimal @map("amount") @db.Decimal(20, 8) + targetAccountId String? @map("target_account_id") @db.VarChar(100) + + // 时间戳 + createdAt DateTime @map("created_at") + syncedAt DateTime @default(now()) @map("synced_at") + + @@index([orderId]) + @@index([targetType, targetAccountId]) + @@index([createdAt]) + @@map("fund_allocation_view") +} + +/// 系统账户流水分类账 - 通过 Debezium CDC 从 authorization-service 同步 +/// 记录系统账户(成本账户、运营账户、区域账户)的资金变动 +model SystemAccountLedgerView { + id BigInt @id @map("ledger_id") + accountId BigInt @map("account_id") + + // 流水类型 + entryType String @map("entry_type") @db.VarChar(30) + + // 金额 + amount Decimal @map("amount") @db.Decimal(20, 8) + balanceAfter Decimal @map("balance_after") @db.Decimal(20, 8) + + // 关联信息 + sourceOrderId BigInt? @map("source_order_id") + sourceRewardId BigInt? @map("source_reward_id") + txHash String? @map("tx_hash") @db.VarChar(66) + + memo String? @map("memo") @db.VarChar(500) + + // 时间戳 + createdAt DateTime @map("created_at") + syncedAt DateTime @default(now()) @map("synced_at") + + @@index([accountId, createdAt(sort: Desc)]) + @@index([sourceOrderId]) + @@index([txHash]) + @@index([createdAt]) + @@map("system_account_ledger_view") +} + // ============================================================================= // Kafka Event Tracking (事件消费追踪) // ============================================================================= @@ -562,16 +1030,16 @@ model SystemConfig { /// 系统维护公告 - 用于系统升级/维护期间阻断用户操作 model SystemMaintenance { - id String @id @default(uuid()) - title String @db.VarChar(100) // 标题:如"系统升级中" - message String @db.Text // 说明:如"预计10:00恢复,请稍候" - startTime DateTime @map("start_time") // 维护开始时间 - endTime DateTime @map("end_time") // 维护结束时间 - isActive Boolean @default(false) @map("is_active") // 是否激活 - createdAt DateTime @default(now()) @map("created_at") - updatedAt DateTime @updatedAt @map("updated_at") - createdBy String @map("created_by") // 创建人ID - updatedBy String? @map("updated_by") // 更新人ID + id String @id @default(uuid()) + title String @db.VarChar(100) // 标题:如"系统升级中" + message String @db.Text // 说明:如"预计10:00恢复,请稍候" + startTime DateTime @map("start_time") // 维护开始时间 + endTime DateTime @map("end_time") // 维护结束时间 + isActive Boolean @default(false) @map("is_active") // 是否激活 + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + createdBy String @map("created_by") // 创建人ID + updatedBy String? @map("updated_by") // 更新人ID @@index([isActive]) @@index([startTime, endTime]) @@ -584,12 +1052,12 @@ model SystemMaintenance { /// 共管钱包会话状态 enum WalletSessionStatus { - WAITING // 等待参与方加入 - READY // 所有参与方已就绪 + WAITING // 等待参与方加入 + READY // 所有参与方已就绪 PROCESSING // 密钥生成中 - COMPLETED // 创建完成 - FAILED // 创建失败 - CANCELLED // 已取消 + COMPLETED // 创建完成 + FAILED // 创建失败 + CANCELLED // 已取消 } /// 共管钱包会话 - 钱包创建过程的会话记录 diff --git a/backend/services/admin-service/src/infrastructure/kafka/authorization-cdc-consumer.service.ts b/backend/services/admin-service/src/infrastructure/kafka/authorization-cdc-consumer.service.ts new file mode 100644 index 00000000..e4486b19 --- /dev/null +++ b/backend/services/admin-service/src/infrastructure/kafka/authorization-cdc-consumer.service.ts @@ -0,0 +1,655 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; +import { PrismaService } from '../persistence/prisma/prisma.service'; +import { Decimal } from '@prisma/client/runtime/library'; + +/** + * Debezium CDC 事件结构 (经过 ExtractNewRecordState 转换后) + */ + +// authorization_roles 表 +interface CdcAuthorizationRolePayload { + id: string; + user_id: string; + account_sequence: string; + role_type: string; + region_code: string; + region_name: string; + status: string; + display_title: string; + authorized_at?: string | null; + authorized_by?: string | null; + revoked_at?: string | null; + revoked_by?: string | null; + deleted_at?: string | null; + initial_target_tree_count: number; + monthly_target_type: string; + benefit_active: boolean; + benefit_activated_at?: string | null; + last_assessment_month?: string | null; + monthly_trees_added: number; + created_at: string; + __op: 'c' | 'u' | 'd' | 'r'; + __table: string; + __source_ts_ms: number; + __deleted?: string; +} + +// monthly_assessments 表 +interface CdcMonthlyAssessmentPayload { + id: string; + authorization_id: string; + user_id: string; + account_sequence: string; + role_type: string; + region_code: string; + assessment_month: string; + month_index: number; + monthly_target: number; + cumulative_target: number; + monthly_completed: number; + cumulative_completed: number; + completed_at?: string | null; + result: string; + ranking_in_region?: number | null; + is_first_place: boolean; + is_bypassed: boolean; + assessed_at?: string | null; + created_at: string; + __op: 'c' | 'u' | 'd' | 'r'; + __table: string; + __source_ts_ms: number; + __deleted?: string; +} + +// system_account_ledgers 表 (系统账户分类账流水) +interface CdcSystemAccountLedgerPayload { + ledger_id: string; + account_id: string; + entry_type: string; + amount: string; + balance_after: string; + source_order_id?: string | null; + source_reward_id?: string | null; + tx_hash?: string | null; + memo?: string | null; + created_at: string; + __op: 'c' | 'u' | 'd' | 'r'; + __table: string; + __source_ts_ms: number; + __deleted?: string; +} + +// system_accounts 表 +interface CdcSystemAccountPayload { + account_id: string; + account_type: string; + region_code?: string | null; + region_name?: string | null; + wallet_address?: string | null; + usdt_balance: string; + hashpower: string; + total_received: string; + total_transferred: string; + status: string; + created_at: string; + __op: 'c' | 'u' | 'd' | 'r'; + __table: string; + __source_ts_ms: number; + __deleted?: string; +} + +type CdcAuthorizationPayload = CdcAuthorizationRolePayload | CdcMonthlyAssessmentPayload | CdcSystemAccountLedgerPayload | CdcSystemAccountPayload; + +/** + * Authorization CDC 消费者服务 + * + * 消费 Debezium 从 authorization-service PostgreSQL 捕获的数据变更 + * + * Topics: + * - cdc.authorization.public.authorization_roles + * - cdc.authorization.public.monthly_assessments + * - cdc.authorization.public.system_accounts + */ +@Injectable() +export class AuthorizationCdcConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(AuthorizationCdcConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isRunning = false; + + // CDC Topics + private readonly cdcTopics = [ + 'cdc.authorization.public.authorization_roles', + 'cdc.authorization.public.monthly_assessments', + 'cdc.authorization.public.system_accounts', + 'cdc.authorization.public.system_account_ledgers', + ]; + private readonly consumerGroup: string; + + constructor( + private readonly configService: ConfigService, + private readonly prisma: PrismaService, + ) { + const brokers = (this.configService.get('KAFKA_BROKERS', 'localhost:9092')).split(','); + const clientId = this.configService.get('KAFKA_CLIENT_ID', 'admin-service'); + this.consumerGroup = this.configService.get('KAFKA_AUTHORIZATION_CDC_GROUP', 'admin-service-authorization-cdc'); + + this.kafka = new Kafka({ + clientId: `${clientId}-authorization-cdc`, + brokers, + logLevel: logLevel.WARN, + }); + + this.consumer = this.kafka.consumer({ groupId: this.consumerGroup }); + + this.logger.log(`[Authorization-CDC] Configured to consume topics: ${this.cdcTopics.join(', ')}`); + } + + async onModuleInit() { + await this.start(); + } + + async onModuleDestroy() { + await this.stop(); + } + + async start(): Promise { + if (this.isRunning) { + this.logger.warn('[Authorization-CDC] Consumer already running'); + return; + } + + try { + this.logger.log('[Authorization-CDC] Connecting to Kafka...'); + await this.consumer.connect(); + + for (const topic of this.cdcTopics) { + await this.consumer.subscribe({ + topic, + fromBeginning: false, + }); + } + + this.logger.log(`[Authorization-CDC] Subscribed to topics: ${this.cdcTopics.join(', ')}`); + + await this.consumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + await this.handleMessage(payload); + }, + }); + + this.isRunning = true; + this.logger.log('[Authorization-CDC] Consumer started successfully'); + } catch (error) { + this.logger.error('[Authorization-CDC] Failed to start consumer:', error); + } + } + + async stop(): Promise { + if (!this.isRunning) return; + + try { + await this.consumer.disconnect(); + this.isRunning = false; + this.logger.log('[Authorization-CDC] Consumer stopped'); + } catch (error) { + this.logger.error('[Authorization-CDC] Failed to stop consumer:', error); + } + } + + private async handleMessage(payload: EachMessagePayload): Promise { + const { topic, partition, message } = payload; + + if (!message.value) { + this.logger.warn(`[Authorization-CDC] Empty message from ${topic}:${partition}`); + return; + } + + try { + const data = JSON.parse(message.value.toString()) as CdcAuthorizationPayload; + const operation = data.__op; + const table = data.__table; + + this.logger.debug( + `[Authorization-CDC] Received ${operation} event for table ${table}` + ); + + // 幂等性检查 + const eventId = `authorization-cdc:${topic}:${partition}:${message.offset}`; + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`[Authorization-CDC] Event ${eventId} already processed, skipping`); + return; + } + + // 根据表名处理不同的事件 + switch (table) { + case 'authorization_roles': + await this.processAuthorizationRoleEvent(data as CdcAuthorizationRolePayload); + break; + case 'monthly_assessments': + await this.processMonthlyAssessmentEvent(data as CdcMonthlyAssessmentPayload); + break; + case 'system_accounts': + await this.processSystemAccountEvent(data as CdcSystemAccountPayload); + break; + case 'system_account_ledgers': + await this.processSystemAccountLedgerEvent(data as CdcSystemAccountLedgerPayload); + break; + default: + this.logger.warn(`[Authorization-CDC] Unknown table: ${table}`); + } + + // 记录已处理 + await this.markEventProcessed(eventId, `authorization-cdc:${table}:${operation}`); + + this.logger.log( + `[Authorization-CDC] ✓ Processed ${operation} for table: ${table}` + ); + } catch (error) { + this.logger.error(`[Authorization-CDC] Failed to process message:`, error); + throw error; // 让 KafkaJS 重试 + } + } + + // ==================== authorization_roles 处理 ==================== + + private async processAuthorizationRoleEvent(data: CdcAuthorizationRolePayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + if (operation === 'd' || isDeleted) { + await this.handleAuthorizationRoleDelete(data); + } else if (operation === 'c' || operation === 'r') { + await this.handleAuthorizationRoleCreateOrSnapshot(data); + } else if (operation === 'u') { + await this.handleAuthorizationRoleUpdate(data); + } + } + + private async handleAuthorizationRoleCreateOrSnapshot(data: CdcAuthorizationRolePayload): Promise { + await this.prisma.authorizationRoleQueryView.upsert({ + where: { id: data.id }, + create: { + id: data.id, + userId: data.user_id, + accountSequence: data.account_sequence, + roleType: data.role_type, + regionCode: data.region_code, + regionName: data.region_name, + status: data.status, + displayTitle: data.display_title, + authorizedAt: data.authorized_at ? new Date(data.authorized_at) : null, + authorizedBy: data.authorized_by || null, + revokedAt: data.revoked_at ? new Date(data.revoked_at) : null, + revokedBy: data.revoked_by || null, + deletedAt: data.deleted_at ? new Date(data.deleted_at) : null, + initialTargetTreeCount: data.initial_target_tree_count, + monthlyTargetType: data.monthly_target_type, + benefitActive: data.benefit_active, + benefitActivatedAt: data.benefit_activated_at ? new Date(data.benefit_activated_at) : null, + lastAssessmentMonth: data.last_assessment_month || null, + monthlyTreesAdded: data.monthly_trees_added, + createdAt: new Date(data.created_at), + syncedAt: new Date(), + }, + update: { + userId: data.user_id, + accountSequence: data.account_sequence, + roleType: data.role_type, + regionCode: data.region_code, + regionName: data.region_name, + status: data.status, + displayTitle: data.display_title, + authorizedAt: data.authorized_at ? new Date(data.authorized_at) : null, + authorizedBy: data.authorized_by || null, + revokedAt: data.revoked_at ? new Date(data.revoked_at) : null, + revokedBy: data.revoked_by || null, + deletedAt: data.deleted_at ? new Date(data.deleted_at) : null, + initialTargetTreeCount: data.initial_target_tree_count, + monthlyTargetType: data.monthly_target_type, + benefitActive: data.benefit_active, + benefitActivatedAt: data.benefit_activated_at ? new Date(data.benefit_activated_at) : null, + lastAssessmentMonth: data.last_assessment_month || null, + monthlyTreesAdded: data.monthly_trees_added, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Authorization-CDC] Created/Snapshot authorization role: ${data.id}`); + } + + private async handleAuthorizationRoleUpdate(data: CdcAuthorizationRolePayload): Promise { + const exists = await this.prisma.authorizationRoleQueryView.findUnique({ + where: { id: data.id }, + select: { id: true }, + }); + + if (!exists) { + await this.handleAuthorizationRoleCreateOrSnapshot(data); + return; + } + + await this.prisma.authorizationRoleQueryView.update({ + where: { id: data.id }, + data: { + status: data.status, + authorizedAt: data.authorized_at ? new Date(data.authorized_at) : null, + authorizedBy: data.authorized_by || null, + revokedAt: data.revoked_at ? new Date(data.revoked_at) : null, + revokedBy: data.revoked_by || null, + deletedAt: data.deleted_at ? new Date(data.deleted_at) : null, + benefitActive: data.benefit_active, + benefitActivatedAt: data.benefit_activated_at ? new Date(data.benefit_activated_at) : null, + lastAssessmentMonth: data.last_assessment_month || null, + monthlyTreesAdded: data.monthly_trees_added, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Authorization-CDC] Updated authorization role: ${data.id}`); + } + + private async handleAuthorizationRoleDelete(data: CdcAuthorizationRolePayload): Promise { + try { + await this.prisma.authorizationRoleQueryView.delete({ + where: { id: data.id }, + }); + this.logger.log(`[Authorization-CDC] Deleted authorization role: ${data.id}`); + } catch { + this.logger.warn(`[Authorization-CDC] Authorization role not found for delete: ${data.id}`); + } + } + + // ==================== monthly_assessments 处理 ==================== + + private async processMonthlyAssessmentEvent(data: CdcMonthlyAssessmentPayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + if (operation === 'd' || isDeleted) { + await this.handleMonthlyAssessmentDelete(data); + } else if (operation === 'c' || operation === 'r') { + await this.handleMonthlyAssessmentCreateOrSnapshot(data); + } else if (operation === 'u') { + await this.handleMonthlyAssessmentUpdate(data); + } + } + + private async handleMonthlyAssessmentCreateOrSnapshot(data: CdcMonthlyAssessmentPayload): Promise { + await this.prisma.monthlyAssessmentQueryView.upsert({ + where: { id: data.id }, + create: { + id: data.id, + authorizationId: data.authorization_id, + userId: data.user_id, + accountSequence: data.account_sequence, + roleType: data.role_type, + regionCode: data.region_code, + assessmentMonth: data.assessment_month, + monthIndex: data.month_index, + monthlyTarget: data.monthly_target, + cumulativeTarget: data.cumulative_target, + monthlyCompleted: data.monthly_completed, + cumulativeCompleted: data.cumulative_completed, + completedAt: data.completed_at ? new Date(data.completed_at) : null, + result: data.result, + rankingInRegion: data.ranking_in_region || null, + isFirstPlace: data.is_first_place, + isBypassed: data.is_bypassed, + assessedAt: data.assessed_at ? new Date(data.assessed_at) : null, + createdAt: new Date(data.created_at), + syncedAt: new Date(), + }, + update: { + authorizationId: data.authorization_id, + userId: data.user_id, + accountSequence: data.account_sequence, + roleType: data.role_type, + regionCode: data.region_code, + assessmentMonth: data.assessment_month, + monthIndex: data.month_index, + monthlyTarget: data.monthly_target, + cumulativeTarget: data.cumulative_target, + monthlyCompleted: data.monthly_completed, + cumulativeCompleted: data.cumulative_completed, + completedAt: data.completed_at ? new Date(data.completed_at) : null, + result: data.result, + rankingInRegion: data.ranking_in_region || null, + isFirstPlace: data.is_first_place, + isBypassed: data.is_bypassed, + assessedAt: data.assessed_at ? new Date(data.assessed_at) : null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Authorization-CDC] Created/Snapshot monthly assessment: ${data.id}`); + } + + private async handleMonthlyAssessmentUpdate(data: CdcMonthlyAssessmentPayload): Promise { + const exists = await this.prisma.monthlyAssessmentQueryView.findUnique({ + where: { id: data.id }, + select: { id: true }, + }); + + if (!exists) { + await this.handleMonthlyAssessmentCreateOrSnapshot(data); + return; + } + + await this.prisma.monthlyAssessmentQueryView.update({ + where: { id: data.id }, + data: { + monthlyCompleted: data.monthly_completed, + cumulativeCompleted: data.cumulative_completed, + completedAt: data.completed_at ? new Date(data.completed_at) : null, + result: data.result, + rankingInRegion: data.ranking_in_region || null, + isFirstPlace: data.is_first_place, + isBypassed: data.is_bypassed, + assessedAt: data.assessed_at ? new Date(data.assessed_at) : null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Authorization-CDC] Updated monthly assessment: ${data.id}`); + } + + private async handleMonthlyAssessmentDelete(data: CdcMonthlyAssessmentPayload): Promise { + try { + await this.prisma.monthlyAssessmentQueryView.delete({ + where: { id: data.id }, + }); + this.logger.log(`[Authorization-CDC] Deleted monthly assessment: ${data.id}`); + } catch { + this.logger.warn(`[Authorization-CDC] Monthly assessment not found for delete: ${data.id}`); + } + } + + // ==================== system_accounts 处理 ==================== + + private async processSystemAccountEvent(data: CdcSystemAccountPayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + if (operation === 'd' || isDeleted) { + await this.handleSystemAccountDelete(data); + } else if (operation === 'c' || operation === 'r') { + await this.handleSystemAccountCreateOrSnapshot(data); + } else if (operation === 'u') { + await this.handleSystemAccountUpdate(data); + } + } + + private async handleSystemAccountCreateOrSnapshot(data: CdcSystemAccountPayload): Promise { + await this.prisma.systemAccountQueryView.upsert({ + where: { id: BigInt(data.account_id) }, + create: { + id: BigInt(data.account_id), + accountType: data.account_type, + regionCode: data.region_code || null, + regionName: data.region_name || null, + walletAddress: data.wallet_address || null, + usdtBalance: new Decimal(data.usdt_balance || '0'), + hashpower: new Decimal(data.hashpower || '0'), + totalReceived: new Decimal(data.total_received || '0'), + totalTransferred: new Decimal(data.total_transferred || '0'), + status: data.status, + createdAt: new Date(data.created_at), + syncedAt: new Date(), + }, + update: { + accountType: data.account_type, + regionCode: data.region_code || null, + regionName: data.region_name || null, + walletAddress: data.wallet_address || null, + usdtBalance: new Decimal(data.usdt_balance || '0'), + hashpower: new Decimal(data.hashpower || '0'), + totalReceived: new Decimal(data.total_received || '0'), + totalTransferred: new Decimal(data.total_transferred || '0'), + status: data.status, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Authorization-CDC] Created/Snapshot system account: ${data.account_id} (${data.account_type})`); + } + + private async handleSystemAccountUpdate(data: CdcSystemAccountPayload): Promise { + const id = BigInt(data.account_id); + const exists = await this.prisma.systemAccountQueryView.findUnique({ + where: { id }, + select: { id: true }, + }); + + if (!exists) { + await this.handleSystemAccountCreateOrSnapshot(data); + return; + } + + await this.prisma.systemAccountQueryView.update({ + where: { id }, + data: { + walletAddress: data.wallet_address || null, + usdtBalance: new Decimal(data.usdt_balance || '0'), + hashpower: new Decimal(data.hashpower || '0'), + totalReceived: new Decimal(data.total_received || '0'), + totalTransferred: new Decimal(data.total_transferred || '0'), + status: data.status, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Authorization-CDC] Updated system account: ${data.account_id} (${data.account_type})`); + } + + private async handleSystemAccountDelete(data: CdcSystemAccountPayload): Promise { + const id = BigInt(data.account_id); + + try { + await this.prisma.systemAccountQueryView.delete({ + where: { id }, + }); + this.logger.log(`[Authorization-CDC] Deleted system account: ${data.account_id}`); + } catch { + this.logger.warn(`[Authorization-CDC] System account not found for delete: ${data.account_id}`); + } + } + + // ==================== system_account_ledgers 处理 ==================== + + private async processSystemAccountLedgerEvent(data: CdcSystemAccountLedgerPayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + // 系统账户流水是 append-only,通常只有 create 和 snapshot + if (operation === 'd' || isDeleted) { + await this.handleSystemAccountLedgerDelete(data); + } else if (operation === 'c' || operation === 'r') { + await this.handleSystemAccountLedgerCreate(data); + } else if (operation === 'u') { + await this.handleSystemAccountLedgerUpdate(data); + } + } + + private async handleSystemAccountLedgerCreate(data: CdcSystemAccountLedgerPayload): Promise { + await this.prisma.systemAccountLedgerView.upsert({ + where: { id: BigInt(data.ledger_id) }, + create: { + id: BigInt(data.ledger_id), + accountId: BigInt(data.account_id), + entryType: data.entry_type, + amount: new Decimal(data.amount), + balanceAfter: new Decimal(data.balance_after), + sourceOrderId: data.source_order_id ? BigInt(data.source_order_id) : null, + sourceRewardId: data.source_reward_id ? BigInt(data.source_reward_id) : null, + txHash: data.tx_hash || null, + memo: data.memo || null, + createdAt: new Date(data.created_at), + syncedAt: new Date(), + }, + update: { + accountId: BigInt(data.account_id), + entryType: data.entry_type, + amount: new Decimal(data.amount), + balanceAfter: new Decimal(data.balance_after), + sourceOrderId: data.source_order_id ? BigInt(data.source_order_id) : null, + sourceRewardId: data.source_reward_id ? BigInt(data.source_reward_id) : null, + txHash: data.tx_hash || null, + memo: data.memo || null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Authorization-CDC] Created system account ledger: ${data.ledger_id} (${data.entry_type})`); + } + + private async handleSystemAccountLedgerUpdate(data: CdcSystemAccountLedgerPayload): Promise { + await this.handleSystemAccountLedgerCreate(data); + this.logger.log(`[Authorization-CDC] Updated system account ledger: ${data.ledger_id}`); + } + + private async handleSystemAccountLedgerDelete(data: CdcSystemAccountLedgerPayload): Promise { + const id = BigInt(data.ledger_id); + + try { + await this.prisma.systemAccountLedgerView.delete({ + where: { id }, + }); + this.logger.log(`[Authorization-CDC] Deleted system account ledger: ${data.ledger_id}`); + } catch { + this.logger.warn(`[Authorization-CDC] System account ledger not found for delete: ${data.ledger_id}`); + } + } + + // ==================== Helper Methods ==================== + + private async isEventProcessed(eventId: string): Promise { + const count = await this.prisma.processedEvent.count({ + where: { eventId }, + }); + return count > 0; + } + + private async markEventProcessed(eventId: string, eventType: string): Promise { + await this.prisma.processedEvent.create({ + data: { + eventId, + eventType, + processedAt: new Date(), + }, + }); + } + + /** + * 获取消费者状态 + */ + getStatus(): { isRunning: boolean; topics: string[]; consumerGroup: string } { + return { + isRunning: this.isRunning, + topics: this.cdcTopics, + consumerGroup: this.consumerGroup, + }; + } +} diff --git a/backend/services/admin-service/src/infrastructure/kafka/index.ts b/backend/services/admin-service/src/infrastructure/kafka/index.ts index c9cb8a5b..91164753 100644 --- a/backend/services/admin-service/src/infrastructure/kafka/index.ts +++ b/backend/services/admin-service/src/infrastructure/kafka/index.ts @@ -2,3 +2,6 @@ export * from './kafka.module'; export * from './user-event-consumer.service'; export * from './cdc-consumer.service'; export * from './referral-cdc-consumer.service'; +export * from './wallet-cdc-consumer.service'; +export * from './planting-cdc-consumer.service'; +export * from './authorization-cdc-consumer.service'; diff --git a/backend/services/admin-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/admin-service/src/infrastructure/kafka/kafka.module.ts index f79d5c19..a04c985a 100644 --- a/backend/services/admin-service/src/infrastructure/kafka/kafka.module.ts +++ b/backend/services/admin-service/src/infrastructure/kafka/kafka.module.ts @@ -6,6 +6,9 @@ import { USER_QUERY_REPOSITORY } from '../../domain/repositories/user-query.repo import { UserEventConsumerService } from './user-event-consumer.service'; import { CdcConsumerService } from './cdc-consumer.service'; import { ReferralCdcConsumerService } from './referral-cdc-consumer.service'; +import { WalletCdcConsumerService } from './wallet-cdc-consumer.service'; +import { PlantingCdcConsumerService } from './planting-cdc-consumer.service'; +import { AuthorizationCdcConsumerService } from './authorization-cdc-consumer.service'; /** * Kafka 模块 @@ -13,6 +16,9 @@ import { ReferralCdcConsumerService } from './referral-cdc-consumer.service'; * CDC 消费者 - 通过 Debezium 从各服务同步数据: * 1. CdcConsumerService - identity-service 用户数据 * 2. ReferralCdcConsumerService - referral-service 推荐关系 + * 3. WalletCdcConsumerService - wallet-service 钱包/提现数据 + * 4. PlantingCdcConsumerService - planting-service 认种/持仓/合同数据 + * 5. AuthorizationCdcConsumerService - authorization-service 授权/考核/系统账户数据 * * Outbox 消费者 - 处理业务领域事件: * 1. UserEventConsumerService - 用户相关业务事件 @@ -31,11 +37,20 @@ import { ReferralCdcConsumerService } from './referral-cdc-consumer.service'; CdcConsumerService, // CDC 消费者 - referral-service ReferralCdcConsumerService, + // CDC 消费者 - wallet-service + WalletCdcConsumerService, + // CDC 消费者 - planting-service + PlantingCdcConsumerService, + // CDC 消费者 - authorization-service + AuthorizationCdcConsumerService, ], exports: [ UserEventConsumerService, CdcConsumerService, ReferralCdcConsumerService, + WalletCdcConsumerService, + PlantingCdcConsumerService, + AuthorizationCdcConsumerService, USER_QUERY_REPOSITORY, ], }) diff --git a/backend/services/admin-service/src/infrastructure/kafka/planting-cdc-consumer.service.ts b/backend/services/admin-service/src/infrastructure/kafka/planting-cdc-consumer.service.ts new file mode 100644 index 00000000..35c74b92 --- /dev/null +++ b/backend/services/admin-service/src/infrastructure/kafka/planting-cdc-consumer.service.ts @@ -0,0 +1,600 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; +import { PrismaService } from '../persistence/prisma/prisma.service'; +import { Decimal } from '@prisma/client/runtime/library'; + +/** + * Debezium CDC 事件结构 (经过 ExtractNewRecordState 转换后) + */ + +// planting_orders 表 +interface CdcPlantingOrderPayload { + order_id: string; + order_no: string; + user_id: string; + account_sequence: string; + tree_count: number; + total_amount: string; + selected_province?: string | null; + selected_city?: string | null; + status: string; + pool_injection_batch_id?: string | null; + created_at: string; + paid_at?: string | null; + fund_allocated_at?: string | null; + mining_enabled_at?: string | null; + __op: 'c' | 'u' | 'd' | 'r'; + __table: string; + __source_ts_ms: number; + __deleted?: string; +} + +// planting_positions 表 +interface CdcPlantingPositionPayload { + position_id: string; + user_id: string; + total_tree_count: number; + effective_tree_count: number; + pending_tree_count: number; + first_mining_start_at?: string | null; + created_at: string; + __op: 'c' | 'u' | 'd' | 'r'; + __table: string; + __source_ts_ms: number; + __deleted?: string; +} + +// fund_allocations 表 (资金分配分类账) +interface CdcFundAllocationPayload { + allocation_id: string; + order_id: string; + target_type: string; + amount: string; + target_account_id?: string | null; + created_at: string; + __op: 'c' | 'u' | 'd' | 'r'; + __table: string; + __source_ts_ms: number; + __deleted?: string; +} + +// contract_signing_tasks 表 +interface CdcContractSigningTaskPayload { + task_id: string; + order_no: string; + contract_no: string; + user_id: string; + account_sequence: string; + template_id: number; + contract_version: string; + tree_count: number; + total_amount: string; + province_code: string; + province_name: string; + city_code: string; + city_name: string; + status: string; + expires_at: string; + signed_at?: string | null; + created_at: string; + __op: 'c' | 'u' | 'd' | 'r'; + __table: string; + __source_ts_ms: number; + __deleted?: string; +} + +type CdcPlantingPayload = CdcPlantingOrderPayload | CdcPlantingPositionPayload | CdcFundAllocationPayload | CdcContractSigningTaskPayload; + +/** + * Planting CDC 消费者服务 + * + * 消费 Debezium 从 planting-service PostgreSQL 捕获的数据变更 + * + * Topics: + * - cdc.planting.public.planting_orders + * - cdc.planting.public.planting_positions + * - cdc.planting.public.contract_signing_tasks + */ +@Injectable() +export class PlantingCdcConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(PlantingCdcConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isRunning = false; + + // CDC Topics + private readonly cdcTopics = [ + 'cdc.planting.public.planting_orders', + 'cdc.planting.public.planting_positions', + 'cdc.planting.public.contract_signing_tasks', + 'cdc.planting.public.fund_allocations', + ]; + private readonly consumerGroup: string; + + constructor( + private readonly configService: ConfigService, + private readonly prisma: PrismaService, + ) { + const brokers = (this.configService.get('KAFKA_BROKERS', 'localhost:9092')).split(','); + const clientId = this.configService.get('KAFKA_CLIENT_ID', 'admin-service'); + this.consumerGroup = this.configService.get('KAFKA_PLANTING_CDC_GROUP', 'admin-service-planting-cdc'); + + this.kafka = new Kafka({ + clientId: `${clientId}-planting-cdc`, + brokers, + logLevel: logLevel.WARN, + }); + + this.consumer = this.kafka.consumer({ groupId: this.consumerGroup }); + + this.logger.log(`[Planting-CDC] Configured to consume topics: ${this.cdcTopics.join(', ')}`); + } + + async onModuleInit() { + await this.start(); + } + + async onModuleDestroy() { + await this.stop(); + } + + async start(): Promise { + if (this.isRunning) { + this.logger.warn('[Planting-CDC] Consumer already running'); + return; + } + + try { + this.logger.log('[Planting-CDC] Connecting to Kafka...'); + await this.consumer.connect(); + + for (const topic of this.cdcTopics) { + await this.consumer.subscribe({ + topic, + fromBeginning: false, + }); + } + + this.logger.log(`[Planting-CDC] Subscribed to topics: ${this.cdcTopics.join(', ')}`); + + await this.consumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + await this.handleMessage(payload); + }, + }); + + this.isRunning = true; + this.logger.log('[Planting-CDC] Consumer started successfully'); + } catch (error) { + this.logger.error('[Planting-CDC] Failed to start consumer:', error); + } + } + + async stop(): Promise { + if (!this.isRunning) return; + + try { + await this.consumer.disconnect(); + this.isRunning = false; + this.logger.log('[Planting-CDC] Consumer stopped'); + } catch (error) { + this.logger.error('[Planting-CDC] Failed to stop consumer:', error); + } + } + + private async handleMessage(payload: EachMessagePayload): Promise { + const { topic, partition, message } = payload; + + if (!message.value) { + this.logger.warn(`[Planting-CDC] Empty message from ${topic}:${partition}`); + return; + } + + try { + const data = JSON.parse(message.value.toString()) as CdcPlantingPayload; + const operation = data.__op; + const table = data.__table; + + this.logger.debug( + `[Planting-CDC] Received ${operation} event for table ${table}` + ); + + // 幂等性检查 + const eventId = `planting-cdc:${topic}:${partition}:${message.offset}`; + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`[Planting-CDC] Event ${eventId} already processed, skipping`); + return; + } + + // 根据表名处理不同的事件 + switch (table) { + case 'planting_orders': + await this.processPlantingOrderEvent(data as CdcPlantingOrderPayload); + break; + case 'planting_positions': + await this.processPlantingPositionEvent(data as CdcPlantingPositionPayload); + break; + case 'contract_signing_tasks': + await this.processContractSigningTaskEvent(data as CdcContractSigningTaskPayload); + break; + case 'fund_allocations': + await this.processFundAllocationEvent(data as CdcFundAllocationPayload); + break; + default: + this.logger.warn(`[Planting-CDC] Unknown table: ${table}`); + } + + // 记录已处理 + await this.markEventProcessed(eventId, `planting-cdc:${table}:${operation}`); + + this.logger.log( + `[Planting-CDC] ✓ Processed ${operation} for table: ${table}` + ); + } catch (error) { + this.logger.error(`[Planting-CDC] Failed to process message:`, error); + throw error; // 让 KafkaJS 重试 + } + } + + // ==================== planting_orders 处理 ==================== + + private async processPlantingOrderEvent(data: CdcPlantingOrderPayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + if (operation === 'd' || isDeleted) { + await this.handlePlantingOrderDelete(data); + } else if (operation === 'c' || operation === 'r') { + await this.handlePlantingOrderCreateOrSnapshot(data); + } else if (operation === 'u') { + await this.handlePlantingOrderUpdate(data); + } + } + + private async handlePlantingOrderCreateOrSnapshot(data: CdcPlantingOrderPayload): Promise { + await this.prisma.plantingOrderQueryView.upsert({ + where: { id: BigInt(data.order_id) }, + create: { + id: BigInt(data.order_id), + orderNo: data.order_no, + userId: BigInt(data.user_id), + accountSequence: data.account_sequence, + treeCount: data.tree_count, + totalAmount: new Decimal(data.total_amount), + selectedProvince: data.selected_province || null, + selectedCity: data.selected_city || null, + status: data.status, + poolInjectionBatchId: data.pool_injection_batch_id ? BigInt(data.pool_injection_batch_id) : null, + createdAt: new Date(data.created_at), + paidAt: data.paid_at ? new Date(data.paid_at) : null, + fundAllocatedAt: data.fund_allocated_at ? new Date(data.fund_allocated_at) : null, + miningEnabledAt: data.mining_enabled_at ? new Date(data.mining_enabled_at) : null, + syncedAt: new Date(), + }, + update: { + orderNo: data.order_no, + userId: BigInt(data.user_id), + accountSequence: data.account_sequence, + treeCount: data.tree_count, + totalAmount: new Decimal(data.total_amount), + selectedProvince: data.selected_province || null, + selectedCity: data.selected_city || null, + status: data.status, + poolInjectionBatchId: data.pool_injection_batch_id ? BigInt(data.pool_injection_batch_id) : null, + paidAt: data.paid_at ? new Date(data.paid_at) : null, + fundAllocatedAt: data.fund_allocated_at ? new Date(data.fund_allocated_at) : null, + miningEnabledAt: data.mining_enabled_at ? new Date(data.mining_enabled_at) : null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Planting-CDC] Created/Snapshot planting order: ${data.order_no}`); + } + + private async handlePlantingOrderUpdate(data: CdcPlantingOrderPayload): Promise { + const id = BigInt(data.order_id); + const exists = await this.prisma.plantingOrderQueryView.findUnique({ + where: { id }, + select: { id: true }, + }); + + if (!exists) { + await this.handlePlantingOrderCreateOrSnapshot(data); + return; + } + + await this.prisma.plantingOrderQueryView.update({ + where: { id }, + data: { + selectedProvince: data.selected_province || null, + selectedCity: data.selected_city || null, + status: data.status, + poolInjectionBatchId: data.pool_injection_batch_id ? BigInt(data.pool_injection_batch_id) : null, + paidAt: data.paid_at ? new Date(data.paid_at) : null, + fundAllocatedAt: data.fund_allocated_at ? new Date(data.fund_allocated_at) : null, + miningEnabledAt: data.mining_enabled_at ? new Date(data.mining_enabled_at) : null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Planting-CDC] Updated planting order: ${data.order_no}`); + } + + private async handlePlantingOrderDelete(data: CdcPlantingOrderPayload): Promise { + const id = BigInt(data.order_id); + + try { + await this.prisma.plantingOrderQueryView.delete({ + where: { id }, + }); + this.logger.log(`[Planting-CDC] Deleted planting order: ${data.order_no}`); + } catch { + this.logger.warn(`[Planting-CDC] Planting order not found for delete: ${data.order_no}`); + } + } + + // ==================== planting_positions 处理 ==================== + + private async processPlantingPositionEvent(data: CdcPlantingPositionPayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + if (operation === 'd' || isDeleted) { + await this.handlePlantingPositionDelete(data); + } else if (operation === 'c' || operation === 'r') { + await this.handlePlantingPositionCreateOrSnapshot(data); + } else if (operation === 'u') { + await this.handlePlantingPositionUpdate(data); + } + } + + private async handlePlantingPositionCreateOrSnapshot(data: CdcPlantingPositionPayload): Promise { + await this.prisma.plantingPositionQueryView.upsert({ + where: { id: BigInt(data.position_id) }, + create: { + id: BigInt(data.position_id), + userId: BigInt(data.user_id), + totalTreeCount: data.total_tree_count, + effectiveTreeCount: data.effective_tree_count, + pendingTreeCount: data.pending_tree_count, + firstMiningStartAt: data.first_mining_start_at ? new Date(data.first_mining_start_at) : null, + createdAt: new Date(data.created_at), + syncedAt: new Date(), + }, + update: { + userId: BigInt(data.user_id), + totalTreeCount: data.total_tree_count, + effectiveTreeCount: data.effective_tree_count, + pendingTreeCount: data.pending_tree_count, + firstMiningStartAt: data.first_mining_start_at ? new Date(data.first_mining_start_at) : null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Planting-CDC] Created/Snapshot planting position for user: ${data.user_id}`); + } + + private async handlePlantingPositionUpdate(data: CdcPlantingPositionPayload): Promise { + const id = BigInt(data.position_id); + const exists = await this.prisma.plantingPositionQueryView.findUnique({ + where: { id }, + select: { id: true }, + }); + + if (!exists) { + await this.handlePlantingPositionCreateOrSnapshot(data); + return; + } + + await this.prisma.plantingPositionQueryView.update({ + where: { id }, + data: { + totalTreeCount: data.total_tree_count, + effectiveTreeCount: data.effective_tree_count, + pendingTreeCount: data.pending_tree_count, + firstMiningStartAt: data.first_mining_start_at ? new Date(data.first_mining_start_at) : null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Planting-CDC] Updated planting position for user: ${data.user_id}`); + } + + private async handlePlantingPositionDelete(data: CdcPlantingPositionPayload): Promise { + const id = BigInt(data.position_id); + + try { + await this.prisma.plantingPositionQueryView.delete({ + where: { id }, + }); + this.logger.log(`[Planting-CDC] Deleted planting position for user: ${data.user_id}`); + } catch { + this.logger.warn(`[Planting-CDC] Planting position not found for delete: ${data.user_id}`); + } + } + + // ==================== contract_signing_tasks 处理 ==================== + + private async processContractSigningTaskEvent(data: CdcContractSigningTaskPayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + if (operation === 'd' || isDeleted) { + await this.handleContractSigningTaskDelete(data); + } else if (operation === 'c' || operation === 'r') { + await this.handleContractSigningTaskCreateOrSnapshot(data); + } else if (operation === 'u') { + await this.handleContractSigningTaskUpdate(data); + } + } + + private async handleContractSigningTaskCreateOrSnapshot(data: CdcContractSigningTaskPayload): Promise { + await this.prisma.contractSigningTaskQueryView.upsert({ + where: { id: BigInt(data.task_id) }, + create: { + id: BigInt(data.task_id), + orderNo: data.order_no, + contractNo: data.contract_no, + userId: BigInt(data.user_id), + accountSequence: data.account_sequence, + contractVersion: data.contract_version, + treeCount: data.tree_count, + totalAmount: new Decimal(data.total_amount), + provinceCode: data.province_code, + provinceName: data.province_name, + cityCode: data.city_code, + cityName: data.city_name, + status: data.status, + expiresAt: new Date(data.expires_at), + signedAt: data.signed_at ? new Date(data.signed_at) : null, + createdAt: new Date(data.created_at), + syncedAt: new Date(), + }, + update: { + orderNo: data.order_no, + contractNo: data.contract_no, + userId: BigInt(data.user_id), + accountSequence: data.account_sequence, + contractVersion: data.contract_version, + treeCount: data.tree_count, + totalAmount: new Decimal(data.total_amount), + provinceCode: data.province_code, + provinceName: data.province_name, + cityCode: data.city_code, + cityName: data.city_name, + status: data.status, + expiresAt: new Date(data.expires_at), + signedAt: data.signed_at ? new Date(data.signed_at) : null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Planting-CDC] Created/Snapshot contract signing task: ${data.contract_no}`); + } + + private async handleContractSigningTaskUpdate(data: CdcContractSigningTaskPayload): Promise { + const id = BigInt(data.task_id); + const exists = await this.prisma.contractSigningTaskQueryView.findUnique({ + where: { id }, + select: { id: true }, + }); + + if (!exists) { + await this.handleContractSigningTaskCreateOrSnapshot(data); + return; + } + + await this.prisma.contractSigningTaskQueryView.update({ + where: { id }, + data: { + status: data.status, + signedAt: data.signed_at ? new Date(data.signed_at) : null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Planting-CDC] Updated contract signing task: ${data.contract_no}`); + } + + private async handleContractSigningTaskDelete(data: CdcContractSigningTaskPayload): Promise { + const id = BigInt(data.task_id); + + try { + await this.prisma.contractSigningTaskQueryView.delete({ + where: { id }, + }); + this.logger.log(`[Planting-CDC] Deleted contract signing task: ${data.contract_no}`); + } catch { + this.logger.warn(`[Planting-CDC] Contract signing task not found for delete: ${data.contract_no}`); + } + } + + // ==================== fund_allocations 处理 ==================== + + private async processFundAllocationEvent(data: CdcFundAllocationPayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + // 资金分配是 append-only,通常只有 create 和 snapshot + if (operation === 'd' || isDeleted) { + await this.handleFundAllocationDelete(data); + } else if (operation === 'c' || operation === 'r') { + await this.handleFundAllocationCreate(data); + } else if (operation === 'u') { + await this.handleFundAllocationUpdate(data); + } + } + + private async handleFundAllocationCreate(data: CdcFundAllocationPayload): Promise { + await this.prisma.fundAllocationView.upsert({ + where: { id: BigInt(data.allocation_id) }, + create: { + id: BigInt(data.allocation_id), + orderId: BigInt(data.order_id), + targetType: data.target_type, + amount: new Decimal(data.amount), + targetAccountId: data.target_account_id || null, + createdAt: new Date(data.created_at), + syncedAt: new Date(), + }, + update: { + orderId: BigInt(data.order_id), + targetType: data.target_type, + amount: new Decimal(data.amount), + targetAccountId: data.target_account_id || null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Planting-CDC] Created fund allocation: ${data.allocation_id} (${data.target_type})`); + } + + private async handleFundAllocationUpdate(data: CdcFundAllocationPayload): Promise { + await this.handleFundAllocationCreate(data); + this.logger.log(`[Planting-CDC] Updated fund allocation: ${data.allocation_id}`); + } + + private async handleFundAllocationDelete(data: CdcFundAllocationPayload): Promise { + const id = BigInt(data.allocation_id); + + try { + await this.prisma.fundAllocationView.delete({ + where: { id }, + }); + this.logger.log(`[Planting-CDC] Deleted fund allocation: ${data.allocation_id}`); + } catch { + this.logger.warn(`[Planting-CDC] Fund allocation not found for delete: ${data.allocation_id}`); + } + } + + // ==================== Helper Methods ==================== + + private async isEventProcessed(eventId: string): Promise { + const count = await this.prisma.processedEvent.count({ + where: { eventId }, + }); + return count > 0; + } + + private async markEventProcessed(eventId: string, eventType: string): Promise { + await this.prisma.processedEvent.create({ + data: { + eventId, + eventType, + processedAt: new Date(), + }, + }); + } + + /** + * 获取消费者状态 + */ + getStatus(): { isRunning: boolean; topics: string[]; consumerGroup: string } { + return { + isRunning: this.isRunning, + topics: this.cdcTopics, + consumerGroup: this.consumerGroup, + }; + } +} diff --git a/backend/services/admin-service/src/infrastructure/kafka/wallet-cdc-consumer.service.ts b/backend/services/admin-service/src/infrastructure/kafka/wallet-cdc-consumer.service.ts new file mode 100644 index 00000000..e2180f97 --- /dev/null +++ b/backend/services/admin-service/src/infrastructure/kafka/wallet-cdc-consumer.service.ts @@ -0,0 +1,738 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; +import { PrismaService } from '../persistence/prisma/prisma.service'; +import { Decimal } from '@prisma/client/runtime/library'; + +/** + * Debezium CDC 事件结构 (经过 ExtractNewRecordState 转换后) + */ + +// wallet_accounts 表 +interface CdcWalletAccountPayload { + wallet_id: string; + account_sequence: string; + user_id: string; + usdt_available: string; + usdt_frozen: string; + dst_available: string; + dst_frozen: string; + bnb_available: string; + bnb_frozen: string; + og_available: string; + og_frozen: string; + rwad_available: string; + rwad_frozen: string; + hashpower: string; + pending_usdt: string; + pending_hashpower: string; + settleable_usdt: string; + settleable_hashpower: string; + settled_total_usdt: string; + settled_total_hashpower: string; + expired_total_usdt: string; + expired_total_hashpower: string; + status: string; + has_planted: boolean; + created_at: string; + __op: 'c' | 'u' | 'd' | 'r'; + __table: string; + __source_ts_ms: number; + __deleted?: string; +} + +// withdrawal_orders 表 +interface CdcWithdrawalOrderPayload { + order_id: string; + order_no: string; + account_sequence: string; + user_id: string; + amount: string; + fee: string; + chain_type: string; + to_address: string; + tx_hash?: string | null; + is_internal_transfer: boolean; + to_account_sequence?: string | null; + to_user_id?: string | null; + status: string; + error_message?: string | null; + frozen_at?: string | null; + broadcasted_at?: string | null; + confirmed_at?: string | null; + created_at: string; + __op: 'c' | 'u' | 'd' | 'r'; + __table: string; + __source_ts_ms: number; + __deleted?: string; +} + +// fiat_withdrawal_orders 表 +interface CdcFiatWithdrawalOrderPayload { + order_id: string; + order_no: string; + account_sequence: string; + user_id: string; + amount: string; + fee: string; + payment_method: string; + bank_name?: string | null; + bank_card_no?: string | null; + card_holder_name?: string | null; + alipay_account?: string | null; + alipay_real_name?: string | null; + wechat_account?: string | null; + wechat_real_name?: string | null; + status: string; + error_message?: string | null; + reviewed_by?: string | null; + reviewed_at?: string | null; + review_remark?: string | null; + paid_by?: string | null; + paid_at?: string | null; + frozen_at?: string | null; + completed_at?: string | null; + created_at: string; + __op: 'c' | 'u' | 'd' | 'r'; + __table: string; + __source_ts_ms: number; + __deleted?: string; +} + +// wallet_ledger_entries 表 (分类账流水) +interface CdcWalletLedgerEntryPayload { + entry_id: string; + account_sequence: string; + user_id: string; + entry_type: string; + amount: string; + asset_type: string; + balance_after?: string | null; + ref_order_id?: string | null; + ref_tx_hash?: string | null; + memo?: string | null; + created_at: string; + __op: 'c' | 'u' | 'd' | 'r'; + __table: string; + __source_ts_ms: number; + __deleted?: string; +} + +type CdcWalletPayload = CdcWalletAccountPayload | CdcWithdrawalOrderPayload | CdcFiatWithdrawalOrderPayload | CdcWalletLedgerEntryPayload; + +/** + * Wallet CDC 消费者服务 + * + * 消费 Debezium 从 wallet-service PostgreSQL 捕获的数据变更 + * + * Topics: + * - cdc.wallet.public.wallet_accounts + * - cdc.wallet.public.withdrawal_orders + * - cdc.wallet.public.fiat_withdrawal_orders + */ +@Injectable() +export class WalletCdcConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(WalletCdcConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isRunning = false; + + // CDC Topics + private readonly cdcTopics = [ + 'cdc.wallet.public.wallet_accounts', + 'cdc.wallet.public.withdrawal_orders', + 'cdc.wallet.public.fiat_withdrawal_orders', + 'cdc.wallet.public.wallet_ledger_entries', + ]; + private readonly consumerGroup: string; + + constructor( + private readonly configService: ConfigService, + private readonly prisma: PrismaService, + ) { + const brokers = (this.configService.get('KAFKA_BROKERS', 'localhost:9092')).split(','); + const clientId = this.configService.get('KAFKA_CLIENT_ID', 'admin-service'); + this.consumerGroup = this.configService.get('KAFKA_WALLET_CDC_GROUP', 'admin-service-wallet-cdc'); + + this.kafka = new Kafka({ + clientId: `${clientId}-wallet-cdc`, + brokers, + logLevel: logLevel.WARN, + }); + + this.consumer = this.kafka.consumer({ groupId: this.consumerGroup }); + + this.logger.log(`[Wallet-CDC] Configured to consume topics: ${this.cdcTopics.join(', ')}`); + } + + async onModuleInit() { + await this.start(); + } + + async onModuleDestroy() { + await this.stop(); + } + + async start(): Promise { + if (this.isRunning) { + this.logger.warn('[Wallet-CDC] Consumer already running'); + return; + } + + try { + this.logger.log('[Wallet-CDC] Connecting to Kafka...'); + await this.consumer.connect(); + + for (const topic of this.cdcTopics) { + await this.consumer.subscribe({ + topic, + fromBeginning: false, + }); + } + + this.logger.log(`[Wallet-CDC] Subscribed to topics: ${this.cdcTopics.join(', ')}`); + + await this.consumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + await this.handleMessage(payload); + }, + }); + + this.isRunning = true; + this.logger.log('[Wallet-CDC] Consumer started successfully'); + } catch (error) { + this.logger.error('[Wallet-CDC] Failed to start consumer:', error); + } + } + + async stop(): Promise { + if (!this.isRunning) return; + + try { + await this.consumer.disconnect(); + this.isRunning = false; + this.logger.log('[Wallet-CDC] Consumer stopped'); + } catch (error) { + this.logger.error('[Wallet-CDC] Failed to stop consumer:', error); + } + } + + private async handleMessage(payload: EachMessagePayload): Promise { + const { topic, partition, message } = payload; + + if (!message.value) { + this.logger.warn(`[Wallet-CDC] Empty message from ${topic}:${partition}`); + return; + } + + try { + const data = JSON.parse(message.value.toString()) as CdcWalletPayload; + const operation = data.__op; + const table = data.__table; + + this.logger.debug( + `[Wallet-CDC] Received ${operation} event for table ${table}` + ); + + // 幂等性检查 + const eventId = `wallet-cdc:${topic}:${partition}:${message.offset}`; + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`[Wallet-CDC] Event ${eventId} already processed, skipping`); + return; + } + + // 根据表名处理不同的事件 + switch (table) { + case 'wallet_accounts': + await this.processWalletAccountEvent(data as CdcWalletAccountPayload); + break; + case 'withdrawal_orders': + await this.processWithdrawalOrderEvent(data as CdcWithdrawalOrderPayload); + break; + case 'fiat_withdrawal_orders': + await this.processFiatWithdrawalOrderEvent(data as CdcFiatWithdrawalOrderPayload); + break; + case 'wallet_ledger_entries': + await this.processWalletLedgerEntryEvent(data as CdcWalletLedgerEntryPayload); + break; + default: + this.logger.warn(`[Wallet-CDC] Unknown table: ${table}`); + } + + // 记录已处理 + await this.markEventProcessed(eventId, `wallet-cdc:${table}:${operation}`); + + this.logger.log( + `[Wallet-CDC] ✓ Processed ${operation} for table: ${table}` + ); + } catch (error) { + this.logger.error(`[Wallet-CDC] Failed to process message:`, error); + throw error; // 让 KafkaJS 重试 + } + } + + // ==================== wallet_accounts 处理 ==================== + + private async processWalletAccountEvent(data: CdcWalletAccountPayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + if (operation === 'd' || isDeleted) { + await this.handleWalletAccountDelete(data); + } else if (operation === 'c' || operation === 'r') { + await this.handleWalletAccountCreateOrSnapshot(data); + } else if (operation === 'u') { + await this.handleWalletAccountUpdate(data); + } + } + + private async handleWalletAccountCreateOrSnapshot(data: CdcWalletAccountPayload): Promise { + await this.prisma.walletAccountQueryView.upsert({ + where: { id: BigInt(data.wallet_id) }, + create: { + id: BigInt(data.wallet_id), + accountSequence: data.account_sequence, + userId: BigInt(data.user_id), + usdtAvailable: new Decimal(data.usdt_available || '0'), + usdtFrozen: new Decimal(data.usdt_frozen || '0'), + dstAvailable: new Decimal(data.dst_available || '0'), + dstFrozen: new Decimal(data.dst_frozen || '0'), + bnbAvailable: new Decimal(data.bnb_available || '0'), + bnbFrozen: new Decimal(data.bnb_frozen || '0'), + ogAvailable: new Decimal(data.og_available || '0'), + ogFrozen: new Decimal(data.og_frozen || '0'), + rwadAvailable: new Decimal(data.rwad_available || '0'), + rwadFrozen: new Decimal(data.rwad_frozen || '0'), + hashpower: new Decimal(data.hashpower || '0'), + pendingUsdt: new Decimal(data.pending_usdt || '0'), + pendingHashpower: new Decimal(data.pending_hashpower || '0'), + settleableUsdt: new Decimal(data.settleable_usdt || '0'), + settleableHashpower: new Decimal(data.settleable_hashpower || '0'), + settledTotalUsdt: new Decimal(data.settled_total_usdt || '0'), + settledTotalHashpower: new Decimal(data.settled_total_hashpower || '0'), + expiredTotalUsdt: new Decimal(data.expired_total_usdt || '0'), + expiredTotalHashpower: new Decimal(data.expired_total_hashpower || '0'), + status: data.status, + hasPlanted: data.has_planted, + createdAt: new Date(data.created_at), + syncedAt: new Date(), + }, + update: { + accountSequence: data.account_sequence, + userId: BigInt(data.user_id), + usdtAvailable: new Decimal(data.usdt_available || '0'), + usdtFrozen: new Decimal(data.usdt_frozen || '0'), + dstAvailable: new Decimal(data.dst_available || '0'), + dstFrozen: new Decimal(data.dst_frozen || '0'), + bnbAvailable: new Decimal(data.bnb_available || '0'), + bnbFrozen: new Decimal(data.bnb_frozen || '0'), + ogAvailable: new Decimal(data.og_available || '0'), + ogFrozen: new Decimal(data.og_frozen || '0'), + rwadAvailable: new Decimal(data.rwad_available || '0'), + rwadFrozen: new Decimal(data.rwad_frozen || '0'), + hashpower: new Decimal(data.hashpower || '0'), + pendingUsdt: new Decimal(data.pending_usdt || '0'), + pendingHashpower: new Decimal(data.pending_hashpower || '0'), + settleableUsdt: new Decimal(data.settleable_usdt || '0'), + settleableHashpower: new Decimal(data.settleable_hashpower || '0'), + settledTotalUsdt: new Decimal(data.settled_total_usdt || '0'), + settledTotalHashpower: new Decimal(data.settled_total_hashpower || '0'), + expiredTotalUsdt: new Decimal(data.expired_total_usdt || '0'), + expiredTotalHashpower: new Decimal(data.expired_total_hashpower || '0'), + status: data.status, + hasPlanted: data.has_planted, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Wallet-CDC] Created/Snapshot wallet account: ${data.account_sequence}`); + } + + private async handleWalletAccountUpdate(data: CdcWalletAccountPayload): Promise { + const id = BigInt(data.wallet_id); + const exists = await this.prisma.walletAccountQueryView.findUnique({ + where: { id }, + select: { id: true }, + }); + + if (!exists) { + await this.handleWalletAccountCreateOrSnapshot(data); + return; + } + + await this.prisma.walletAccountQueryView.update({ + where: { id }, + data: { + usdtAvailable: new Decimal(data.usdt_available || '0'), + usdtFrozen: new Decimal(data.usdt_frozen || '0'), + dstAvailable: new Decimal(data.dst_available || '0'), + dstFrozen: new Decimal(data.dst_frozen || '0'), + bnbAvailable: new Decimal(data.bnb_available || '0'), + bnbFrozen: new Decimal(data.bnb_frozen || '0'), + ogAvailable: new Decimal(data.og_available || '0'), + ogFrozen: new Decimal(data.og_frozen || '0'), + rwadAvailable: new Decimal(data.rwad_available || '0'), + rwadFrozen: new Decimal(data.rwad_frozen || '0'), + hashpower: new Decimal(data.hashpower || '0'), + pendingUsdt: new Decimal(data.pending_usdt || '0'), + pendingHashpower: new Decimal(data.pending_hashpower || '0'), + settleableUsdt: new Decimal(data.settleable_usdt || '0'), + settleableHashpower: new Decimal(data.settleable_hashpower || '0'), + settledTotalUsdt: new Decimal(data.settled_total_usdt || '0'), + settledTotalHashpower: new Decimal(data.settled_total_hashpower || '0'), + expiredTotalUsdt: new Decimal(data.expired_total_usdt || '0'), + expiredTotalHashpower: new Decimal(data.expired_total_hashpower || '0'), + status: data.status, + hasPlanted: data.has_planted, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Wallet-CDC] Updated wallet account: ${data.account_sequence}`); + } + + private async handleWalletAccountDelete(data: CdcWalletAccountPayload): Promise { + const id = BigInt(data.wallet_id); + + try { + await this.prisma.walletAccountQueryView.delete({ + where: { id }, + }); + this.logger.log(`[Wallet-CDC] Deleted wallet account: ${data.account_sequence}`); + } catch { + this.logger.warn(`[Wallet-CDC] Wallet account not found for delete: ${data.account_sequence}`); + } + } + + // ==================== withdrawal_orders 处理 ==================== + + private async processWithdrawalOrderEvent(data: CdcWithdrawalOrderPayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + if (operation === 'd' || isDeleted) { + await this.handleWithdrawalOrderDelete(data); + } else if (operation === 'c' || operation === 'r') { + await this.handleWithdrawalOrderCreateOrSnapshot(data); + } else if (operation === 'u') { + await this.handleWithdrawalOrderUpdate(data); + } + } + + private async handleWithdrawalOrderCreateOrSnapshot(data: CdcWithdrawalOrderPayload): Promise { + await this.prisma.withdrawalOrderQueryView.upsert({ + where: { id: BigInt(data.order_id) }, + create: { + id: BigInt(data.order_id), + orderNo: data.order_no, + accountSequence: data.account_sequence, + userId: BigInt(data.user_id), + amount: new Decimal(data.amount), + fee: new Decimal(data.fee), + chainType: data.chain_type, + toAddress: data.to_address, + txHash: data.tx_hash || null, + isInternalTransfer: data.is_internal_transfer, + toAccountSequence: data.to_account_sequence || null, + toUserId: data.to_user_id ? BigInt(data.to_user_id) : null, + status: data.status, + errorMessage: data.error_message || null, + frozenAt: data.frozen_at ? new Date(data.frozen_at) : null, + broadcastedAt: data.broadcasted_at ? new Date(data.broadcasted_at) : null, + confirmedAt: data.confirmed_at ? new Date(data.confirmed_at) : null, + createdAt: new Date(data.created_at), + syncedAt: new Date(), + }, + update: { + orderNo: data.order_no, + accountSequence: data.account_sequence, + userId: BigInt(data.user_id), + amount: new Decimal(data.amount), + fee: new Decimal(data.fee), + chainType: data.chain_type, + toAddress: data.to_address, + txHash: data.tx_hash || null, + isInternalTransfer: data.is_internal_transfer, + toAccountSequence: data.to_account_sequence || null, + toUserId: data.to_user_id ? BigInt(data.to_user_id) : null, + status: data.status, + errorMessage: data.error_message || null, + frozenAt: data.frozen_at ? new Date(data.frozen_at) : null, + broadcastedAt: data.broadcasted_at ? new Date(data.broadcasted_at) : null, + confirmedAt: data.confirmed_at ? new Date(data.confirmed_at) : null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Wallet-CDC] Created/Snapshot withdrawal order: ${data.order_no}`); + } + + private async handleWithdrawalOrderUpdate(data: CdcWithdrawalOrderPayload): Promise { + const id = BigInt(data.order_id); + const exists = await this.prisma.withdrawalOrderQueryView.findUnique({ + where: { id }, + select: { id: true }, + }); + + if (!exists) { + await this.handleWithdrawalOrderCreateOrSnapshot(data); + return; + } + + await this.prisma.withdrawalOrderQueryView.update({ + where: { id }, + data: { + txHash: data.tx_hash || null, + status: data.status, + errorMessage: data.error_message || null, + frozenAt: data.frozen_at ? new Date(data.frozen_at) : null, + broadcastedAt: data.broadcasted_at ? new Date(data.broadcasted_at) : null, + confirmedAt: data.confirmed_at ? new Date(data.confirmed_at) : null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Wallet-CDC] Updated withdrawal order: ${data.order_no}`); + } + + private async handleWithdrawalOrderDelete(data: CdcWithdrawalOrderPayload): Promise { + const id = BigInt(data.order_id); + + try { + await this.prisma.withdrawalOrderQueryView.delete({ + where: { id }, + }); + this.logger.log(`[Wallet-CDC] Deleted withdrawal order: ${data.order_no}`); + } catch { + this.logger.warn(`[Wallet-CDC] Withdrawal order not found for delete: ${data.order_no}`); + } + } + + // ==================== fiat_withdrawal_orders 处理 ==================== + + private async processFiatWithdrawalOrderEvent(data: CdcFiatWithdrawalOrderPayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + if (operation === 'd' || isDeleted) { + await this.handleFiatWithdrawalOrderDelete(data); + } else if (operation === 'c' || operation === 'r') { + await this.handleFiatWithdrawalOrderCreateOrSnapshot(data); + } else if (operation === 'u') { + await this.handleFiatWithdrawalOrderUpdate(data); + } + } + + private maskBankCardNo(cardNo: string | null | undefined): string | null { + if (!cardNo) return null; + if (cardNo.length <= 8) return cardNo; + return cardNo.slice(0, 4) + '****' + cardNo.slice(-4); + } + + private maskAccount(account: string | null | undefined): string | null { + if (!account) return null; + if (account.length <= 4) return account; + const visible = Math.min(3, Math.floor(account.length / 3)); + return account.slice(0, visible) + '****' + account.slice(-visible); + } + + private async handleFiatWithdrawalOrderCreateOrSnapshot(data: CdcFiatWithdrawalOrderPayload): Promise { + await this.prisma.fiatWithdrawalOrderQueryView.upsert({ + where: { id: BigInt(data.order_id) }, + create: { + id: BigInt(data.order_id), + orderNo: data.order_no, + accountSequence: data.account_sequence, + userId: BigInt(data.user_id), + amount: new Decimal(data.amount), + fee: new Decimal(data.fee), + paymentMethod: data.payment_method, + bankName: data.bank_name || null, + bankCardNoMasked: this.maskBankCardNo(data.bank_card_no), + cardHolderName: data.card_holder_name || null, + alipayAccountMasked: this.maskAccount(data.alipay_account), + wechatAccountMasked: this.maskAccount(data.wechat_account), + status: data.status, + errorMessage: data.error_message || null, + reviewedBy: data.reviewed_by || null, + reviewedAt: data.reviewed_at ? new Date(data.reviewed_at) : null, + reviewRemark: data.review_remark || null, + paidBy: data.paid_by || null, + paidAt: data.paid_at ? new Date(data.paid_at) : null, + frozenAt: data.frozen_at ? new Date(data.frozen_at) : null, + completedAt: data.completed_at ? new Date(data.completed_at) : null, + createdAt: new Date(data.created_at), + syncedAt: new Date(), + }, + update: { + orderNo: data.order_no, + accountSequence: data.account_sequence, + userId: BigInt(data.user_id), + amount: new Decimal(data.amount), + fee: new Decimal(data.fee), + paymentMethod: data.payment_method, + bankName: data.bank_name || null, + bankCardNoMasked: this.maskBankCardNo(data.bank_card_no), + cardHolderName: data.card_holder_name || null, + alipayAccountMasked: this.maskAccount(data.alipay_account), + wechatAccountMasked: this.maskAccount(data.wechat_account), + status: data.status, + errorMessage: data.error_message || null, + reviewedBy: data.reviewed_by || null, + reviewedAt: data.reviewed_at ? new Date(data.reviewed_at) : null, + reviewRemark: data.review_remark || null, + paidBy: data.paid_by || null, + paidAt: data.paid_at ? new Date(data.paid_at) : null, + frozenAt: data.frozen_at ? new Date(data.frozen_at) : null, + completedAt: data.completed_at ? new Date(data.completed_at) : null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Wallet-CDC] Created/Snapshot fiat withdrawal order: ${data.order_no}`); + } + + private async handleFiatWithdrawalOrderUpdate(data: CdcFiatWithdrawalOrderPayload): Promise { + const id = BigInt(data.order_id); + const exists = await this.prisma.fiatWithdrawalOrderQueryView.findUnique({ + where: { id }, + select: { id: true }, + }); + + if (!exists) { + await this.handleFiatWithdrawalOrderCreateOrSnapshot(data); + return; + } + + await this.prisma.fiatWithdrawalOrderQueryView.update({ + where: { id }, + data: { + status: data.status, + errorMessage: data.error_message || null, + reviewedBy: data.reviewed_by || null, + reviewedAt: data.reviewed_at ? new Date(data.reviewed_at) : null, + reviewRemark: data.review_remark || null, + paidBy: data.paid_by || null, + paidAt: data.paid_at ? new Date(data.paid_at) : null, + frozenAt: data.frozen_at ? new Date(data.frozen_at) : null, + completedAt: data.completed_at ? new Date(data.completed_at) : null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Wallet-CDC] Updated fiat withdrawal order: ${data.order_no}`); + } + + private async handleFiatWithdrawalOrderDelete(data: CdcFiatWithdrawalOrderPayload): Promise { + const id = BigInt(data.order_id); + + try { + await this.prisma.fiatWithdrawalOrderQueryView.delete({ + where: { id }, + }); + this.logger.log(`[Wallet-CDC] Deleted fiat withdrawal order: ${data.order_no}`); + } catch { + this.logger.warn(`[Wallet-CDC] Fiat withdrawal order not found for delete: ${data.order_no}`); + } + } + + // ==================== wallet_ledger_entries 处理 ==================== + + private async processWalletLedgerEntryEvent(data: CdcWalletLedgerEntryPayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + // 分类账流水是 append-only,通常只有 create 和 snapshot + // 但也处理更新和删除情况以保持完整性 + if (operation === 'd' || isDeleted) { + await this.handleWalletLedgerEntryDelete(data); + } else if (operation === 'c' || operation === 'r') { + await this.handleWalletLedgerEntryCreate(data); + } else if (operation === 'u') { + await this.handleWalletLedgerEntryUpdate(data); + } + } + + private async handleWalletLedgerEntryCreate(data: CdcWalletLedgerEntryPayload): Promise { + await this.prisma.walletLedgerEntryView.upsert({ + where: { id: BigInt(data.entry_id) }, + create: { + id: BigInt(data.entry_id), + accountSequence: data.account_sequence, + userId: BigInt(data.user_id), + entryType: data.entry_type, + amount: new Decimal(data.amount), + assetType: data.asset_type, + balanceAfter: data.balance_after ? new Decimal(data.balance_after) : null, + refOrderId: data.ref_order_id || null, + refTxHash: data.ref_tx_hash || null, + memo: data.memo || null, + createdAt: new Date(data.created_at), + syncedAt: new Date(), + }, + update: { + accountSequence: data.account_sequence, + userId: BigInt(data.user_id), + entryType: data.entry_type, + amount: new Decimal(data.amount), + assetType: data.asset_type, + balanceAfter: data.balance_after ? new Decimal(data.balance_after) : null, + refOrderId: data.ref_order_id || null, + refTxHash: data.ref_tx_hash || null, + memo: data.memo || null, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Wallet-CDC] Created ledger entry: ${data.entry_id} (${data.entry_type})`); + } + + private async handleWalletLedgerEntryUpdate(data: CdcWalletLedgerEntryPayload): Promise { + // 理论上分类账不应该被更新,但为了完整性还是处理 + await this.handleWalletLedgerEntryCreate(data); + this.logger.log(`[Wallet-CDC] Updated ledger entry: ${data.entry_id}`); + } + + private async handleWalletLedgerEntryDelete(data: CdcWalletLedgerEntryPayload): Promise { + const id = BigInt(data.entry_id); + + try { + await this.prisma.walletLedgerEntryView.delete({ + where: { id }, + }); + this.logger.log(`[Wallet-CDC] Deleted ledger entry: ${data.entry_id}`); + } catch { + this.logger.warn(`[Wallet-CDC] Ledger entry not found for delete: ${data.entry_id}`); + } + } + + // ==================== Helper Methods ==================== + + private async isEventProcessed(eventId: string): Promise { + const count = await this.prisma.processedEvent.count({ + where: { eventId }, + }); + return count > 0; + } + + private async markEventProcessed(eventId: string, eventType: string): Promise { + await this.prisma.processedEvent.create({ + data: { + eventId, + eventType, + processedAt: new Date(), + }, + }); + } + + /** + * 获取消费者状态 + */ + getStatus(): { isRunning: boolean; topics: string[]; consumerGroup: string } { + return { + isRunning: this.isRunning, + topics: this.cdcTopics, + consumerGroup: this.consumerGroup, + }; + } +} diff --git a/backend/services/deploy.sh b/backend/services/deploy.sh index 10a1d6ab..c0807f08 100755 --- a/backend/services/deploy.sh +++ b/backend/services/deploy.sh @@ -204,7 +204,7 @@ up() { # Wait for Debezium Connect to be ready log_info "Waiting for Debezium Connect to be ready..." for i in {1..30}; do - if curl -s http://localhost:8083/ > /dev/null 2>&1; then + if curl -s http://localhost:8084/ > /dev/null 2>&1; then log_info "Debezium Connect is ready!" break fi @@ -231,7 +231,7 @@ register_debezium_connectors() { log_info "Registering Debezium connectors..." # Check existing connectors - EXISTING=$(curl -s http://localhost:8083/connectors 2>/dev/null || echo "[]") + EXISTING=$(curl -s http://localhost:8084/connectors 2>/dev/null || echo "[]") # Read database credentials from .env source "$ENV_FILE" @@ -276,7 +276,7 @@ register_debezium_connectors() { RESULT=$(curl -s -X POST \ -H "Content-Type: application/json" \ -d "$IDENTITY_CONFIG" \ - "http://localhost:8083/connectors" 2>/dev/null || echo "failed") + "http://localhost:8084/connectors" 2>/dev/null || echo "failed") if echo "$RESULT" | grep -q "identity-postgres-connector"; then log_info "identity-postgres-connector registered successfully" @@ -325,7 +325,7 @@ register_debezium_connectors() { RESULT=$(curl -s -X POST \ -H "Content-Type: application/json" \ -d "$REFERRAL_CONFIG" \ - "http://localhost:8083/connectors" 2>/dev/null || echo "failed") + "http://localhost:8084/connectors" 2>/dev/null || echo "failed") if echo "$RESULT" | grep -q "referral-postgres-connector"; then log_info "referral-postgres-connector registered successfully" @@ -333,6 +333,153 @@ register_debezium_connectors() { log_warn "Failed to register referral connector: $RESULT" fi fi + + # Register wallet-postgres-connector + if echo "$EXISTING" | grep -q "wallet-postgres-connector"; then + log_info "wallet-postgres-connector already registered" + else + log_info "Registering wallet-postgres-connector..." + WALLET_CONFIG='{ + "name": "wallet-postgres-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "'${POSTGRES_USER:-rwa_user}'", + "database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'", + "database.dbname": "rwa_wallet", + "topic.prefix": "cdc.wallet", + "table.include.list": "public.wallet_accounts,public.withdrawal_orders,public.fiat_withdrawal_orders,public.wallet_ledger_entries", + "plugin.name": "pgoutput", + "publication.name": "debezium_wallet_publication", + "publication.autocreate.mode": "filtered", + "slot.name": "debezium_wallet_slot", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + "heartbeat.interval.ms": "10000", + "snapshot.mode": "initial", + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } + }' + + RESULT=$(curl -s -X POST \ + -H "Content-Type: application/json" \ + -d "$WALLET_CONFIG" \ + "http://localhost:8084/connectors" 2>/dev/null || echo "failed") + + if echo "$RESULT" | grep -q "wallet-postgres-connector"; then + log_info "wallet-postgres-connector registered successfully" + else + log_warn "Failed to register wallet connector: $RESULT" + fi + fi + + # Register planting-postgres-connector + if echo "$EXISTING" | grep -q "planting-postgres-connector"; then + log_info "planting-postgres-connector already registered" + else + log_info "Registering planting-postgres-connector..." + PLANTING_CONFIG='{ + "name": "planting-postgres-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "'${POSTGRES_USER:-rwa_user}'", + "database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'", + "database.dbname": "rwa_planting", + "topic.prefix": "cdc.planting", + "table.include.list": "public.planting_orders,public.planting_positions,public.contract_signing_tasks,public.fund_allocations", + "plugin.name": "pgoutput", + "publication.name": "debezium_planting_publication", + "publication.autocreate.mode": "filtered", + "slot.name": "debezium_planting_slot", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + "heartbeat.interval.ms": "10000", + "snapshot.mode": "initial", + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } + }' + + RESULT=$(curl -s -X POST \ + -H "Content-Type: application/json" \ + -d "$PLANTING_CONFIG" \ + "http://localhost:8084/connectors" 2>/dev/null || echo "failed") + + if echo "$RESULT" | grep -q "planting-postgres-connector"; then + log_info "planting-postgres-connector registered successfully" + else + log_warn "Failed to register planting connector: $RESULT" + fi + fi + + # Register authorization-postgres-connector + if echo "$EXISTING" | grep -q "authorization-postgres-connector"; then + log_info "authorization-postgres-connector already registered" + else + log_info "Registering authorization-postgres-connector..." + AUTHORIZATION_CONFIG='{ + "name": "authorization-postgres-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "'${POSTGRES_USER:-rwa_user}'", + "database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'", + "database.dbname": "rwa_authorization", + "topic.prefix": "cdc.authorization", + "table.include.list": "public.authorization_roles,public.monthly_assessments,public.system_accounts,public.system_account_ledgers", + "plugin.name": "pgoutput", + "publication.name": "debezium_authorization_publication", + "publication.autocreate.mode": "filtered", + "slot.name": "debezium_authorization_slot", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + "heartbeat.interval.ms": "10000", + "snapshot.mode": "initial", + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } + }' + + RESULT=$(curl -s -X POST \ + -H "Content-Type: application/json" \ + -d "$AUTHORIZATION_CONFIG" \ + "http://localhost:8084/connectors" 2>/dev/null || echo "failed") + + if echo "$RESULT" | grep -q "authorization-postgres-connector"; then + log_info "authorization-postgres-connector registered successfully" + else + log_warn "Failed to register authorization connector: $RESULT" + fi + fi } down() { @@ -610,10 +757,10 @@ infra_status() { echo -e " ${RED}[FAIL]${NC} Kafka (port 9092)" fi - if curl -s http://localhost:8083/ > /dev/null 2>&1; then - echo -e " ${GREEN}[OK]${NC} Debezium Connect (port 8083)" + if curl -s http://localhost:8084/ > /dev/null 2>&1; then + echo -e " ${GREEN}[OK]${NC} Debezium Connect (port 8084)" # Check connector status - CONNECTOR_STATUS=$(curl -s http://localhost:8083/connectors/identity-postgres-connector/status 2>/dev/null | grep -o '"state":"[^"]*"' | head -1 || echo "") + CONNECTOR_STATUS=$(curl -s http://localhost:8084/connectors/identity-postgres-connector/status 2>/dev/null | grep -o '"state":"[^"]*"' | head -1 || echo "") if echo "$CONNECTOR_STATUS" | grep -q "RUNNING"; then echo -e " └─ ${GREEN}[RUNNING]${NC} identity-postgres-connector" elif [ -n "$CONNECTOR_STATUS" ]; then @@ -622,7 +769,7 @@ infra_status() { echo -e " └─ ${YELLOW}[NOT REGISTERED]${NC} identity-postgres-connector" fi else - echo -e " ${RED}[FAIL]${NC} Debezium Connect (port 8083)" + echo -e " ${RED}[FAIL]${NC} Debezium Connect (port 8084)" fi } @@ -641,28 +788,28 @@ debezium_status() { echo "============================================" echo "" - if ! curl -s http://localhost:8083/ > /dev/null 2>&1; then + if ! curl -s http://localhost:8084/ > /dev/null 2>&1; then log_error "Debezium Connect is not running" exit 1 fi echo "Debezium Connect Version:" - curl -s http://localhost:8083/ | python3 -m json.tool 2>/dev/null || curl -s http://localhost:8083/ + curl -s http://localhost:8084/ | python3 -m json.tool 2>/dev/null || curl -s http://localhost:8084/ echo "" echo "Registered Connectors:" - curl -s http://localhost:8083/connectors | python3 -m json.tool 2>/dev/null || curl -s http://localhost:8083/connectors + curl -s http://localhost:8084/connectors | python3 -m json.tool 2>/dev/null || curl -s http://localhost:8084/connectors echo "" echo "Connector Details:" - curl -s http://localhost:8083/connectors/identity-postgres-connector/status | python3 -m json.tool 2>/dev/null || \ - curl -s http://localhost:8083/connectors/identity-postgres-connector/status + curl -s http://localhost:8084/connectors/identity-postgres-connector/status | python3 -m json.tool 2>/dev/null || \ + curl -s http://localhost:8084/connectors/identity-postgres-connector/status } debezium_register() { log_step "Registering Debezium connectors..." - if ! curl -s http://localhost:8083/ > /dev/null 2>&1; then + if ! curl -s http://localhost:8084/ > /dev/null 2>&1; then log_error "Debezium Connect is not running" exit 1 fi @@ -673,12 +820,12 @@ debezium_register() { debezium_restart_connector() { log_step "Restarting Debezium connector..." - if ! curl -s http://localhost:8083/ > /dev/null 2>&1; then + if ! curl -s http://localhost:8084/ > /dev/null 2>&1; then log_error "Debezium Connect is not running" exit 1 fi - curl -X POST http://localhost:8083/connectors/identity-postgres-connector/restart + curl -X POST http://localhost:8084/connectors/identity-postgres-connector/restart log_info "Connector restart requested" sleep 3 @@ -690,7 +837,7 @@ debezium_delete_connector() { read -p "Are you sure? (y/N): " confirm if [ "$confirm" = "y" ] || [ "$confirm" = "Y" ]; then - curl -X DELETE http://localhost:8083/connectors/identity-postgres-connector + curl -X DELETE http://localhost:8084/connectors/identity-postgres-connector log_info "Connector deleted" else log_info "Cancelled" diff --git a/backend/services/docker-compose.yml b/backend/services/docker-compose.yml index 1821801f..9abe370b 100644 --- a/backend/services/docker-compose.yml +++ b/backend/services/docker-compose.yml @@ -131,7 +131,7 @@ services: postgres: condition: service_healthy ports: - - "8083:8083" + - "8084:8083" environment: TZ: Asia/Shanghai GROUP_ID: debezium-connect diff --git a/backend/services/scripts/debezium/authorization-connector.json b/backend/services/scripts/debezium/authorization-connector.json new file mode 100644 index 00000000..f7a86d02 --- /dev/null +++ b/backend/services/scripts/debezium/authorization-connector.json @@ -0,0 +1,28 @@ +{ + "name": "authorization-postgres-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "debezium", + "database.password": "debezium_password", + "database.dbname": "rwa_authorization", + "database.server.name": "authorization", + "topic.prefix": "cdc.authorization", + "plugin.name": "pgoutput", + "publication.name": "authorization_cdc_publication", + "slot.name": "authorization_cdc_slot", + "table.include.list": "public.authorization_roles,public.monthly_assessments,public.system_accounts,public.system_account_ledgers", + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + "transforms.unwrap.delete.handling.mode": "rewrite", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "snapshot.mode": "initial", + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } +} diff --git a/backend/services/scripts/debezium/planting-connector.json b/backend/services/scripts/debezium/planting-connector.json new file mode 100644 index 00000000..f6240b45 --- /dev/null +++ b/backend/services/scripts/debezium/planting-connector.json @@ -0,0 +1,28 @@ +{ + "name": "planting-postgres-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "debezium", + "database.password": "debezium_password", + "database.dbname": "rwa_planting", + "database.server.name": "planting", + "topic.prefix": "cdc.planting", + "plugin.name": "pgoutput", + "publication.name": "planting_cdc_publication", + "slot.name": "planting_cdc_slot", + "table.include.list": "public.planting_orders,public.planting_positions,public.contract_signing_tasks,public.fund_allocations", + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + "transforms.unwrap.delete.handling.mode": "rewrite", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "snapshot.mode": "initial", + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } +} diff --git a/backend/services/scripts/debezium/wallet-connector.json b/backend/services/scripts/debezium/wallet-connector.json new file mode 100644 index 00000000..b3f7a7d1 --- /dev/null +++ b/backend/services/scripts/debezium/wallet-connector.json @@ -0,0 +1,28 @@ +{ + "name": "wallet-postgres-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "debezium", + "database.password": "debezium_password", + "database.dbname": "rwa_wallet", + "database.server.name": "wallet", + "topic.prefix": "cdc.wallet", + "plugin.name": "pgoutput", + "publication.name": "wallet_cdc_publication", + "slot.name": "wallet_cdc_slot", + "table.include.list": "public.wallet_accounts,public.withdrawal_orders,public.fiat_withdrawal_orders,public.wallet_ledger_entries", + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + "transforms.unwrap.delete.handling.mode": "rewrite", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "snapshot.mode": "initial", + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } +}