feat(admin-service): 实现 Debezium CDC 数据同步

- 新增 CdcConsumerService 消费 PostgreSQL WAL 变更事件
- 配置 Debezium Connect 服务和 PostgreSQL 逻辑复制
- 更新 deploy.sh 支持 Debezium 启动和连接器管理
- 新增 identity-postgres-connector 配置同步 user_accounts 表
- 保留原有 Outbox 机制用于业务领域事件

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-07 18:35:53 -08:00
parent cc17f6a38e
commit b5ebf8a615
9 changed files with 959 additions and 8 deletions

View File

@ -0,0 +1,283 @@
import { Injectable, Logger, OnModuleInit, OnModuleDestroy, Inject } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
import { PrismaService } from '../persistence/prisma/prisma.service';
import { IUserQueryRepository, USER_QUERY_REPOSITORY } from '../../domain/repositories/user-query.repository';
/**
 * Shape of a Debezium CDC message for the user_accounts table after the
 * ExtractNewRecordState ("unwrap") transform has been applied.
 *
 * The raw Debezium envelope (before/after/op) is flattened by the transform:
 * the row columns appear at the top level, and the connector is configured to
 * add the metadata fields __op, __table and __source_ts_ms.
 *
 * NOTE(review): this interface is only a compile-time description; the
 * consumer casts the JSON.parse result to it without runtime validation.
 * NOTE(review): the declared string types assume the converter emits ids and
 * timestamps as strings — confirm against the actual column types and the
 * connector's time.precision.mode / JSON converter output.
 */
interface CdcUserAccountPayload {
// Row columns (snake_case, as named in PostgreSQL)
user_id: string;
account_sequence: string;
phone_number?: string | null;
nickname: string;
avatar_url?: string | null;
inviter_sequence?: string | null;
kyc_status: string;
status: string;
registered_at: string;
updated_at: string;
// Debezium metadata (added by ExtractNewRecordState)
__op: 'c' | 'u' | 'd' | 'r'; // c=create, u=update, d=delete, r=read(snapshot)
__table: string;
__source_ts_ms: number;
__deleted?: string; // 'true' for delete events when using rewrite mode
}
/**
 * CDC consumer service.
 *
 * Consumes Debezium change events for the identity-service `user_accounts`
 * table from Kafka and mirrors them into the admin-service user query view.
 *
 * Topic: cdc.identity.public.user_accounts
 *
 * Processing model:
 * - Each message is de-duplicated via the `processedEvent` table, keyed by
 *   topic/partition/offset.
 * - Failures while handling a message are logged and swallowed so that one
 *   bad record does not block the partition (the record is then skipped).
 *
 * NOTE(review): presumably this runs alongside the Outbox-based
 * UserEventConsumerService rather than replacing it — confirm.
 */
@Injectable()
export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(CdcConsumerService.name);
  private kafka: Kafka;
  private consumer: Consumer;
  private isRunning = false;

  // CDC topic
  private readonly cdcTopic = 'cdc.identity.public.user_accounts';
  private readonly consumerGroup: string;

  constructor(
    private readonly configService: ConfigService,
    private readonly prisma: PrismaService,
    @Inject(USER_QUERY_REPOSITORY)
    private readonly userQueryRepository: IUserQueryRepository,
  ) {
    const brokers = (this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092')).split(',');
    const clientId = this.configService.get<string>('KAFKA_CLIENT_ID', 'admin-service');
    this.consumerGroup = this.configService.get<string>('KAFKA_CDC_CONSUMER_GROUP', 'admin-service-cdc');

    this.kafka = new Kafka({
      clientId: `${clientId}-cdc`,
      brokers,
      logLevel: logLevel.WARN,
    });
    this.consumer = this.kafka.consumer({ groupId: this.consumerGroup });

    this.logger.log(`[CDC] Configured to consume topic: ${this.cdcTopic}`);
  }

  /** Starts the consumer when the Nest module is initialised. */
  async onModuleInit() {
    await this.start();
  }

  /** Stops the consumer when the Nest module is destroyed. */
  async onModuleDestroy() {
    await this.stop();
  }

  /**
   * Connects to Kafka, subscribes to the CDC topic and starts the run loop.
   *
   * Never throws: a start-up failure is logged so that it does not block
   * service boot. On failure the consumer is disconnected again so that a
   * half-open connection is not leaked (stop() is a no-op while `isRunning`
   * is false).
   */
  async start(): Promise<void> {
    if (this.isRunning) {
      this.logger.warn('[CDC] Consumer already running');
      return;
    }

    try {
      this.logger.log('[CDC] Connecting to Kafka...');
      await this.consumer.connect();

      await this.consumer.subscribe({
        topic: this.cdcTopic,
        // Only consume changes published after the group first joins.
        // NOTE(review): with the connector's snapshot.mode=initial, snapshot
        // records published before this group has committed offsets are
        // skipped by this setting — confirm that is intended.
        fromBeginning: false,
      });
      this.logger.log(`[CDC] Subscribed to topic: ${this.cdcTopic}`);

      await this.consumer.run({
        eachMessage: async (payload: EachMessagePayload) => {
          await this.handleMessage(payload);
        },
      });

      this.isRunning = true;
      this.logger.log('[CDC] Consumer started successfully');
    } catch (error) {
      this.logger.error('[CDC] Failed to start consumer:', error);
      // Do not block service start-up. Nothing in this class retries
      // automatically; start() has to be called again to recover.
      try {
        // connect() may have succeeded before subscribe()/run() failed.
        await this.consumer.disconnect();
      } catch (disconnectError) {
        this.logger.warn(
          `[CDC] Failed to clean up consumer after start failure: ${String(disconnectError)}`
        );
      }
    }
  }

  /** Disconnects the consumer if it is running; errors are logged, not thrown. */
  async stop(): Promise<void> {
    if (!this.isRunning) return;

    try {
      await this.consumer.disconnect();
      this.isRunning = false;
      this.logger.log('[CDC] Consumer stopped');
    } catch (error) {
      this.logger.error('[CDC] Failed to stop consumer:', error);
    }
  }

  /**
   * Handles one Kafka message: parse, de-duplicate, apply, record.
   *
   * Errors are logged and swallowed so the consumer keeps going; the failed
   * message is not retried by this method.
   */
  private async handleMessage(payload: EachMessagePayload): Promise<void> {
    const { topic, partition, message } = payload;

    if (!message.value) {
      this.logger.warn(`[CDC] Empty message from ${topic}:${partition}`);
      return;
    }

    try {
      const data = JSON.parse(message.value.toString()) as CdcUserAccountPayload;
      const operation = data.__op;

      // Formatting must not throw: a missing/invalid timestamp used to raise
      // a RangeError here and caused the whole event to be dropped.
      this.logger.debug(
        `[CDC] Received ${operation} event for user ${data.account_sequence} ` +
        `(ts: ${this.formatSourceTimestamp(data.__source_ts_ms)})`
      );

      // Idempotency check, keyed by the message coordinates.
      const eventId = `cdc:${topic}:${partition}:${message.offset}`;
      if (await this.isEventProcessed(eventId)) {
        this.logger.debug(`[CDC] Event ${eventId} already processed, skipping`);
        return;
      }

      // Apply the change.
      await this.processCdcEvent(data);

      // Record the event as processed.
      await this.markEventProcessed(eventId, `cdc:${operation}`);

      this.logger.log(
        `[CDC] ✓ Processed ${operation} for user: ${data.account_sequence}`
      );
    } catch (error) {
      this.logger.error(`[CDC] Failed to process message:`, error);
      // Deliberately not rethrown, to avoid blocking consumption.
    }
  }

  /** Renders a source timestamp for logging without ever throwing. */
  private formatSourceTimestamp(sourceTs: unknown): string {
    if (typeof sourceTs !== 'number') return 'unknown';
    const date = new Date(sourceTs);
    return Number.isNaN(date.getTime()) ? 'unknown' : date.toISOString();
  }

  /** Dispatches a CDC record to the handler matching its operation. */
  private async processCdcEvent(data: CdcUserAccountPayload): Promise<void> {
    const operation = data.__op;
    const isDeleted = data.__deleted === 'true';

    if (operation === 'd' || isDeleted) {
      // Delete
      await this.handleDelete(data);
    } else if (operation === 'c' || operation === 'r') {
      // Create or snapshot read
      await this.handleCreateOrSnapshot(data);
    } else if (operation === 'u') {
      // Update
      await this.handleUpdate(data);
    } else {
      // Previously ignored silently; surface it so unexpected payloads are visible.
      this.logger.warn(`[CDC] Ignoring event with unsupported operation: ${String(operation)}`);
    }
  }

  /** Upserts the user row for create ('c') and snapshot ('r') events. */
  private async handleCreateOrSnapshot(data: CdcUserAccountPayload): Promise<void> {
    const phoneNumberMasked = data.phone_number
      ? this.maskPhoneNumber(data.phone_number)
      : null;

    await this.userQueryRepository.upsert({
      userId: BigInt(data.user_id),
      accountSequence: data.account_sequence,
      nickname: data.nickname || null,
      avatarUrl: data.avatar_url || null,
      phoneNumberMasked,
      inviterSequence: data.inviter_sequence || null,
      kycStatus: data.kyc_status,
      status: data.status,
      registeredAt: new Date(data.registered_at),
    });

    this.logger.log(`[CDC] Created/Snapshot user: ${data.account_sequence}`);
  }

  /**
   * Applies an update event. Falls back to an upsert when the row does not
   * exist yet (e.g. the create event was missed).
   */
  private async handleUpdate(data: CdcUserAccountPayload): Promise<void> {
    const userId = BigInt(data.user_id);

    // Check whether the user is already present in the view.
    const exists = await this.userQueryRepository.exists(userId);
    if (!exists) {
      // Not present: create it (it may have been missed earlier).
      await this.handleCreateOrSnapshot(data);
      return;
    }

    const phoneNumberMasked = data.phone_number
      ? this.maskPhoneNumber(data.phone_number)
      : null;

    // Overwrite all mirrored fields.
    await this.prisma.userQueryView.update({
      where: { userId },
      data: {
        nickname: data.nickname || null,
        avatarUrl: data.avatar_url || null,
        phoneNumberMasked,
        inviterSequence: data.inviter_sequence || null,
        kycStatus: data.kyc_status,
        status: data.status,
        syncedAt: new Date(),
      },
    });

    this.logger.log(`[CDC] Updated user: ${data.account_sequence}`);
  }

  /** Soft-deletes the user (status 'DELETED') if it exists in the view. */
  private async handleDelete(data: CdcUserAccountPayload): Promise<void> {
    const userId = BigInt(data.user_id);

    // Soft delete: set the status to DELETED.
    const exists = await this.userQueryRepository.exists(userId);
    if (exists) {
      await this.userQueryRepository.updateStatus(userId, 'DELETED');
      this.logger.log(`[CDC] Marked user as deleted: ${data.account_sequence}`);
    }
  }

  // ==================== Helper Methods ====================

  /**
   * Masks a phone number as first 3 chars + '****' + last 4 chars.
   *
   * NOTE(review): numbers shorter than 7 characters are returned unmasked,
   * and for lengths 7-10 the kept prefix/suffix overlap so no digit is
   * actually hidden — confirm whether short numbers can occur.
   */
  private maskPhoneNumber(phone: string): string {
    if (phone.length < 7) return phone;
    return phone.slice(0, 3) + '****' + phone.slice(-4);
  }

  /** Returns true when an event with this id has already been recorded. */
  private async isEventProcessed(eventId: string): Promise<boolean> {
    const count = await this.prisma.processedEvent.count({
      where: { eventId },
    });
    return count > 0;
  }

  /** Records an event id as processed. */
  private async markEventProcessed(eventId: string, eventType: string): Promise<void> {
    await this.prisma.processedEvent.create({
      data: {
        eventId,
        eventType,
        processedAt: new Date(),
      },
    });
  }

  /**
   * Returns the current consumer status (running flag, topic, consumer group).
   */
  getStatus(): { isRunning: boolean; topic: string; consumerGroup: string } {
    return {
      isRunning: this.isRunning,
      topic: this.cdcTopic,
      consumerGroup: this.consumerGroup,
    };
  }
}

View File

@ -1,2 +1,3 @@
export * from './kafka.module';
export * from './user-event-consumer.service';
export * from './cdc-consumer.service';

View File

@ -1,10 +1,22 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { PrismaService } from '../persistence/prisma/prisma.service';
import { UserQueryRepositoryImpl } from '../persistence/repositories/user-query.repository.impl';
import { USER_QUERY_REPOSITORY } from '../../domain/repositories/user-query.repository';
import { UserEventConsumerService } from './user-event-consumer.service';
import { CdcConsumerService } from './cdc-consumer.service';
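/**
 * Kafka module.
 *
 * Registers two consumers:
 * 1. UserEventConsumerService - Outbox event consumer (kept for domain events)
 * 2. CdcConsumerService - Debezium CDC consumer (used for data synchronization)
 *
 * NOTE(review): the original comment describes a CDC_ENABLED switch
 * (CDC_ENABLED=true: use CDC; CDC_ENABLED=false: use Outbox), but nothing in
 * this module reads that variable; both consumers are registered
 * unconditionally — confirm the intended behaviour.
 */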
@Module({
imports: [ConfigModule],
providers: [
@ -13,8 +25,11 @@ import { UserEventConsumerService } from './user-event-consumer.service';
provide: USER_QUERY_REPOSITORY,
useClass: UserQueryRepositoryImpl,
},
// 传统 Outbox 事件消费者 (保留用于领域事件)
UserEventConsumerService,
// CDC 消费者 (用于数据同步)
CdcConsumerService,
],
exports: [UserEventConsumerService, USER_QUERY_REPOSITORY],
exports: [UserEventConsumerService, CdcConsumerService, USER_QUERY_REPOSITORY],
})
export class KafkaModule {}

View File

@ -197,6 +197,26 @@ up() {
log_info "Waiting for infrastructure to be ready..."
sleep 10
# Start Debezium Connect (CDC)
log_info "Starting Debezium Connect..."
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" up -d debezium-connect
# Wait for Debezium Connect to be ready
log_info "Waiting for Debezium Connect to be ready..."
for i in {1..30}; do
if curl -s http://localhost:8083/ > /dev/null 2>&1; then
log_info "Debezium Connect is ready!"
break
fi
if [ $i -eq 30 ]; then
log_warn "Debezium Connect not ready after 60s, continuing anyway..."
fi
sleep 2
done
# Register Debezium connectors
register_debezium_connectors
# Start application services
log_info "Starting application services..."
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" up -d
@ -207,6 +227,66 @@ up() {
log_info "View logs with: ./deploy.sh logs"
}
# Register the identity-postgres-connector with Debezium Connect.
#
# Idempotent: returns early when the connector is already listed.
# Reads POSTGRES_USER / POSTGRES_PASSWORD from "$ENV_FILE" (with the same
# defaults as before). Never aborts the caller: a failed registration only
# produces warnings so that `up` can continue.
register_debezium_connectors() {
    local connect_url="http://localhost:8083"
    local connector_name="identity-postgres-connector"
    local existing db_user db_password connector_config response http_code body

    log_info "Registering Debezium connectors..."

    # Check if connector already exists
    existing=$(curl -s "$connect_url/connectors" 2>/dev/null || echo "[]")
    if echo "$existing" | grep -qF "$connector_name"; then
        log_info "identity-postgres-connector already registered"
        return
    fi

    # Read database credentials from .env
    source "$ENV_FILE"

    # Escape backslashes and double quotes so that credentials containing
    # them cannot break (or inject into) the JSON document below.
    db_user=${POSTGRES_USER:-rwa_user}
    db_user=${db_user//\\/\\\\}
    db_user=${db_user//\"/\\\"}
    db_password=${POSTGRES_PASSWORD:-rwa_secure_password}
    db_password=${db_password//\\/\\\\}
    db_password=${db_password//\"/\\\"}

    # Register identity-postgres-connector
    connector_config='{
  "name": "identity-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "'"$db_user"'",
    "database.password": "'"$db_password"'",
    "database.dbname": "rwa_identity",
    "topic.prefix": "cdc.identity",
    "table.include.list": "public.user_accounts",
    "plugin.name": "pgoutput",
    "publication.name": "debezium_identity_publication",
    "publication.autocreate.mode": "filtered",
    "slot.name": "debezium_identity_slot",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,table,source.ts_ms",
    "heartbeat.interval.ms": "10000",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "string",
    "time.precision.mode": "connect"
  }
}'

    # Append the HTTP status code on its own line so success can be decided
    # from the status instead of grepping the body (error bodies may also
    # mention the connector name).
    response=$(curl -s -w '\n%{http_code}' -X POST \
        -H "Content-Type: application/json" \
        -d "$connector_config" \
        "$connect_url/connectors" 2>/dev/null || true)
    http_code="${response##*$'\n'}"
    body="${response%$'\n'*}"

    case "$http_code" in
        2??)
            log_info "identity-postgres-connector registered successfully"
            ;;
        *)
            log_warn "Failed to register connector (HTTP $http_code): $body"
            log_warn "You may need to register it manually: ./deploy.sh debezium-register"
            ;;
    esac
}
down() {
log_step "Stopping RWA Backend Services..."
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" down
@ -429,13 +509,13 @@ clean() {
# Start the infrastructure containers in detached mode.
# NOTE(review): this view is a diff rendering, so both the previous service
# list and the new one (which adds debezium-connect) appear below; presumably
# only the second `docker compose` line exists in the resulting file — confirm.
infra_up() {
log_step "Starting infrastructure services..."
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" up -d postgres redis zookeeper kafka
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" up -d postgres redis zookeeper kafka debezium-connect
log_info "Infrastructure services started"
}
# Stop (without removing) the infrastructure containers.
# NOTE(review): this view is a diff rendering, so both the previous service
# list and the new one (which adds debezium-connect) appear below; presumably
# only the second `docker compose` line exists in the resulting file — confirm.
infra_down() {
log_step "Stopping infrastructure services..."
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop postgres redis kafka zookeeper
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop postgres redis kafka zookeeper debezium-connect
log_info "Infrastructure services stopped"
}
@ -453,7 +533,7 @@ infra_status() {
echo "============================================"
echo ""
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" ps postgres redis zookeeper kafka
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" ps postgres redis zookeeper kafka debezium-connect
echo ""
echo "Health Check:"
@ -481,10 +561,92 @@ infra_status() {
else
echo -e " ${RED}[FAIL]${NC} Kafka (port 9092)"
fi
if curl -s http://localhost:8083/ > /dev/null 2>&1; then
echo -e " ${GREEN}[OK]${NC} Debezium Connect (port 8083)"
# Check connector status
CONNECTOR_STATUS=$(curl -s http://localhost:8083/connectors/identity-postgres-connector/status 2>/dev/null | grep -o '"state":"[^"]*"' | head -1 || echo "")
if echo "$CONNECTOR_STATUS" | grep -q "RUNNING"; then
echo -e " └─ ${GREEN}[RUNNING]${NC} identity-postgres-connector"
elif [ -n "$CONNECTOR_STATUS" ]; then
echo -e " └─ ${YELLOW}[$CONNECTOR_STATUS]${NC} identity-postgres-connector"
else
echo -e " └─ ${YELLOW}[NOT REGISTERED]${NC} identity-postgres-connector"
fi
else
echo -e " ${RED}[FAIL]${NC} Debezium Connect (port 8083)"
fi
}
# Follow the logs of the infrastructure containers (`logs -f` runs until interrupted).
# NOTE(review): this view is a diff rendering, so both the previous service
# list and the new one (which adds debezium-connect) appear below; presumably
# only the second `docker compose` line exists in the resulting file — confirm.
infra_logs() {
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" logs -f postgres redis zookeeper kafka
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" logs -f postgres redis zookeeper kafka debezium-connect
}
# ===========================================================================
# Debezium CDC Operations
# ===========================================================================
# Print the Debezium Connect version, the list of registered connectors and
# the status of identity-postgres-connector. JSON is pretty-printed through
# python3 when that works, otherwise the raw response is printed.
# Exits with status 1 when the Connect REST endpoint does not answer.
debezium_status() {
    local base_url="http://localhost:8083"
    local status_url="$base_url/connectors/identity-postgres-connector/status"

    printf '\n%s\n%s\n%s\n\n' \
        "============================================" \
        "Debezium CDC Status" \
        "============================================"

    curl -s "$base_url/" > /dev/null 2>&1 || {
        log_error "Debezium Connect is not running"
        exit 1
    }

    echo "Debezium Connect Version:"
    curl -s "$base_url/" | python3 -m json.tool 2>/dev/null || curl -s "$base_url/"

    printf '\n%s\n' "Registered Connectors:"
    curl -s "$base_url/connectors" | python3 -m json.tool 2>/dev/null || curl -s "$base_url/connectors"

    printf '\n%s\n' "Connector Details:"
    curl -s "$status_url" | python3 -m json.tool 2>/dev/null || \
        curl -s "$status_url"
}
# CLI entry point: register the Debezium connectors, provided the Connect
# REST endpoint answers; otherwise log an error and exit with status 1.
debezium_register() {
    log_step "Registering Debezium connectors..."

    if curl -s http://localhost:8083/ > /dev/null 2>&1; then
        register_debezium_connectors
    else
        log_error "Debezium Connect is not running"
        exit 1
    fi
}
# CLI entry point: ask Debezium Connect to restart identity-postgres-connector
# and then print the CDC status.
# Exits with status 1 when Connect is unreachable or the restart request is
# rejected (previously "Connector restart requested" was logged even when the
# request failed, e.g. with 404 for an unregistered connector).
debezium_restart_connector() {
    local connect_url="http://localhost:8083"

    log_step "Restarting Debezium connector..."

    if ! curl -s "$connect_url/" > /dev/null 2>&1; then
        log_error "Debezium Connect is not running"
        exit 1
    fi

    # -f turns HTTP errors (>= 400) into a non-zero exit status.
    # NOTE(review): this endpoint restarts the connector instance; whether
    # its tasks are restarted too depends on the Connect version/parameters —
    # confirm if failed tasks must also be recovered.
    if ! curl -sf -X POST "$connect_url/connectors/identity-postgres-connector/restart" > /dev/null; then
        log_error "Failed to restart identity-postgres-connector (is it registered?)"
        exit 1
    fi

    log_info "Connector restart requested"
    sleep 3
    debezium_status
}
# CLI entry point: delete identity-postgres-connector after an interactive
# confirmation. Returns non-zero when the DELETE request fails (previously
# "Connector deleted" was logged regardless of the outcome).
# NOTE(review): this only removes the connector from Connect; the PostgreSQL
# replication slot is not touched here.
debezium_delete_connector() {
    local confirm

    log_warn "This will delete the identity-postgres-connector!"
    # -r keeps backslashes in the answer literal.
    read -r -p "Are you sure? (y/N): " confirm

    if [ "$confirm" = "y" ] || [ "$confirm" = "Y" ]; then
        # -f turns HTTP errors (>= 400) into a non-zero exit status.
        if curl -sf -X DELETE http://localhost:8083/connectors/identity-postgres-connector > /dev/null; then
            log_info "Connector deleted"
        else
            log_error "Failed to delete identity-postgres-connector"
            return 1
        fi
    else
        log_info "Cancelled"
    fi
}
infra_clean() {
@ -664,6 +826,18 @@ case "${1:-}" in
infra-reset)
infra_reset
;;
debezium-status|cdc-status)
debezium_status
;;
debezium-register|cdc-register)
debezium_register
;;
debezium-restart|cdc-restart)
debezium_restart_connector
;;
debezium-delete|cdc-delete)
debezium_delete_connector
;;
*)
echo "RWA Backend Services Deployment Script"
echo ""
@ -690,7 +864,7 @@ case "${1:-}" in
echo " rebuild-svc <name> [--no-cache] - Rebuild and restart a specific service"
echo ""
echo "Infrastructure Commands:"
echo " infra-up - Start infrastructure (postgres, redis, kafka)"
echo " infra-up - Start infrastructure (postgres, redis, kafka, debezium)"
echo " infra-down - Stop infrastructure services"
echo " infra-restart - Restart infrastructure services"
echo " infra-status - Show infrastructure status and health"
@ -698,6 +872,12 @@ case "${1:-}" in
echo " infra-clean - Remove infrastructure containers and volumes (DELETES DATA)"
echo " infra-reset - Clean and reinstall infrastructure (DELETES DATA)"
echo ""
echo "Debezium CDC Commands:"
echo " debezium-status - Show Debezium connector status"
echo " debezium-register - Register Debezium connectors"
echo " debezium-restart - Restart Debezium connector"
echo " debezium-delete - Delete Debezium connector"
echo ""
echo "Services:"
echo " identity-service, wallet-service, backup-service, planting-service,"
echo " referral-service, reward-service, mpc-service, leaderboard-service,"

View File

@ -26,6 +26,15 @@ services:
volumes:
- postgres_data:/var/lib/postgresql/data
- ./scripts/init-databases.sh:/docker-entrypoint-initdb.d/init-databases.sh:ro
# Enable logical replication for Debezium CDC
command:
- "postgres"
- "-c"
- "wal_level=logical"
- "-c"
- "max_replication_slots=4"
- "-c"
- "max_wal_senders=4"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-rwa_user}"]
interval: 5s
@ -110,6 +119,44 @@ services:
networks:
- rwa-network
# ---------------------------------------------------------------------------
# Debezium Kafka Connect - CDC (Change Data Capture)
# ---------------------------------------------------------------------------
# Kafka Connect worker running the Debezium connectors.
# NOTE(review): indentation is not visible in this view; the comments below
# describe the keys as listed and do not change any of them.
debezium-connect:
image: debezium/connect:2.4
container_name: rwa-debezium-connect
# Started only after Kafka and PostgreSQL report healthy.
depends_on:
kafka:
condition: service_healthy
postgres:
condition: service_healthy
# Publishes the Connect REST API (used by deploy.sh and register-connectors.sh).
# NOTE(review): confirm whether this port should be reachable beyond localhost.
ports:
- "8083:8083"
environment:
TZ: Asia/Shanghai
GROUP_ID: debezium-connect
BOOTSTRAP_SERVERS: kafka:29092
# Kafka topics the worker uses for its own config/offset/status storage.
CONFIG_STORAGE_TOPIC: debezium_configs
OFFSET_STORAGE_TOPIC: debezium_offsets
STATUS_STORAGE_TOPIC: debezium_statuses
# Replication factor 1 for the storage topics above.
# NOTE(review): presumably a single-broker setup — confirm before production use.
CONFIG_STORAGE_REPLICATION_FACTOR: 1
OFFSET_STORAGE_REPLICATION_FACTOR: 1
STATUS_STORAGE_REPLICATION_FACTOR: 1
# Connector settings
# Worker-level converters: plain JSON without embedded schemas (the
# connector configuration sets the same converters explicitly).
KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
KEY_CONVERTER_SCHEMAS_ENABLE: "false"
VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
# Healthy once the REST root endpoint answers; admin-service waits on this.
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8083/"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
restart: unless-stopped
networks:
- rwa-network
# ===========================================================================
# Application Services
# ===========================================================================
@ -472,7 +519,7 @@ services:
referral-service:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3007/api/health"]
test: ["CMD", "curl", "-f", "http://localhost:3007/api/v1/health"]
interval: 30s
timeout: 3s
retries: 3
@ -595,6 +642,8 @@ services:
- KAFKA_BROKERS=kafka:29092
- KAFKA_CLIENT_ID=admin-service
- KAFKA_CONSUMER_GROUP=admin-service-user-sync
# CDC Consumer (Debezium)
- KAFKA_CDC_CONSUMER_GROUP=admin-service-cdc
depends_on:
postgres:
condition: service_healthy
@ -602,6 +651,8 @@ services:
condition: service_healthy
kafka:
condition: service_started
debezium-connect:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3010/api/v1/health"]
interval: 30s

View File

@ -0,0 +1,215 @@
# Debezium CDC 配置指南
## 概述
本项目使用 Debezium 实现 Change Data Capture (CDC)，将 `identity-service` 的用户数据实时同步到 `admin-service`。
```
┌─────────────────┐ WAL ┌──────────────┐ Kafka ┌─────────────────┐
│ identity-service│ ────────► │ Debezium │ ──────────► │ admin-service │
│ PostgreSQL │ │ Kafka Connect│ │ UserQueryView │
└─────────────────┘ └──────────────┘ └─────────────────┘
```
## 架构说明
### 组件
1. **PostgreSQL** - 启用 logical replication (`wal_level=logical`)
2. **Debezium Connect** - Kafka Connect with Debezium PostgreSQL connector
3. **Kafka** - 消息传输
4. **CdcConsumerService** - admin-service 中的 CDC 消费者
### 数据流
```
1. identity-service 写入 user_accounts 表
2. PostgreSQL 将变更写入 WAL
3. Debezium 读取 WAL，发布到 Kafka topic: cdc.identity.public.user_accounts
4. admin-service 的 CdcConsumerService 消费事件
5. 更新 admin-service 的 UserQueryView 表
```
## 部署步骤
### 1. 启动基础设施
```bash
cd backend/services
docker-compose up -d postgres kafka zookeeper debezium-connect
```
### 2. 等待 Debezium Connect 就绪
```bash
# 检查 Debezium Connect 状态
curl http://localhost:8083/
# 期望输出:
# {"version":"3.x.x","commit":"...","kafka_cluster_id":"..."}
```
### 3. 注册 Connector
```bash
# 方式1: 使用脚本
cd backend/services/scripts/debezium
chmod +x register-connectors.sh
./register-connectors.sh
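# 方式2: 手动注册
# 注意: identity-connector.json 中的 ${POSTGRES_USER:-rwa_user} 和
# ${POSTGRES_PASSWORD:-rwa_secure_password} 是 shell 风格的占位符，curl 不会展开它们；
# 提交前请先把这两个值替换为实际的数据库用户名和密码。
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @identity-connector.json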
```
### 4. 验证 Connector 状态
```bash
# 列出所有 connectors
curl http://localhost:8083/connectors
# 查看 connector 详情
curl http://localhost:8083/connectors/identity-postgres-connector/status
```
### 5. 启动 admin-service
```bash
docker-compose up -d admin-service
```
## 配置说明
### PostgreSQL 配置
`docker-compose.yml` 中的 PostgreSQL 启动参数:
```yaml
command:
- "postgres"
- "-c"
- "wal_level=logical"
- "-c"
- "max_replication_slots=4"
- "-c"
- "max_wal_senders=4"
```
### Debezium Connector 配置
关键配置项 (`identity-connector.json`):
| 配置项 | 说明 |
|--------|------|
| `database.hostname` | PostgreSQL 主机 |
| `database.dbname` | 数据库名 `rwa_identity` |
| `table.include.list` | 监听的表 `public.user_accounts` |
| `topic.prefix` | Kafka topic 前缀 `cdc.identity` |
| `plugin.name` | 使用 `pgoutput` (PostgreSQL 原生) |
| `snapshot.mode` | `initial` 首次启动时全量快照 |
### 环境变量
admin-service:
| 变量 | 说明 | 默认值 |
|------|------|--------|
| `KAFKA_CDC_CONSUMER_GROUP` | CDC 消费者组 | `admin-service-cdc` |
## 监控与运维
### 查看 Connector 状态
```bash
# 状态
curl http://localhost:8083/connectors/identity-postgres-connector/status
# 任务状态
curl http://localhost:8083/connectors/identity-postgres-connector/tasks/0/status
```
### 重启 Connector
```bash
curl -X POST http://localhost:8083/connectors/identity-postgres-connector/restart
```
### 查看 Kafka Topic
```bash
# 进入 Kafka 容器
docker exec -it rwa-kafka bash
# 列出 topics
kafka-topics --bootstrap-server localhost:9092 --list
# 消费 CDC topic
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic cdc.identity.public.user_accounts \
--from-beginning
```
### 查看消费者 Lag
```bash
kafka-consumer-groups --bootstrap-server localhost:9092 \
--group admin-service-cdc \
--describe
```
## 故障排查
### Connector 状态为 FAILED
```bash
# 查看错误信息
curl http://localhost:8083/connectors/identity-postgres-connector/status | jq
# 常见原因:
# 1. PostgreSQL 未启用 logical replication
# 2. 数据库连接失败
# 3. 权限不足
```
### 数据不同步
1. 检查 Connector 状态
2. 检查 Kafka consumer lag
3. 查看 admin-service 日志
```bash
docker logs rwa-admin-service | grep CDC
```
### 重置 Connector Offset
⚠️ 危险操作,会导致数据重新同步
```bash
# 删除 connector
curl -X DELETE http://localhost:8083/connectors/identity-postgres-connector
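# 删除 replication slot (这是一条 SQL，需要通过 psql 在 PostgreSQL 中执行；用户名请按实际的 POSTGRES_USER 调整)
docker-compose exec postgres psql -U rwa_user -d rwa_identity \
-c "SELECT pg_drop_replication_slot('debezium_identity_slot');"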
# 重新注册 connector
./register-connectors.sh
```
## 与 Outbox 模式对比
| 特性 | Outbox (原方案) | Debezium CDC (新方案) |
|------|-----------------|----------------------|
| 代码侵入 | 需要在每个写操作添加事件 | 零侵入 |
| 可靠性 | 依赖代码正确发布 | 100% 捕获 WAL |
| 延迟 | 1秒轮询 | 近实时 (毫秒级) |
| Schema 变更 | 需手动同步 | 自动捕获 |
| 运维复杂度 | 简单 | 需要 Kafka Connect |
## 注意事项
1. **首次启动**: Debezium 会进行全量快照,可能需要较长时间
2. **Schema 变更**: 新增字段会自动出现在 CDC 消息中，但 CdcConsumerService 只映射固定的一组字段，需要同步修改消费者代码后才会写入 UserQueryView；删除字段需要手动处理
3. **大表**: 快照时可能影响数据库性能,建议在低峰期进行
4. **数据一致性**: CDC 是最终一致性,存在短暂延迟 (通常 < 200ms)

View File

@ -0,0 +1,41 @@
{
"name": "identity-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "${POSTGRES_USER:-rwa_user}",
"database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}",
"database.dbname": "rwa_identity",
"topic.prefix": "cdc.identity",
"table.include.list": "public.user_accounts",
"plugin.name": "pgoutput",
"publication.name": "debezium_identity_publication",
"publication.autocreate.mode": "filtered",
"slot.name": "debezium_identity_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"heartbeat.interval.ms": "10000",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}
}

View File

@ -0,0 +1,118 @@
#!/bin/bash
# =============================================================================
# Debezium Connector Registration Script
# =============================================================================
# Usage: ./register-connectors.sh
#
# This script registers the PostgreSQL connector for identity-service CDC
# It should be run after Debezium Connect is fully started
#
# Environment:
#   DEBEZIUM_CONNECT_URL  Connect REST endpoint (default http://localhost:8083)
#   POSTGRES_USER         database user     (default rwa_user)
#   POSTGRES_PASSWORD     database password (default rwa_secure_password)
#
# An existing identity-postgres-connector is deleted and re-created.
# Exits non-zero when Connect never becomes ready or the registration request
# is rejected.
# =============================================================================

set -e

CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8083}"
MAX_RETRIES=30
RETRY_INTERVAL=5

# Escape a value for use inside a JSON string literal (backslash and double
# quote), so credentials containing those characters cannot corrupt the
# connector configuration.
json_escape() {
    local value=$1
    value=${value//\\/\\\\}
    value=${value//\"/\\\"}
    printf '%s' "$value"
}

echo "=== Debezium Connector Registration ==="
echo "Connect URL: $CONNECT_URL"

# Wait for Debezium Connect to be ready
echo "Waiting for Debezium Connect to be ready..."
for i in $(seq 1 $MAX_RETRIES); do
    if curl -s "$CONNECT_URL/" > /dev/null 2>&1; then
        echo "Debezium Connect is ready!"
        break
    fi
    if [ "$i" -eq "$MAX_RETRIES" ]; then
        echo "ERROR: Debezium Connect is not ready after $MAX_RETRIES attempts"
        exit 1
    fi
    echo "Attempt $i/$MAX_RETRIES - waiting ${RETRY_INTERVAL}s..."
    sleep $RETRY_INTERVAL
done

# Check existing connectors
echo ""
echo "Checking existing connectors..."
EXISTING=$(curl -s "$CONNECT_URL/connectors")
echo "Existing connectors: $EXISTING"

# Register identity-postgres connector
echo ""
echo "Registering identity-postgres-connector..."

DB_USER=$(json_escape "${POSTGRES_USER:-rwa_user}")
DB_PASSWORD=$(json_escape "${POSTGRES_PASSWORD:-rwa_secure_password}")

# NOTE(review): "database.server.name" is set here but is absent from the
# other copies of this configuration in the repository — confirm whether it
# is still needed alongside "topic.prefix".
CONNECTOR_CONFIG='{
  "name": "identity-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "'"$DB_USER"'",
    "database.password": "'"$DB_PASSWORD"'",
    "database.dbname": "rwa_identity",
    "database.server.name": "identity",
    "topic.prefix": "cdc.identity",
    "table.include.list": "public.user_accounts",
    "plugin.name": "pgoutput",
    "publication.name": "debezium_identity_publication",
    "publication.autocreate.mode": "filtered",
    "slot.name": "debezium_identity_slot",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,table,source.ts_ms",
    "heartbeat.interval.ms": "10000",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "string",
    "time.precision.mode": "connect"
  }
}'

# Delete existing connector if exists
if echo "$EXISTING" | grep -q "identity-postgres-connector"; then
    echo "Deleting existing identity-postgres-connector..."
    curl -s -X DELETE "$CONNECT_URL/connectors/identity-postgres-connector"
    sleep 2
fi

# Create connector. The HTTP status code is appended on its own line so that
# a rejected request is detected instead of being reported as success.
RESPONSE=$(curl -s -w '\n%{http_code}' -X POST \
    -H "Content-Type: application/json" \
    -d "$CONNECTOR_CONFIG" \
    "$CONNECT_URL/connectors") || true
HTTP_CODE="${RESPONSE##*$'\n'}"
RESULT="${RESPONSE%$'\n'*}"

echo "Result: $RESULT"

case "$HTTP_CODE" in
    2??) ;;
    *)
        echo "ERROR: connector registration failed (HTTP $HTTP_CODE)" >&2
        exit 1
        ;;
esac

# Check connector status
echo ""
echo "Checking connector status..."
sleep 3
STATUS=$(curl -s "$CONNECT_URL/connectors/identity-postgres-connector/status")
echo "Connector status: $STATUS"

echo ""
echo "=== Registration Complete ==="
echo ""
echo "Kafka topics created:"
echo " - cdc.identity.public.user_accounts"
echo ""
echo "To verify:"
echo " curl $CONNECT_URL/connectors"
echo " curl $CONNECT_URL/connectors/identity-postgres-connector/status"

View File

@ -17,3 +17,50 @@ for db in rwa_identity rwa_wallet rwa_mpc rwa_backup rwa_planting rwa_referral r
done
echo "All databases created successfully!"
# =============================================================================
# Configure PostgreSQL for Debezium CDC (Logical Replication)
# =============================================================================
echo "Configuring PostgreSQL for Debezium CDC..."

# Password for the dedicated "debezium" replication role. It can be overridden
# through the DEBEZIUM_PASSWORD environment variable; the default keeps the
# previously hard-coded value so existing setups behave the same.
# NOTE(review): the connector configurations in this repository connect as
# POSTGRES_USER, not as this role — confirm whether the role is still needed.
DEBEZIUM_PASSWORD="${DEBEZIUM_PASSWORD:-debezium_password}"

# Set wal_level to logical for CDC support
psql -v ON_ERROR_STOP=1 -v debezium_password="$DEBEZIUM_PASSWORD" --username "$POSTGRES_USER" <<-EOSQL
-- Set wal_level to logical (requires restart to take effect)
ALTER SYSTEM SET wal_level = 'logical';

-- Increase max_replication_slots for Debezium
ALTER SYSTEM SET max_replication_slots = 4;

-- Increase max_wal_senders for replication connections
ALTER SYSTEM SET max_wal_senders = 4;

-- Create replication user for Debezium (if not exists).
-- The statement is generated with format(%L) from a psql variable so the
-- password is quoted correctly, then executed via \gexec only when the role
-- is missing.
SELECT format('CREATE ROLE debezium WITH REPLICATION LOGIN PASSWORD %L', :'debezium_password')
WHERE NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'debezium')\gexec

-- Grant connect to identity database
GRANT CONNECT ON DATABASE rwa_identity TO debezium;
EOSQL

# Grant schema permissions on rwa_identity
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "rwa_identity" <<-EOSQL
-- Grant usage on public schema
GRANT USAGE ON SCHEMA public TO debezium;

-- Grant select on all tables (current and future)
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;

-- Create publication for CDC (user_accounts table only for now)
-- This will be created after the table exists (by Prisma migration)
-- DROP PUBLICATION IF EXISTS debezium_identity_publication;
-- CREATE PUBLICATION debezium_identity_publication FOR TABLE user_accounts;
EOSQL

echo "PostgreSQL CDC configuration completed!"
echo "NOTE: PostgreSQL restart is required for wal_level change to take effect."