From c4cec836d924e71e0163f7e43ddf4645316d1cd8 Mon Sep 17 00:00:00 2001 From: hailin Date: Wed, 7 Jan 2026 18:57:58 -0800 Subject: [PATCH] =?UTF-8?q?feat(admin-service):=20=E6=B7=BB=E5=8A=A0=20ref?= =?UTF-8?q?erral-service=20CDC=20=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 ReferralQueryView schema 和 migration - 新增 ReferralCdcConsumerService 消费推荐关系变更 - 配置 referral-postgres-connector 用于 Debezium CDC - 更新 deploy.sh 自动注册 referral connector - 更新 init-databases.sh 配置 rwa_referral 逻辑复制权限 CDC 同步的字段: - user_id, account_sequence, referrer_id - my_referral_code, used_referral_code - ancestor_path, depth - direct_referral_count, active_direct_count 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../migration.sql | 32 ++ .../admin-service/prisma/schema.prisma | 40 +++ .../src/infrastructure/kafka/index.ts | 1 + .../src/infrastructure/kafka/kafka.module.ts | 25 +- .../kafka/referral-cdc-consumer.service.ts | 289 ++++++++++++++++++ backend/services/deploy.sh | 142 ++++++--- .../scripts/debezium/referral-connector.json | 41 +++ backend/services/scripts/init-databases.sh | 16 +- 8 files changed, 525 insertions(+), 61 deletions(-) create mode 100644 backend/services/admin-service/prisma/migrations/20250107100000_add_referral_query_view/migration.sql create mode 100644 backend/services/admin-service/src/infrastructure/kafka/referral-cdc-consumer.service.ts create mode 100644 backend/services/scripts/debezium/referral-connector.json diff --git a/backend/services/admin-service/prisma/migrations/20250107100000_add_referral_query_view/migration.sql b/backend/services/admin-service/prisma/migrations/20250107100000_add_referral_query_view/migration.sql new file mode 100644 index 00000000..2dbea944 --- /dev/null +++ b/backend/services/admin-service/prisma/migrations/20250107100000_add_referral_query_view/migration.sql @@ -0,0 +1,32 @@ +-- CreateTable: referral_query_view +-- 推荐关系查询视图 - 通过 Debezium CDC 从 referral-service 同步 +CREATE TABLE "referral_query_view" ( + "relationship_id" BIGINT NOT NULL, + "user_id" BIGINT NOT NULL, + "account_sequence" VARCHAR(12) NOT NULL, + "referrer_id" BIGINT, + "root_user_id" BIGINT, + "my_referral_code" VARCHAR(20) NOT NULL, + "used_referral_code" VARCHAR(20), + "ancestor_path" BIGINT[] NOT NULL DEFAULT '{}', + "depth" INTEGER NOT NULL DEFAULT 0, + "direct_referral_count" INTEGER NOT NULL DEFAULT 0, + "active_direct_count" INTEGER NOT NULL DEFAULT 0, + "created_at" TIMESTAMP(3) NOT NULL, + "synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "referral_query_view_pkey" PRIMARY KEY ("relationship_id") +); + +-- CreateIndex: unique constraints +CREATE UNIQUE INDEX "referral_query_view_user_id_key" ON "referral_query_view"("user_id"); +CREATE UNIQUE INDEX "referral_query_view_account_sequence_key" ON "referral_query_view"("account_sequence"); +CREATE UNIQUE INDEX "referral_query_view_my_referral_code_key" ON "referral_query_view"("my_referral_code"); + +-- CreateIndex: query indexes +CREATE INDEX "referral_query_view_referrer_id_idx" ON "referral_query_view"("referrer_id"); +CREATE INDEX "referral_query_view_account_sequence_idx" ON "referral_query_view"("account_sequence"); +CREATE INDEX "referral_query_view_my_referral_code_idx" ON "referral_query_view"("my_referral_code"); +CREATE INDEX "referral_query_view_used_referral_code_idx" ON "referral_query_view"("used_referral_code"); +CREATE INDEX "referral_query_view_root_user_id_idx" ON "referral_query_view"("root_user_id"); +CREATE INDEX "referral_query_view_depth_idx" ON "referral_query_view"("depth"); diff --git a/backend/services/admin-service/prisma/schema.prisma b/backend/services/admin-service/prisma/schema.prisma index d31cdf31..4f995e35 100644 --- a/backend/services/admin-service/prisma/schema.prisma +++ b/backend/services/admin-service/prisma/schema.prisma @@ -469,6 +469,46 @@ model UserQueryView { @@map("user_query_view") } +// ============================================================================= +// Referral Query View (推荐关系查询视图 - 通过 CDC 同步) +// ============================================================================= + +/// 推荐关系查询视图 - 通过 Debezium CDC 从 referral-service 同步 +/// 用于 admin-web 查看用户推荐关系,避免跨服务调用 +model ReferralQueryView { + id BigInt @id @map("relationship_id") + userId BigInt @unique @map("user_id") + accountSequence String @unique @map("account_sequence") @db.VarChar(12) + + // 推荐人信息 + referrerId BigInt? @map("referrer_id") + rootUserId BigInt? @map("root_user_id") + + // 推荐码 + myReferralCode String @unique @map("my_referral_code") @db.VarChar(20) + usedReferralCode String? @map("used_referral_code") @db.VarChar(20) + + // 推荐链信息 + ancestorPath BigInt[] @map("ancestor_path") + depth Int @default(0) @map("depth") + + // 直推统计 + directReferralCount Int @default(0) @map("direct_referral_count") + activeDirectCount Int @default(0) @map("active_direct_count") + + // 时间戳 + createdAt DateTime @map("created_at") + syncedAt DateTime @default(now()) @map("synced_at") + + @@index([referrerId]) + @@index([accountSequence]) + @@index([myReferralCode]) + @@index([usedReferralCode]) + @@index([rootUserId]) + @@index([depth]) + @@map("referral_query_view") +} + // ============================================================================= // Kafka Event Tracking (事件消费追踪) // ============================================================================= diff --git a/backend/services/admin-service/src/infrastructure/kafka/index.ts b/backend/services/admin-service/src/infrastructure/kafka/index.ts index 17a93366..c9cb8a5b 100644 --- a/backend/services/admin-service/src/infrastructure/kafka/index.ts +++ b/backend/services/admin-service/src/infrastructure/kafka/index.ts @@ -1,3 +1,4 @@ export * from './kafka.module'; export * from './user-event-consumer.service'; export * from './cdc-consumer.service'; +export * from './referral-cdc-consumer.service'; diff --git a/backend/services/admin-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/admin-service/src/infrastructure/kafka/kafka.module.ts index 686453c9..f79d5c19 100644 --- a/backend/services/admin-service/src/infrastructure/kafka/kafka.module.ts +++ b/backend/services/admin-service/src/infrastructure/kafka/kafka.module.ts @@ -5,17 +5,17 @@ import { UserQueryRepositoryImpl } from '../persistence/repositories/user-query. import { USER_QUERY_REPOSITORY } from '../../domain/repositories/user-query.repository'; import { UserEventConsumerService } from './user-event-consumer.service'; import { CdcConsumerService } from './cdc-consumer.service'; +import { ReferralCdcConsumerService } from './referral-cdc-consumer.service'; /** * Kafka 模块 * - * 提供两种消费者: - * 1. UserEventConsumerService - 消费 Outbox 领域事件 (保留,用于业务事件) - * 2. CdcConsumerService - 消费 Debezium CDC 事件 (新增,用于数据同步) + * CDC 消费者 - 通过 Debezium 从各服务同步数据: + * 1. CdcConsumerService - identity-service 用户数据 + * 2. ReferralCdcConsumerService - referral-service 推荐关系 * - * 通过环境变量 CDC_ENABLED 控制是否启用 CDC 消费者: - * - CDC_ENABLED=true: 使用 CDC 消费者 (推荐) - * - CDC_ENABLED=false: 使用传统 Outbox 消费者 + * Outbox 消费者 - 处理业务领域事件: + * 1. UserEventConsumerService - 用户相关业务事件 */ @Module({ imports: [ConfigModule], @@ -25,11 +25,18 @@ import { CdcConsumerService } from './cdc-consumer.service'; provide: USER_QUERY_REPOSITORY, useClass: UserQueryRepositoryImpl, }, - // 传统 Outbox 事件消费者 (保留用于领域事件) + // Outbox 事件消费者 (业务领域事件) UserEventConsumerService, - // CDC 消费者 (用于数据同步) + // CDC 消费者 - identity-service CdcConsumerService, + // CDC 消费者 - referral-service + ReferralCdcConsumerService, + ], + exports: [ + UserEventConsumerService, + CdcConsumerService, + ReferralCdcConsumerService, + USER_QUERY_REPOSITORY, ], - exports: [UserEventConsumerService, CdcConsumerService, USER_QUERY_REPOSITORY], }) export class KafkaModule {} diff --git a/backend/services/admin-service/src/infrastructure/kafka/referral-cdc-consumer.service.ts b/backend/services/admin-service/src/infrastructure/kafka/referral-cdc-consumer.service.ts new file mode 100644 index 00000000..1d6d0811 --- /dev/null +++ b/backend/services/admin-service/src/infrastructure/kafka/referral-cdc-consumer.service.ts @@ -0,0 +1,289 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; +import { PrismaService } from '../persistence/prisma/prisma.service'; + +/** + * Debezium CDC 事件结构 (经过 ExtractNewRecordState 转换后) + * 来自 referral-service 的 referral_relationships 表 + */ +interface CdcReferralPayload { + // 推荐关系表字段 (snake_case from PostgreSQL) + relationship_id: string; + user_id: string; + account_sequence: string; + referrer_id?: string | null; + root_user_id?: string | null; + my_referral_code: string; + used_referral_code?: string | null; + ancestor_path?: string[] | null; // PostgreSQL bigint[] 作为字符串数组 + depth: number; + direct_referral_count: number; + active_direct_count: number; + created_at: string; + updated_at: string; + + // Debezium 元数据 (由 ExtractNewRecordState 添加) + __op: 'c' | 'u' | 'd' | 'r'; // c=create, u=update, d=delete, r=read(snapshot) + __table: string; + __source_ts_ms: number; + __deleted?: string; // 'true' for delete events when using rewrite mode +} + +/** + * Referral CDC 消费者服务 + * + * 消费 Debezium 从 referral-service PostgreSQL 捕获的推荐关系变更 + * + * Topic: cdc.referral.public.referral_relationships + */ +@Injectable() +export class ReferralCdcConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(ReferralCdcConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isRunning = false; + + // CDC Topic + private readonly cdcTopic = 'cdc.referral.public.referral_relationships'; + private readonly consumerGroup: string; + + constructor( + private readonly configService: ConfigService, + private readonly prisma: PrismaService, + ) { + const brokers = (this.configService.get('KAFKA_BROKERS', 'localhost:9092')).split(','); + const clientId = this.configService.get('KAFKA_CLIENT_ID', 'admin-service'); + this.consumerGroup = this.configService.get('KAFKA_REFERRAL_CDC_GROUP', 'admin-service-referral-cdc'); + + this.kafka = new Kafka({ + clientId: `${clientId}-referral-cdc`, + brokers, + logLevel: logLevel.WARN, + }); + + this.consumer = this.kafka.consumer({ groupId: this.consumerGroup }); + + this.logger.log(`[Referral-CDC] Configured to consume topic: ${this.cdcTopic}`); + } + + async onModuleInit() { + await this.start(); + } + + async onModuleDestroy() { + await this.stop(); + } + + async start(): Promise { + if (this.isRunning) { + this.logger.warn('[Referral-CDC] Consumer already running'); + return; + } + + try { + this.logger.log('[Referral-CDC] Connecting to Kafka...'); + await this.consumer.connect(); + + await this.consumer.subscribe({ + topic: this.cdcTopic, + fromBeginning: false, + }); + + this.logger.log(`[Referral-CDC] Subscribed to topic: ${this.cdcTopic}`); + + await this.consumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + await this.handleMessage(payload); + }, + }); + + this.isRunning = true; + this.logger.log('[Referral-CDC] Consumer started successfully'); + } catch (error) { + this.logger.error('[Referral-CDC] Failed to start consumer:', error); + } + } + + async stop(): Promise { + if (!this.isRunning) return; + + try { + await this.consumer.disconnect(); + this.isRunning = false; + this.logger.log('[Referral-CDC] Consumer stopped'); + } catch (error) { + this.logger.error('[Referral-CDC] Failed to stop consumer:', error); + } + } + + private async handleMessage(payload: EachMessagePayload): Promise { + const { topic, partition, message } = payload; + + if (!message.value) { + this.logger.warn(`[Referral-CDC] Empty message from ${topic}:${partition}`); + return; + } + + try { + const data = JSON.parse(message.value.toString()) as CdcReferralPayload; + const operation = data.__op; + const sourceTs = data.__source_ts_ms; + + this.logger.debug( + `[Referral-CDC] Received ${operation} event for user ${data.account_sequence} ` + + `(ts: ${new Date(sourceTs).toISOString()})` + ); + + // 幂等性检查 + const eventId = `referral-cdc:${topic}:${partition}:${message.offset}`; + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`[Referral-CDC] Event ${eventId} already processed, skipping`); + return; + } + + // 处理 CDC 事件 + await this.processCdcEvent(data); + + // 记录已处理 + await this.markEventProcessed(eventId, `referral-cdc:${operation}`); + + this.logger.log( + `[Referral-CDC] ✓ Processed ${operation} for user: ${data.account_sequence}` + ); + } catch (error) { + this.logger.error(`[Referral-CDC] Failed to process message:`, error); + throw error; // 让 KafkaJS 重试 + } + } + + private async processCdcEvent(data: CdcReferralPayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + if (operation === 'd' || isDeleted) { + await this.handleDelete(data); + } else if (operation === 'c' || operation === 'r') { + await this.handleCreateOrSnapshot(data); + } else if (operation === 'u') { + await this.handleUpdate(data); + } + } + + private async handleCreateOrSnapshot(data: CdcReferralPayload): Promise { + const ancestorPath = this.parseAncestorPath(data.ancestor_path); + + await this.prisma.referralQueryView.upsert({ + where: { id: BigInt(data.relationship_id) }, + create: { + id: BigInt(data.relationship_id), + userId: BigInt(data.user_id), + accountSequence: data.account_sequence, + referrerId: data.referrer_id ? BigInt(data.referrer_id) : null, + rootUserId: data.root_user_id ? BigInt(data.root_user_id) : null, + myReferralCode: data.my_referral_code, + usedReferralCode: data.used_referral_code || null, + ancestorPath, + depth: data.depth, + directReferralCount: data.direct_referral_count, + activeDirectCount: data.active_direct_count, + createdAt: new Date(data.created_at), + syncedAt: new Date(), + }, + update: { + userId: BigInt(data.user_id), + accountSequence: data.account_sequence, + referrerId: data.referrer_id ? BigInt(data.referrer_id) : null, + rootUserId: data.root_user_id ? BigInt(data.root_user_id) : null, + myReferralCode: data.my_referral_code, + usedReferralCode: data.used_referral_code || null, + ancestorPath, + depth: data.depth, + directReferralCount: data.direct_referral_count, + activeDirectCount: data.active_direct_count, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Referral-CDC] Created/Snapshot referral: ${data.account_sequence}`); + } + + private async handleUpdate(data: CdcReferralPayload): Promise { + const id = BigInt(data.relationship_id); + const exists = await this.prisma.referralQueryView.findUnique({ + where: { id }, + select: { id: true }, + }); + + if (!exists) { + await this.handleCreateOrSnapshot(data); + return; + } + + const ancestorPath = this.parseAncestorPath(data.ancestor_path); + + await this.prisma.referralQueryView.update({ + where: { id }, + data: { + referrerId: data.referrer_id ? BigInt(data.referrer_id) : null, + rootUserId: data.root_user_id ? BigInt(data.root_user_id) : null, + usedReferralCode: data.used_referral_code || null, + ancestorPath, + depth: data.depth, + directReferralCount: data.direct_referral_count, + activeDirectCount: data.active_direct_count, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[Referral-CDC] Updated referral: ${data.account_sequence}`); + } + + private async handleDelete(data: CdcReferralPayload): Promise { + const id = BigInt(data.relationship_id); + + try { + await this.prisma.referralQueryView.delete({ + where: { id }, + }); + this.logger.log(`[Referral-CDC] Deleted referral: ${data.account_sequence}`); + } catch { + this.logger.warn(`[Referral-CDC] Referral not found for delete: ${data.account_sequence}`); + } + } + + // ==================== Helper Methods ==================== + + private parseAncestorPath(path: string[] | null | undefined): bigint[] { + if (!path || !Array.isArray(path)) return []; + return path.map(id => BigInt(id)); + } + + private async isEventProcessed(eventId: string): Promise { + const count = await this.prisma.processedEvent.count({ + where: { eventId }, + }); + return count > 0; + } + + private async markEventProcessed(eventId: string, eventType: string): Promise { + await this.prisma.processedEvent.create({ + data: { + eventId, + eventType, + processedAt: new Date(), + }, + }); + } + + /** + * 获取消费者状态 + */ + getStatus(): { isRunning: boolean; topic: string; consumerGroup: string } { + return { + isRunning: this.isRunning, + topic: this.cdcTopic, + consumerGroup: this.consumerGroup, + }; + } +} diff --git a/backend/services/deploy.sh b/backend/services/deploy.sh index 9db8f422..10a1d6ab 100755 --- a/backend/services/deploy.sh +++ b/backend/services/deploy.sh @@ -230,60 +230,108 @@ up() { register_debezium_connectors() { log_info "Registering Debezium connectors..." - # Check if connector already exists + # Check existing connectors EXISTING=$(curl -s http://localhost:8083/connectors 2>/dev/null || echo "[]") - if echo "$EXISTING" | grep -q "identity-postgres-connector"; then - log_info "identity-postgres-connector already registered" - return - fi - # Read database credentials from .env source "$ENV_FILE" # Register identity-postgres-connector - CONNECTOR_CONFIG='{ - "name": "identity-postgres-connector", - "config": { - "connector.class": "io.debezium.connector.postgresql.PostgresConnector", - "tasks.max": "1", - "database.hostname": "postgres", - "database.port": "5432", - "database.user": "'${POSTGRES_USER:-rwa_user}'", - "database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'", - "database.dbname": "rwa_identity", - "topic.prefix": "cdc.identity", - "table.include.list": "public.user_accounts", - "plugin.name": "pgoutput", - "publication.name": "debezium_identity_publication", - "publication.autocreate.mode": "filtered", - "slot.name": "debezium_identity_slot", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "key.converter.schemas.enable": "false", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": "false", - "transforms": "unwrap", - "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", - "transforms.unwrap.drop.tombstones": "true", - "transforms.unwrap.delete.handling.mode": "rewrite", - "transforms.unwrap.add.fields": "op,table,source.ts_ms", - "heartbeat.interval.ms": "10000", - "snapshot.mode": "initial", - "decimal.handling.mode": "string", - "time.precision.mode": "connect" - } - }' - - RESULT=$(curl -s -X POST \ - -H "Content-Type: application/json" \ - -d "$CONNECTOR_CONFIG" \ - "http://localhost:8083/connectors" 2>/dev/null || echo "failed") - - if echo "$RESULT" | grep -q "identity-postgres-connector"; then - log_info "identity-postgres-connector registered successfully" + if echo "$EXISTING" | grep -q "identity-postgres-connector"; then + log_info "identity-postgres-connector already registered" else - log_warn "Failed to register connector: $RESULT" - log_warn "You may need to register it manually: ./deploy.sh debezium-register" + log_info "Registering identity-postgres-connector..." + IDENTITY_CONFIG='{ + "name": "identity-postgres-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "'${POSTGRES_USER:-rwa_user}'", + "database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'", + "database.dbname": "rwa_identity", + "topic.prefix": "cdc.identity", + "table.include.list": "public.user_accounts", + "plugin.name": "pgoutput", + "publication.name": "debezium_identity_publication", + "publication.autocreate.mode": "filtered", + "slot.name": "debezium_identity_slot", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + "heartbeat.interval.ms": "10000", + "snapshot.mode": "initial", + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } + }' + + RESULT=$(curl -s -X POST \ + -H "Content-Type: application/json" \ + -d "$IDENTITY_CONFIG" \ + "http://localhost:8083/connectors" 2>/dev/null || echo "failed") + + if echo "$RESULT" | grep -q "identity-postgres-connector"; then + log_info "identity-postgres-connector registered successfully" + else + log_warn "Failed to register identity connector: $RESULT" + fi + fi + + # Register referral-postgres-connector + if echo "$EXISTING" | grep -q "referral-postgres-connector"; then + log_info "referral-postgres-connector already registered" + else + log_info "Registering referral-postgres-connector..." + REFERRAL_CONFIG='{ + "name": "referral-postgres-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "'${POSTGRES_USER:-rwa_user}'", + "database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'", + "database.dbname": "rwa_referral", + "topic.prefix": "cdc.referral", + "table.include.list": "public.referral_relationships", + "plugin.name": "pgoutput", + "publication.name": "debezium_referral_publication", + "publication.autocreate.mode": "filtered", + "slot.name": "debezium_referral_slot", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + "heartbeat.interval.ms": "10000", + "snapshot.mode": "initial", + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } + }' + + RESULT=$(curl -s -X POST \ + -H "Content-Type: application/json" \ + -d "$REFERRAL_CONFIG" \ + "http://localhost:8083/connectors" 2>/dev/null || echo "failed") + + if echo "$RESULT" | grep -q "referral-postgres-connector"; then + log_info "referral-postgres-connector registered successfully" + else + log_warn "Failed to register referral connector: $RESULT" + fi fi } diff --git a/backend/services/scripts/debezium/referral-connector.json b/backend/services/scripts/debezium/referral-connector.json new file mode 100644 index 00000000..8a3af356 --- /dev/null +++ b/backend/services/scripts/debezium/referral-connector.json @@ -0,0 +1,41 @@ +{ + "name": "referral-postgres-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "${POSTGRES_USER:-rwa_user}", + "database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}", + "database.dbname": "rwa_referral", + + "topic.prefix": "cdc.referral", + + "table.include.list": "public.referral_relationships", + + "plugin.name": "pgoutput", + "publication.name": "debezium_referral_publication", + "publication.autocreate.mode": "filtered", + + "slot.name": "debezium_referral_slot", + + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + + "heartbeat.interval.ms": "10000", + + "snapshot.mode": "initial", + + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } +} diff --git a/backend/services/scripts/init-databases.sh b/backend/services/scripts/init-databases.sh index cdfdb844..3280b806 100644 --- a/backend/services/scripts/init-databases.sh +++ b/backend/services/scripts/init-databases.sh @@ -43,8 +43,9 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" <<-EOSQL END \$\$; - -- Grant connect to identity database + -- Grant connect to identity and referral databases GRANT CONNECT ON DATABASE rwa_identity TO debezium; + GRANT CONNECT ON DATABASE rwa_referral TO debezium; EOSQL # Grant schema permissions on rwa_identity @@ -55,11 +56,16 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "rwa_identity" <<-E -- Grant select on all tables (current and future) GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium; ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium; +EOSQL - -- Create publication for CDC (user_accounts table only for now) - -- This will be created after the table exists (by Prisma migration) - -- DROP PUBLICATION IF EXISTS debezium_identity_publication; - -- CREATE PUBLICATION debezium_identity_publication FOR TABLE user_accounts; +# Grant schema permissions on rwa_referral +psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "rwa_referral" <<-EOSQL + -- Grant usage on public schema + GRANT USAGE ON SCHEMA public TO debezium; + + -- Grant select on all tables (current and future) + GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium; + ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium; EOSQL echo "PostgreSQL CDC configuration completed!"