diff --git a/backend/services/admin-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/admin-service/src/infrastructure/kafka/cdc-consumer.service.ts new file mode 100644 index 00000000..7710cdbe --- /dev/null +++ b/backend/services/admin-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -0,0 +1,283 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy, Inject } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; +import { PrismaService } from '../persistence/prisma/prisma.service'; +import { IUserQueryRepository, USER_QUERY_REPOSITORY } from '../../domain/repositories/user-query.repository'; + +/** + * Debezium CDC 事件结构 (经过 ExtractNewRecordState 转换后) + * + * 原始 Debezium 格式包含 before/after/op 等字段 + * 经过 unwrap 转换后,直接是表的字段 + __op, __table, __source_ts_ms + */ +interface CdcUserAccountPayload { + // 用户表字段 (snake_case from PostgreSQL) + user_id: string; + account_sequence: string; + phone_number?: string | null; + nickname: string; + avatar_url?: string | null; + inviter_sequence?: string | null; + kyc_status: string; + status: string; + registered_at: string; + updated_at: string; + + // Debezium 元数据 (由 ExtractNewRecordState 添加) + __op: 'c' | 'u' | 'd' | 'r'; // c=create, u=update, d=delete, r=read(snapshot) + __table: string; + __source_ts_ms: number; + __deleted?: string; // 'true' for delete events when using rewrite mode +} + +/** + * CDC 消费者服务 + * + * 消费 Debezium 从 identity-service PostgreSQL 捕获的数据变更 + * 替代原有的基于 Outbox 的 UserEventConsumerService + * + * Topic: cdc.identity.public.user_accounts + * + * 优势: + * - 零代码侵入 identity-service + * - 100% 捕获所有数据变更 + * - 自动捕获 schema 变更 + */ +@Injectable() +export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(CdcConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isRunning = false; + + // CDC Topic + private readonly cdcTopic = 'cdc.identity.public.user_accounts'; + private readonly consumerGroup: string; + + constructor( + private readonly configService: ConfigService, + private readonly prisma: PrismaService, + @Inject(USER_QUERY_REPOSITORY) + private readonly userQueryRepository: IUserQueryRepository, + ) { + const brokers = (this.configService.get('KAFKA_BROKERS', 'localhost:9092')).split(','); + const clientId = this.configService.get('KAFKA_CLIENT_ID', 'admin-service'); + this.consumerGroup = this.configService.get('KAFKA_CDC_CONSUMER_GROUP', 'admin-service-cdc'); + + this.kafka = new Kafka({ + clientId: `${clientId}-cdc`, + brokers, + logLevel: logLevel.WARN, + }); + + this.consumer = this.kafka.consumer({ groupId: this.consumerGroup }); + + this.logger.log(`[CDC] Configured to consume topic: ${this.cdcTopic}`); + } + + async onModuleInit() { + await this.start(); + } + + async onModuleDestroy() { + await this.stop(); + } + + async start(): Promise { + if (this.isRunning) { + this.logger.warn('[CDC] Consumer already running'); + return; + } + + try { + this.logger.log('[CDC] Connecting to Kafka...'); + await this.consumer.connect(); + + await this.consumer.subscribe({ + topic: this.cdcTopic, + fromBeginning: false, // 只消费新的变更 + }); + + this.logger.log(`[CDC] Subscribed to topic: ${this.cdcTopic}`); + + await this.consumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + await this.handleMessage(payload); + }, + }); + + this.isRunning = true; + this.logger.log('[CDC] Consumer started successfully'); + } catch (error) { + this.logger.error('[CDC] Failed to start consumer:', error); + // 不阻塞服务启动,CDC 可以稍后重试 + } + } + + async stop(): Promise { + if (!this.isRunning) return; + + try { + await this.consumer.disconnect(); + this.isRunning = false; + this.logger.log('[CDC] Consumer stopped'); + } catch (error) { + this.logger.error('[CDC] Failed to stop consumer:', error); + } + } + + private async handleMessage(payload: EachMessagePayload): Promise { + const { topic, partition, message } = payload; + + if (!message.value) { + this.logger.warn(`[CDC] Empty message from ${topic}:${partition}`); + return; + } + + try { + const data = JSON.parse(message.value.toString()) as CdcUserAccountPayload; + const operation = data.__op; + const sourceTs = data.__source_ts_ms; + + this.logger.debug( + `[CDC] Received ${operation} event for user ${data.account_sequence} ` + + `(ts: ${new Date(sourceTs).toISOString()})` + ); + + // 幂等性检查:基于 message offset + const eventId = `cdc:${topic}:${partition}:${message.offset}`; + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`[CDC] Event ${eventId} already processed, skipping`); + return; + } + + // 处理 CDC 事件 + await this.processCdcEvent(data); + + // 记录已处理 + await this.markEventProcessed(eventId, `cdc:${operation}`); + + this.logger.log( + `[CDC] ✓ Processed ${operation} for user: ${data.account_sequence}` + ); + } catch (error) { + this.logger.error(`[CDC] Failed to process message:`, error); + // 不抛出错误,避免阻塞消费 + } + } + + private async processCdcEvent(data: CdcUserAccountPayload): Promise { + const operation = data.__op; + const isDeleted = data.__deleted === 'true'; + + if (operation === 'd' || isDeleted) { + // 删除操作 + await this.handleDelete(data); + } else if (operation === 'c' || operation === 'r') { + // 创建或快照读取 + await this.handleCreateOrSnapshot(data); + } else if (operation === 'u') { + // 更新操作 + await this.handleUpdate(data); + } + } + + private async handleCreateOrSnapshot(data: CdcUserAccountPayload): Promise { + const phoneNumberMasked = data.phone_number + ? this.maskPhoneNumber(data.phone_number) + : null; + + await this.userQueryRepository.upsert({ + userId: BigInt(data.user_id), + accountSequence: data.account_sequence, + nickname: data.nickname || null, + avatarUrl: data.avatar_url || null, + phoneNumberMasked, + inviterSequence: data.inviter_sequence || null, + kycStatus: data.kyc_status, + status: data.status, + registeredAt: new Date(data.registered_at), + }); + + this.logger.log(`[CDC] Created/Snapshot user: ${data.account_sequence}`); + } + + private async handleUpdate(data: CdcUserAccountPayload): Promise { + const userId = BigInt(data.user_id); + + // 检查用户是否存在 + const exists = await this.userQueryRepository.exists(userId); + + if (!exists) { + // 如果不存在,创建(可能是之前遗漏的) + await this.handleCreateOrSnapshot(data); + return; + } + + const phoneNumberMasked = data.phone_number + ? this.maskPhoneNumber(data.phone_number) + : null; + + // 更新所有字段 + await this.prisma.userQueryView.update({ + where: { userId }, + data: { + nickname: data.nickname || null, + avatarUrl: data.avatar_url || null, + phoneNumberMasked, + inviterSequence: data.inviter_sequence || null, + kycStatus: data.kyc_status, + status: data.status, + syncedAt: new Date(), + }, + }); + + this.logger.log(`[CDC] Updated user: ${data.account_sequence}`); + } + + private async handleDelete(data: CdcUserAccountPayload): Promise { + const userId = BigInt(data.user_id); + + // 软删除:更新状态为 DELETED + const exists = await this.userQueryRepository.exists(userId); + if (exists) { + await this.userQueryRepository.updateStatus(userId, 'DELETED'); + this.logger.log(`[CDC] Marked user as deleted: ${data.account_sequence}`); + } + } + + // ==================== Helper Methods ==================== + + private maskPhoneNumber(phone: string): string { + if (phone.length < 7) return phone; + return phone.slice(0, 3) + '****' + phone.slice(-4); + } + + private async isEventProcessed(eventId: string): Promise { + const count = await this.prisma.processedEvent.count({ + where: { eventId }, + }); + return count > 0; + } + + private async markEventProcessed(eventId: string, eventType: string): Promise { + await this.prisma.processedEvent.create({ + data: { + eventId, + eventType, + processedAt: new Date(), + }, + }); + } + + /** + * 获取消费者状态 + */ + getStatus(): { isRunning: boolean; topic: string; consumerGroup: string } { + return { + isRunning: this.isRunning, + topic: this.cdcTopic, + consumerGroup: this.consumerGroup, + }; + } +} diff --git a/backend/services/admin-service/src/infrastructure/kafka/index.ts b/backend/services/admin-service/src/infrastructure/kafka/index.ts index 05175f74..17a93366 100644 --- a/backend/services/admin-service/src/infrastructure/kafka/index.ts +++ b/backend/services/admin-service/src/infrastructure/kafka/index.ts @@ -1,2 +1,3 @@ export * from './kafka.module'; export * from './user-event-consumer.service'; +export * from './cdc-consumer.service'; diff --git a/backend/services/admin-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/admin-service/src/infrastructure/kafka/kafka.module.ts index 37175dae..686453c9 100644 --- a/backend/services/admin-service/src/infrastructure/kafka/kafka.module.ts +++ b/backend/services/admin-service/src/infrastructure/kafka/kafka.module.ts @@ -1,10 +1,22 @@ import { Module } from '@nestjs/common'; -import { ConfigModule } from '@nestjs/config'; +import { ConfigModule, ConfigService } from '@nestjs/config'; import { PrismaService } from '../persistence/prisma/prisma.service'; import { UserQueryRepositoryImpl } from '../persistence/repositories/user-query.repository.impl'; import { USER_QUERY_REPOSITORY } from '../../domain/repositories/user-query.repository'; import { UserEventConsumerService } from './user-event-consumer.service'; +import { CdcConsumerService } from './cdc-consumer.service'; +/** + * Kafka 模块 + * + * 提供两种消费者: + * 1. UserEventConsumerService - 消费 Outbox 领域事件 (保留,用于业务事件) + * 2. CdcConsumerService - 消费 Debezium CDC 事件 (新增,用于数据同步) + * + * 通过环境变量 CDC_ENABLED 控制是否启用 CDC 消费者: + * - CDC_ENABLED=true: 使用 CDC 消费者 (推荐) + * - CDC_ENABLED=false: 使用传统 Outbox 消费者 + */ @Module({ imports: [ConfigModule], providers: [ @@ -13,8 +25,11 @@ import { UserEventConsumerService } from './user-event-consumer.service'; provide: USER_QUERY_REPOSITORY, useClass: UserQueryRepositoryImpl, }, + // 传统 Outbox 事件消费者 (保留用于领域事件) UserEventConsumerService, + // CDC 消费者 (用于数据同步) + CdcConsumerService, ], - exports: [UserEventConsumerService, USER_QUERY_REPOSITORY], + exports: [UserEventConsumerService, CdcConsumerService, USER_QUERY_REPOSITORY], }) export class KafkaModule {} diff --git a/backend/services/deploy.sh b/backend/services/deploy.sh index 603dde64..9db8f422 100755 --- a/backend/services/deploy.sh +++ b/backend/services/deploy.sh @@ -197,6 +197,26 @@ up() { log_info "Waiting for infrastructure to be ready..." sleep 10 + # Start Debezium Connect (CDC) + log_info "Starting Debezium Connect..." + docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" up -d debezium-connect + + # Wait for Debezium Connect to be ready + log_info "Waiting for Debezium Connect to be ready..." + for i in {1..30}; do + if curl -s http://localhost:8083/ > /dev/null 2>&1; then + log_info "Debezium Connect is ready!" + break + fi + if [ $i -eq 30 ]; then + log_warn "Debezium Connect not ready after 60s, continuing anyway..." + fi + sleep 2 + done + + # Register Debezium connectors + register_debezium_connectors + # Start application services log_info "Starting application services..." docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" up -d @@ -207,6 +227,66 @@ up() { log_info "View logs with: ./deploy.sh logs" } +register_debezium_connectors() { + log_info "Registering Debezium connectors..." + + # Check if connector already exists + EXISTING=$(curl -s http://localhost:8083/connectors 2>/dev/null || echo "[]") + + if echo "$EXISTING" | grep -q "identity-postgres-connector"; then + log_info "identity-postgres-connector already registered" + return + fi + + # Read database credentials from .env + source "$ENV_FILE" + + # Register identity-postgres-connector + CONNECTOR_CONFIG='{ + "name": "identity-postgres-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "'${POSTGRES_USER:-rwa_user}'", + "database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'", + "database.dbname": "rwa_identity", + "topic.prefix": "cdc.identity", + "table.include.list": "public.user_accounts", + "plugin.name": "pgoutput", + "publication.name": "debezium_identity_publication", + "publication.autocreate.mode": "filtered", + "slot.name": "debezium_identity_slot", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + "heartbeat.interval.ms": "10000", + "snapshot.mode": "initial", + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } + }' + + RESULT=$(curl -s -X POST \ + -H "Content-Type: application/json" \ + -d "$CONNECTOR_CONFIG" \ + "http://localhost:8083/connectors" 2>/dev/null || echo "failed") + + if echo "$RESULT" | grep -q "identity-postgres-connector"; then + log_info "identity-postgres-connector registered successfully" + else + log_warn "Failed to register connector: $RESULT" + log_warn "You may need to register it manually: ./deploy.sh debezium-register" + fi +} + down() { log_step "Stopping RWA Backend Services..." docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" down @@ -429,13 +509,13 @@ clean() { infra_up() { log_step "Starting infrastructure services..." - docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" up -d postgres redis zookeeper kafka + docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" up -d postgres redis zookeeper kafka debezium-connect log_info "Infrastructure services started" } infra_down() { log_step "Stopping infrastructure services..." - docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop postgres redis kafka zookeeper + docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop postgres redis kafka zookeeper debezium-connect log_info "Infrastructure services stopped" } @@ -453,7 +533,7 @@ infra_status() { echo "============================================" echo "" - docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" ps postgres redis zookeeper kafka + docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" ps postgres redis zookeeper kafka debezium-connect echo "" echo "Health Check:" @@ -481,10 +561,92 @@ infra_status() { else echo -e " ${RED}[FAIL]${NC} Kafka (port 9092)" fi + + if curl -s http://localhost:8083/ > /dev/null 2>&1; then + echo -e " ${GREEN}[OK]${NC} Debezium Connect (port 8083)" + # Check connector status + CONNECTOR_STATUS=$(curl -s http://localhost:8083/connectors/identity-postgres-connector/status 2>/dev/null | grep -o '"state":"[^"]*"' | head -1 || echo "") + if echo "$CONNECTOR_STATUS" | grep -q "RUNNING"; then + echo -e " └─ ${GREEN}[RUNNING]${NC} identity-postgres-connector" + elif [ -n "$CONNECTOR_STATUS" ]; then + echo -e " └─ ${YELLOW}[$CONNECTOR_STATUS]${NC} identity-postgres-connector" + else + echo -e " └─ ${YELLOW}[NOT REGISTERED]${NC} identity-postgres-connector" + fi + else + echo -e " ${RED}[FAIL]${NC} Debezium Connect (port 8083)" + fi } infra_logs() { - docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" logs -f postgres redis zookeeper kafka + docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" logs -f postgres redis zookeeper kafka debezium-connect +} + +# =========================================================================== +# Debezium CDC Operations +# =========================================================================== + +debezium_status() { + echo "" + echo "============================================" + echo "Debezium CDC Status" + echo "============================================" + echo "" + + if ! curl -s http://localhost:8083/ > /dev/null 2>&1; then + log_error "Debezium Connect is not running" + exit 1 + fi + + echo "Debezium Connect Version:" + curl -s http://localhost:8083/ | python3 -m json.tool 2>/dev/null || curl -s http://localhost:8083/ + + echo "" + echo "Registered Connectors:" + curl -s http://localhost:8083/connectors | python3 -m json.tool 2>/dev/null || curl -s http://localhost:8083/connectors + + echo "" + echo "Connector Details:" + curl -s http://localhost:8083/connectors/identity-postgres-connector/status | python3 -m json.tool 2>/dev/null || \ + curl -s http://localhost:8083/connectors/identity-postgres-connector/status +} + +debezium_register() { + log_step "Registering Debezium connectors..." + + if ! curl -s http://localhost:8083/ > /dev/null 2>&1; then + log_error "Debezium Connect is not running" + exit 1 + fi + + register_debezium_connectors +} + +debezium_restart_connector() { + log_step "Restarting Debezium connector..." + + if ! curl -s http://localhost:8083/ > /dev/null 2>&1; then + log_error "Debezium Connect is not running" + exit 1 + fi + + curl -X POST http://localhost:8083/connectors/identity-postgres-connector/restart + + log_info "Connector restart requested" + sleep 3 + debezium_status +} + +debezium_delete_connector() { + log_warn "This will delete the identity-postgres-connector!" + read -p "Are you sure? (y/N): " confirm + + if [ "$confirm" = "y" ] || [ "$confirm" = "Y" ]; then + curl -X DELETE http://localhost:8083/connectors/identity-postgres-connector + log_info "Connector deleted" + else + log_info "Cancelled" + fi } infra_clean() { @@ -664,6 +826,18 @@ case "${1:-}" in infra-reset) infra_reset ;; + debezium-status|cdc-status) + debezium_status + ;; + debezium-register|cdc-register) + debezium_register + ;; + debezium-restart|cdc-restart) + debezium_restart_connector + ;; + debezium-delete|cdc-delete) + debezium_delete_connector + ;; *) echo "RWA Backend Services Deployment Script" echo "" @@ -690,7 +864,7 @@ case "${1:-}" in echo " rebuild-svc [--no-cache] - Rebuild and restart a specific service" echo "" echo "Infrastructure Commands:" - echo " infra-up - Start infrastructure (postgres, redis, kafka)" + echo " infra-up - Start infrastructure (postgres, redis, kafka, debezium)" echo " infra-down - Stop infrastructure services" echo " infra-restart - Restart infrastructure services" echo " infra-status - Show infrastructure status and health" @@ -698,6 +872,12 @@ case "${1:-}" in echo " infra-clean - Remove infrastructure containers and volumes (DELETES DATA)" echo " infra-reset - Clean and reinstall infrastructure (DELETES DATA)" echo "" + echo "Debezium CDC Commands:" + echo " debezium-status - Show Debezium connector status" + echo " debezium-register - Register Debezium connectors" + echo " debezium-restart - Restart Debezium connector" + echo " debezium-delete - Delete Debezium connector" + echo "" echo "Services:" echo " identity-service, wallet-service, backup-service, planting-service," echo " referral-service, reward-service, mpc-service, leaderboard-service," diff --git a/backend/services/docker-compose.yml b/backend/services/docker-compose.yml index 28d3e136..1821801f 100644 --- a/backend/services/docker-compose.yml +++ b/backend/services/docker-compose.yml @@ -26,6 +26,15 @@ services: volumes: - postgres_data:/var/lib/postgresql/data - ./scripts/init-databases.sh:/docker-entrypoint-initdb.d/init-databases.sh:ro + # Enable logical replication for Debezium CDC + command: + - "postgres" + - "-c" + - "wal_level=logical" + - "-c" + - "max_replication_slots=4" + - "-c" + - "max_wal_senders=4" healthcheck: test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-rwa_user}"] interval: 5s @@ -110,6 +119,44 @@ services: networks: - rwa-network + # --------------------------------------------------------------------------- + # Debezium Kafka Connect - CDC (Change Data Capture) + # --------------------------------------------------------------------------- + debezium-connect: + image: debezium/connect:2.4 + container_name: rwa-debezium-connect + depends_on: + kafka: + condition: service_healthy + postgres: + condition: service_healthy + ports: + - "8083:8083" + environment: + TZ: Asia/Shanghai + GROUP_ID: debezium-connect + BOOTSTRAP_SERVERS: kafka:29092 + CONFIG_STORAGE_TOPIC: debezium_configs + OFFSET_STORAGE_TOPIC: debezium_offsets + STATUS_STORAGE_TOPIC: debezium_statuses + CONFIG_STORAGE_REPLICATION_FACTOR: 1 + OFFSET_STORAGE_REPLICATION_FACTOR: 1 + STATUS_STORAGE_REPLICATION_FACTOR: 1 + # Connector settings + KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter + VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter + KEY_CONVERTER_SCHEMAS_ENABLE: "false" + VALUE_CONVERTER_SCHEMAS_ENABLE: "false" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8083/"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 60s + restart: unless-stopped + networks: + - rwa-network + # =========================================================================== # Application Services # =========================================================================== @@ -472,7 +519,7 @@ services: referral-service: condition: service_healthy healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:3007/api/health"] + test: ["CMD", "curl", "-f", "http://localhost:3007/api/v1/health"] interval: 30s timeout: 3s retries: 3 @@ -595,6 +642,8 @@ services: - KAFKA_BROKERS=kafka:29092 - KAFKA_CLIENT_ID=admin-service - KAFKA_CONSUMER_GROUP=admin-service-user-sync + # CDC Consumer (Debezium) + - KAFKA_CDC_CONSUMER_GROUP=admin-service-cdc depends_on: postgres: condition: service_healthy @@ -602,6 +651,8 @@ services: condition: service_healthy kafka: condition: service_started + debezium-connect: + condition: service_healthy healthcheck: test: ["CMD", "curl", "-f", "http://localhost:3010/api/v1/health"] interval: 30s diff --git a/backend/services/scripts/debezium/README.md b/backend/services/scripts/debezium/README.md new file mode 100644 index 00000000..eb14374b --- /dev/null +++ b/backend/services/scripts/debezium/README.md @@ -0,0 +1,215 @@ +# Debezium CDC 配置指南 + +## 概述 + +本项目使用 Debezium 实现 Change Data Capture (CDC),将 `identity-service` 的用户数据实时同步到 `admin-service`。 + +``` +┌─────────────────┐ WAL ┌──────────────┐ Kafka ┌─────────────────┐ +│ identity-service│ ────────► │ Debezium │ ──────────► │ admin-service │ +│ PostgreSQL │ │ Kafka Connect│ │ UserQueryView │ +└─────────────────┘ └──────────────┘ └─────────────────┘ +``` + +## 架构说明 + +### 组件 + +1. **PostgreSQL** - 启用 logical replication (`wal_level=logical`) +2. **Debezium Connect** - Kafka Connect with Debezium PostgreSQL connector +3. **Kafka** - 消息传输 +4. **CdcConsumerService** - admin-service 中的 CDC 消费者 + +### 数据流 + +``` +1. identity-service 写入 user_accounts 表 +2. PostgreSQL 将变更写入 WAL +3. Debezium 读取 WAL,发布到 Kafka topic: cdc.identity.public.user_accounts +4. admin-service 的 CdcConsumerService 消费事件 +5. 更新 admin-service 的 UserQueryView 表 +``` + +## 部署步骤 + +### 1. 启动基础设施 + +```bash +cd backend/services +docker-compose up -d postgres kafka zookeeper debezium-connect +``` + +### 2. 等待 Debezium Connect 就绪 + +```bash +# 检查 Debezium Connect 状态 +curl http://localhost:8083/ + +# 期望输出: +# {"version":"3.x.x","commit":"...","kafka_cluster_id":"..."} +``` + +### 3. 注册 Connector + +```bash +# 方式1: 使用脚本 +cd backend/services/scripts/debezium +chmod +x register-connectors.sh +./register-connectors.sh + +# 方式2: 手动注册 +curl -X POST http://localhost:8083/connectors \ + -H "Content-Type: application/json" \ + -d @identity-connector.json +``` + +### 4. 验证 Connector 状态 + +```bash +# 列出所有 connectors +curl http://localhost:8083/connectors + +# 查看 connector 详情 +curl http://localhost:8083/connectors/identity-postgres-connector/status +``` + +### 5. 启动 admin-service + +```bash +docker-compose up -d admin-service +``` + +## 配置说明 + +### PostgreSQL 配置 + +在 `docker-compose.yml` 中,PostgreSQL 启动参数: + +```yaml +command: + - "postgres" + - "-c" + - "wal_level=logical" + - "-c" + - "max_replication_slots=4" + - "-c" + - "max_wal_senders=4" +``` + +### Debezium Connector 配置 + +关键配置项 (`identity-connector.json`): + +| 配置项 | 说明 | +|--------|------| +| `database.hostname` | PostgreSQL 主机 | +| `database.dbname` | 数据库名 `rwa_identity` | +| `table.include.list` | 监听的表 `public.user_accounts` | +| `topic.prefix` | Kafka topic 前缀 `cdc.identity` | +| `plugin.name` | 使用 `pgoutput` (PostgreSQL 原生) | +| `snapshot.mode` | `initial` 首次启动时全量快照 | + +### 环境变量 + +admin-service: + +| 变量 | 说明 | 默认值 | +|------|------|--------| +| `KAFKA_CDC_CONSUMER_GROUP` | CDC 消费者组 | `admin-service-cdc` | + +## 监控与运维 + +### 查看 Connector 状态 + +```bash +# 状态 +curl http://localhost:8083/connectors/identity-postgres-connector/status + +# 任务状态 +curl http://localhost:8083/connectors/identity-postgres-connector/tasks/0/status +``` + +### 重启 Connector + +```bash +curl -X POST http://localhost:8083/connectors/identity-postgres-connector/restart +``` + +### 查看 Kafka Topic + +```bash +# 进入 Kafka 容器 +docker exec -it rwa-kafka bash + +# 列出 topics +kafka-topics --bootstrap-server localhost:9092 --list + +# 消费 CDC topic +kafka-console-consumer --bootstrap-server localhost:9092 \ + --topic cdc.identity.public.user_accounts \ + --from-beginning +``` + +### 查看消费者 Lag + +```bash +kafka-consumer-groups --bootstrap-server localhost:9092 \ + --group admin-service-cdc \ + --describe +``` + +## 故障排查 + +### Connector 状态为 FAILED + +```bash +# 查看错误信息 +curl http://localhost:8083/connectors/identity-postgres-connector/status | jq + +# 常见原因: +# 1. PostgreSQL 未启用 logical replication +# 2. 数据库连接失败 +# 3. 权限不足 +``` + +### 数据不同步 + +1. 检查 Connector 状态 +2. 检查 Kafka consumer lag +3. 查看 admin-service 日志 + +```bash +docker logs rwa-admin-service | grep CDC +``` + +### 重置 Connector Offset + +⚠️ 危险操作,会导致数据重新同步 + +```bash +# 删除 connector +curl -X DELETE http://localhost:8083/connectors/identity-postgres-connector + +# 删除 replication slot (在 PostgreSQL 中) +SELECT pg_drop_replication_slot('debezium_identity_slot'); + +# 重新注册 connector +./register-connectors.sh +``` + +## 与 Outbox 模式对比 + +| 特性 | Outbox (原方案) | Debezium CDC (新方案) | +|------|-----------------|----------------------| +| 代码侵入 | 需要在每个写操作添加事件 | 零侵入 | +| 可靠性 | 依赖代码正确发布 | 100% 捕获 WAL | +| 延迟 | 1秒轮询 | 近实时 (毫秒级) | +| Schema 变更 | 需手动同步 | 自动捕获 | +| 运维复杂度 | 简单 | 需要 Kafka Connect | + +## 注意事项 + +1. **首次启动**: Debezium 会进行全量快照,可能需要较长时间 +2. **Schema 变更**: 添加字段会自动同步,删除字段需要手动处理 +3. **大表**: 快照时可能影响数据库性能,建议在低峰期进行 +4. **数据一致性**: CDC 是最终一致性,存在短暂延迟 (通常 < 200ms) diff --git a/backend/services/scripts/debezium/identity-connector.json b/backend/services/scripts/debezium/identity-connector.json new file mode 100644 index 00000000..e8888606 --- /dev/null +++ b/backend/services/scripts/debezium/identity-connector.json @@ -0,0 +1,41 @@ +{ + "name": "identity-postgres-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "${POSTGRES_USER:-rwa_user}", + "database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}", + "database.dbname": "rwa_identity", + + "topic.prefix": "cdc.identity", + + "table.include.list": "public.user_accounts", + + "plugin.name": "pgoutput", + "publication.name": "debezium_identity_publication", + "publication.autocreate.mode": "filtered", + + "slot.name": "debezium_identity_slot", + + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + + "heartbeat.interval.ms": "10000", + + "snapshot.mode": "initial", + + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } +} diff --git a/backend/services/scripts/debezium/register-connectors.sh b/backend/services/scripts/debezium/register-connectors.sh new file mode 100644 index 00000000..771e065f --- /dev/null +++ b/backend/services/scripts/debezium/register-connectors.sh @@ -0,0 +1,118 @@ +#!/bin/bash +# ============================================================================= +# Debezium Connector Registration Script +# ============================================================================= +# Usage: ./register-connectors.sh +# +# This script registers the PostgreSQL connector for identity-service CDC +# It should be run after Debezium Connect is fully started +# ============================================================================= + +set -e + +CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8083}" +MAX_RETRIES=30 +RETRY_INTERVAL=5 + +echo "=== Debezium Connector Registration ===" +echo "Connect URL: $CONNECT_URL" + +# Wait for Debezium Connect to be ready +echo "Waiting for Debezium Connect to be ready..." +for i in $(seq 1 $MAX_RETRIES); do + if curl -s "$CONNECT_URL/" > /dev/null 2>&1; then + echo "Debezium Connect is ready!" + break + fi + if [ $i -eq $MAX_RETRIES ]; then + echo "ERROR: Debezium Connect is not ready after $MAX_RETRIES attempts" + exit 1 + fi + echo "Attempt $i/$MAX_RETRIES - waiting ${RETRY_INTERVAL}s..." + sleep $RETRY_INTERVAL +done + +# Check existing connectors +echo "" +echo "Checking existing connectors..." +EXISTING=$(curl -s "$CONNECT_URL/connectors") +echo "Existing connectors: $EXISTING" + +# Register identity-postgres connector +echo "" +echo "Registering identity-postgres-connector..." + +CONNECTOR_CONFIG='{ + "name": "identity-postgres-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "'${POSTGRES_USER:-rwa_user}'", + "database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'", + "database.dbname": "rwa_identity", + "database.server.name": "identity", + + "topic.prefix": "cdc.identity", + + "table.include.list": "public.user_accounts", + + "plugin.name": "pgoutput", + "publication.name": "debezium_identity_publication", + "publication.autocreate.mode": "filtered", + + "slot.name": "debezium_identity_slot", + + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + + "heartbeat.interval.ms": "10000", + + "snapshot.mode": "initial", + + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } +}' + +# Delete existing connector if exists +if echo "$EXISTING" | grep -q "identity-postgres-connector"; then + echo "Deleting existing identity-postgres-connector..." + curl -s -X DELETE "$CONNECT_URL/connectors/identity-postgres-connector" + sleep 2 +fi + +# Create connector +RESULT=$(curl -s -X POST \ + -H "Content-Type: application/json" \ + -d "$CONNECTOR_CONFIG" \ + "$CONNECT_URL/connectors") + +echo "Result: $RESULT" + +# Check connector status +echo "" +echo "Checking connector status..." +sleep 3 +STATUS=$(curl -s "$CONNECT_URL/connectors/identity-postgres-connector/status") +echo "Connector status: $STATUS" + +echo "" +echo "=== Registration Complete ===" +echo "" +echo "Kafka topics created:" +echo " - cdc.identity.public.user_accounts" +echo "" +echo "To verify:" +echo " curl $CONNECT_URL/connectors" +echo " curl $CONNECT_URL/connectors/identity-postgres-connector/status" diff --git a/backend/services/scripts/init-databases.sh b/backend/services/scripts/init-databases.sh index d000a45a..cdfdb844 100644 --- a/backend/services/scripts/init-databases.sh +++ b/backend/services/scripts/init-databases.sh @@ -17,3 +17,50 @@ for db in rwa_identity rwa_wallet rwa_mpc rwa_backup rwa_planting rwa_referral r done echo "All databases created successfully!" + +# ============================================================================= +# Configure PostgreSQL for Debezium CDC (Logical Replication) +# ============================================================================= +echo "Configuring PostgreSQL for Debezium CDC..." + +# Set wal_level to logical for CDC support +psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" <<-EOSQL + -- Set wal_level to logical (requires restart to take effect) + ALTER SYSTEM SET wal_level = 'logical'; + + -- Increase max_replication_slots for Debezium + ALTER SYSTEM SET max_replication_slots = 4; + + -- Increase max_wal_senders for replication connections + ALTER SYSTEM SET max_wal_senders = 4; + + -- Create replication user for Debezium (if not exists) + DO \$\$ + BEGIN + IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'debezium') THEN + CREATE ROLE debezium WITH REPLICATION LOGIN PASSWORD 'debezium_password'; + END IF; + END + \$\$; + + -- Grant connect to identity database + GRANT CONNECT ON DATABASE rwa_identity TO debezium; +EOSQL + +# Grant schema permissions on rwa_identity +psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "rwa_identity" <<-EOSQL + -- Grant usage on public schema + GRANT USAGE ON SCHEMA public TO debezium; + + -- Grant select on all tables (current and future) + GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium; + ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium; + + -- Create publication for CDC (user_accounts table only for now) + -- This will be created after the table exists (by Prisma migration) + -- DROP PUBLICATION IF EXISTS debezium_identity_publication; + -- CREATE PUBLICATION debezium_identity_publication FOR TABLE user_accounts; +EOSQL + +echo "PostgreSQL CDC configuration completed!" +echo "NOTE: PostgreSQL restart is required for wal_level change to take effect."