使用实际的用户名和密码替代环境变量占位符, 因为 envsubst 不支持带默认值的变量语法 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> |
||
|---|---|---|
| .. | ||
| README.md | ||
| auth-outbox-connector.json | ||
| authorization-connector.json | ||
| contribution-outbox-connector.json | ||
| identity-connector.json | ||
| mining-outbox-connector.json | ||
| mining-wallet-outbox-connector.json | ||
| planting-connector.json | ||
| referral-connector.json | ||
| register-connectors.sh | ||
| trading-outbox-connector.json | ||
| wallet-connector.json | ||
README.md
Debezium CDC 配置指南
概述
本项目使用 Debezium 实现 Change Data Capture (CDC),将 identity-service 的用户数据实时同步到 admin-service。
┌─────────────────┐ WAL ┌──────────────┐ Kafka ┌─────────────────┐
│ identity-service│ ────────► │ Debezium │ ──────────► │ admin-service │
│ PostgreSQL │ │ Kafka Connect│ │ UserQueryView │
└─────────────────┘ └──────────────┘ └─────────────────┘
架构说明
组件
- PostgreSQL - 启用 logical replication (
wal_level=logical) - Debezium Connect - Kafka Connect with Debezium PostgreSQL connector
- Kafka - 消息传输
- CdcConsumerService - admin-service 中的 CDC 消费者
数据流
1. identity-service 写入 user_accounts 表
2. PostgreSQL 将变更写入 WAL
3. Debezium 读取 WAL,发布到 Kafka topic: cdc.identity.public.user_accounts
4. admin-service 的 CdcConsumerService 消费事件
5. 更新 admin-service 的 UserQueryView 表
部署步骤
1. 启动基础设施
cd backend/services
docker-compose up -d postgres kafka zookeeper debezium-connect
2. 等待 Debezium Connect 就绪
# 检查 Debezium Connect 状态
curl http://localhost:8083/
# 期望输出:
# {"version":"3.x.x","commit":"...","kafka_cluster_id":"..."}
3. 注册 Connector
# 方式1: 使用脚本
cd backend/services/scripts/debezium
chmod +x register-connectors.sh
./register-connectors.sh
# 方式2: 手动注册
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @identity-connector.json
4. 验证 Connector 状态
# 列出所有 connectors
curl http://localhost:8083/connectors
# 查看 connector 详情
curl http://localhost:8083/connectors/identity-postgres-connector/status
5. 启动 admin-service
docker-compose up -d admin-service
配置说明
PostgreSQL 配置
在 docker-compose.yml 中,PostgreSQL 启动参数:
command:
- "postgres"
- "-c"
- "wal_level=logical"
- "-c"
- "max_replication_slots=4"
- "-c"
- "max_wal_senders=4"
Debezium Connector 配置
关键配置项 (identity-connector.json):
| 配置项 | 说明 |
|---|---|
database.hostname |
PostgreSQL 主机 |
database.dbname |
数据库名 rwa_identity |
table.include.list |
监听的表 public.user_accounts |
topic.prefix |
Kafka topic 前缀 cdc.identity |
plugin.name |
使用 pgoutput (PostgreSQL 原生) |
snapshot.mode |
initial 首次启动时全量快照 |
环境变量
admin-service:
| 变量 | 说明 | 默认值 |
|---|---|---|
KAFKA_CDC_CONSUMER_GROUP |
CDC 消费者组 | admin-service-cdc |
监控与运维
查看 Connector 状态
# 状态
curl http://localhost:8083/connectors/identity-postgres-connector/status
# 任务状态
curl http://localhost:8083/connectors/identity-postgres-connector/tasks/0/status
重启 Connector
curl -X POST http://localhost:8083/connectors/identity-postgres-connector/restart
查看 Kafka Topic
# 进入 Kafka 容器
docker exec -it rwa-kafka bash
# 列出 topics
kafka-topics --bootstrap-server localhost:9092 --list
# 消费 CDC topic
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic cdc.identity.public.user_accounts \
--from-beginning
查看消费者 Lag
kafka-consumer-groups --bootstrap-server localhost:9092 \
--group admin-service-cdc \
--describe
故障排查
Connector 状态为 FAILED
# 查看错误信息
curl http://localhost:8083/connectors/identity-postgres-connector/status | jq
# 常见原因:
# 1. PostgreSQL 未启用 logical replication
# 2. 数据库连接失败
# 3. 权限不足
数据不同步
- 检查 Connector 状态
- 检查 Kafka consumer lag
- 查看 admin-service 日志
docker logs rwa-admin-service | grep CDC
重置 Connector Offset
⚠️ 危险操作,会导致数据重新同步
# 删除 connector
curl -X DELETE http://localhost:8083/connectors/identity-postgres-connector
# 删除 replication slot (在 PostgreSQL 中)
SELECT pg_drop_replication_slot('debezium_identity_slot');
# 重新注册 connector
./register-connectors.sh
与 Outbox 模式对比
| 特性 | Outbox (原方案) | Debezium CDC (新方案) |
|---|---|---|
| 代码侵入 | 需要在每个写操作添加事件 | 零侵入 |
| 可靠性 | 依赖代码正确发布 | 100% 捕获 WAL |
| 延迟 | 1秒轮询 | 近实时 (毫秒级) |
| Schema 变更 | 需手动同步 | 自动捕获 |
| 运维复杂度 | 简单 | 需要 Kafka Connect |
注意事项
- 首次启动: Debezium 会进行全量快照,可能需要较长时间
- Schema 变更: 添加字段会自动同步,删除字段需要手动处理
- 大表: 快照时可能影响数据库性能,建议在低峰期进行
- 数据一致性: CDC 是最终一致性,存在短暂延迟 (通常 < 200ms)