feat(admin-service): add referral-service CDC data sync

- Add ReferralQueryView schema and migration
- Add ReferralCdcConsumerService to consume referral relationship changes
- Configure referral-postgres-connector for Debezium CDC
- Update deploy.sh to auto-register the referral connector
- Update init-databases.sh to grant logical replication access on rwa_referral

Fields synced via CDC:
- user_id, account_sequence, referrer_id
- my_referral_code, used_referral_code
- ancestor_path, depth
- direct_referral_count, active_direct_count

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
hailin 2026-01-07 18:57:58 -08:00
parent 6b55b69d0d
commit c4cec836d9
8 changed files with 525 additions and 61 deletions


@@ -0,0 +1,32 @@
-- CreateTable: referral_query_view
-- Referral relationship query view - synced from referral-service via Debezium CDC
CREATE TABLE "referral_query_view" (
"relationship_id" BIGINT NOT NULL,
"user_id" BIGINT NOT NULL,
"account_sequence" VARCHAR(12) NOT NULL,
"referrer_id" BIGINT,
"root_user_id" BIGINT,
"my_referral_code" VARCHAR(20) NOT NULL,
"used_referral_code" VARCHAR(20),
"ancestor_path" BIGINT[] NOT NULL DEFAULT '{}',
"depth" INTEGER NOT NULL DEFAULT 0,
"direct_referral_count" INTEGER NOT NULL DEFAULT 0,
"active_direct_count" INTEGER NOT NULL DEFAULT 0,
"created_at" TIMESTAMP(3) NOT NULL,
"synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "referral_query_view_pkey" PRIMARY KEY ("relationship_id")
);
-- CreateIndex: unique constraints
CREATE UNIQUE INDEX "referral_query_view_user_id_key" ON "referral_query_view"("user_id");
CREATE UNIQUE INDEX "referral_query_view_account_sequence_key" ON "referral_query_view"("account_sequence");
CREATE UNIQUE INDEX "referral_query_view_my_referral_code_key" ON "referral_query_view"("my_referral_code");
-- CreateIndex: query indexes
CREATE INDEX "referral_query_view_referrer_id_idx" ON "referral_query_view"("referrer_id");
CREATE INDEX "referral_query_view_account_sequence_idx" ON "referral_query_view"("account_sequence");
CREATE INDEX "referral_query_view_my_referral_code_idx" ON "referral_query_view"("my_referral_code");
CREATE INDEX "referral_query_view_used_referral_code_idx" ON "referral_query_view"("used_referral_code");
CREATE INDEX "referral_query_view_root_user_id_idx" ON "referral_query_view"("root_user_id");
CREATE INDEX "referral_query_view_depth_idx" ON "referral_query_view"("depth");


@@ -469,6 +469,46 @@ model UserQueryView {
@@map("user_query_view")
}
// =============================================================================
// Referral Query View (synced from referral-service via CDC)
// =============================================================================
/// Referral relationship query view - synced from referral-service via Debezium CDC
/// Used by admin-web to inspect user referral relationships without cross-service calls
model ReferralQueryView {
id BigInt @id @map("relationship_id")
userId BigInt @unique @map("user_id")
accountSequence String @unique @map("account_sequence") @db.VarChar(12)
// Referrer info
referrerId BigInt? @map("referrer_id")
rootUserId BigInt? @map("root_user_id")
// Referral codes
myReferralCode String @unique @map("my_referral_code") @db.VarChar(20)
usedReferralCode String? @map("used_referral_code") @db.VarChar(20)
// Referral chain info
ancestorPath BigInt[] @map("ancestor_path")
depth Int @default(0) @map("depth")
// Direct referral stats
directReferralCount Int @default(0) @map("direct_referral_count")
activeDirectCount Int @default(0) @map("active_direct_count")
// Timestamps
createdAt DateTime @map("created_at")
syncedAt DateTime @default(now()) @map("synced_at")
@@index([referrerId])
@@index([accountSequence])
@@index([myReferralCode])
@@index([usedReferralCode])
@@index([rootUserId])
@@index([depth])
@@map("referral_query_view")
}
// =============================================================================
// Kafka Event Tracking (event consumption tracking)
// =============================================================================
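As a usage sketch for the ReferralQueryView model above (illustrative, not part of this commit), an admin-web read path could resolve a user's referral row together with its direct referrer from the same synced view:

import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();

// Fetch a user's referral row by account sequence, then the referrer's row (if any).
async function getReferralInfo(accountSequence: string) {
  const me = await prisma.referralQueryView.findUnique({ where: { accountSequence } });
  if (!me) return null;

  const referrer = me.referrerId
    ? await prisma.referralQueryView.findUnique({ where: { userId: me.referrerId } })
    : null;

  return { me, referrer };
}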


@@ -1,3 +1,4 @@
export * from './kafka.module';
export * from './user-event-consumer.service';
export * from './cdc-consumer.service';
export * from './referral-cdc-consumer.service';


@@ -5,17 +5,17 @@ import { UserQueryRepositoryImpl } from '../persistence/repositories/user-query.
import { USER_QUERY_REPOSITORY } from '../../domain/repositories/user-query.repository';
import { UserEventConsumerService } from './user-event-consumer.service';
import { CdcConsumerService } from './cdc-consumer.service';
import { ReferralCdcConsumerService } from './referral-cdc-consumer.service';
/**
* Kafka Module
*
* CDC consumers (Debezium data sync):
* 1. CdcConsumerService - syncs user data from identity-service
* 2. ReferralCdcConsumerService - syncs referral relationships from referral-service
*
* Outbox consumer (business domain events):
* 1. UserEventConsumerService - consumes domain events
*
* The CDC_ENABLED flag selects the sync path:
* - CDC_ENABLED=true: use CDC (default)
* - CDC_ENABLED=false: use Outbox events
*/
@Module({
imports: [ConfigModule],
@@ -25,11 +25,18 @@ import { CdcConsumerService } from './cdc-consumer.service';
provide: USER_QUERY_REPOSITORY,
useClass: UserQueryRepositoryImpl,
},
// Outbox event consumer (business domain events)
UserEventConsumerService,
// CDC consumer - identity-service
CdcConsumerService,
// CDC consumer - referral-service
ReferralCdcConsumerService,
],
exports: [
UserEventConsumerService,
CdcConsumerService,
ReferralCdcConsumerService,
USER_QUERY_REPOSITORY,
],
})
export class KafkaModule {}
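Because ReferralCdcConsumerService is now exported from the module, it can be injected elsewhere for monitoring. A hypothetical controller (not part of this commit; import path and route are assumptions) could surface its getStatus() snapshot:

import { Controller, Get } from '@nestjs/common';
import { ReferralCdcConsumerService } from '../kafka/referral-cdc-consumer.service';

// Hypothetical admin endpoint exposing the CDC consumer's runtime status.
@Controller('internal/cdc')
export class CdcStatusController {
  constructor(private readonly referralCdc: ReferralCdcConsumerService) {}

  @Get('referral/status')
  getReferralCdcStatus() {
    return this.referralCdc.getStatus();
  }
}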


@@ -0,0 +1,289 @@
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
import { PrismaService } from '../persistence/prisma/prisma.service';
/**
* Debezium CDC payload (flattened by the ExtractNewRecordState transform)
* from the referral-service referral_relationships table.
*/
interface CdcReferralPayload {
// Referral relationship table fields (snake_case from PostgreSQL)
relationship_id: string;
user_id: string;
account_sequence: string;
referrer_id?: string | null;
root_user_id?: string | null;
my_referral_code: string;
used_referral_code?: string | null;
ancestor_path?: string[] | null; // PostgreSQL bigint[] arrives as a string array
depth: number;
direct_referral_count: number;
active_direct_count: number;
created_at: string;
updated_at: string;
// Debezium metadata (added by ExtractNewRecordState)
__op: 'c' | 'u' | 'd' | 'r'; // c=create, u=update, d=delete, r=read(snapshot)
__table: string;
__source_ts_ms: number;
__deleted?: string; // 'true' for delete events when using rewrite mode
}
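/**
* Example of a flattened CDC message after ExtractNewRecordState, as consumed
* by this service (all values below are hypothetical):
* {
*   "relationship_id": "42", "user_id": "1001", "account_sequence": "AC00000042",
*   "referrer_id": "1000", "root_user_id": "1",
*   "my_referral_code": "REF42", "used_referral_code": "REF1000",
*   "ancestor_path": ["1", "1000"], "depth": 2,
*   "direct_referral_count": 0, "active_direct_count": 0,
*   "created_at": "2026-01-07T10:00:00Z", "updated_at": "2026-01-07T10:00:00Z",
*   "__op": "c", "__table": "referral_relationships", "__source_ts_ms": 1767780000000
* }
*/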
/**
* Referral CDC consumer
*
* Consumes Debezium CDC events from the referral-service PostgreSQL database
* and keeps the local referral_query_view in sync.
*
* Topic: cdc.referral.public.referral_relationships
*/
@Injectable()
export class ReferralCdcConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(ReferralCdcConsumerService.name);
private kafka: Kafka;
private consumer: Consumer;
private isRunning = false;
// CDC Topic
private readonly cdcTopic = 'cdc.referral.public.referral_relationships';
private readonly consumerGroup: string;
constructor(
private readonly configService: ConfigService,
private readonly prisma: PrismaService,
) {
const brokers = (this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092')).split(',');
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID', 'admin-service');
this.consumerGroup = this.configService.get<string>('KAFKA_REFERRAL_CDC_GROUP', 'admin-service-referral-cdc');
this.kafka = new Kafka({
clientId: `${clientId}-referral-cdc`,
brokers,
logLevel: logLevel.WARN,
});
this.consumer = this.kafka.consumer({ groupId: this.consumerGroup });
this.logger.log(`[Referral-CDC] Configured to consume topic: ${this.cdcTopic}`);
}
async onModuleInit() {
await this.start();
}
async onModuleDestroy() {
await this.stop();
}
async start(): Promise<void> {
if (this.isRunning) {
this.logger.warn('[Referral-CDC] Consumer already running');
return;
}
try {
this.logger.log('[Referral-CDC] Connecting to Kafka...');
await this.consumer.connect();
await this.consumer.subscribe({
topic: this.cdcTopic,
fromBeginning: false,
});
this.logger.log(`[Referral-CDC] Subscribed to topic: ${this.cdcTopic}`);
await this.consumer.run({
eachMessage: async (payload: EachMessagePayload) => {
await this.handleMessage(payload);
},
});
this.isRunning = true;
this.logger.log('[Referral-CDC] Consumer started successfully');
} catch (error) {
this.logger.error('[Referral-CDC] Failed to start consumer:', error);
}
}
async stop(): Promise<void> {
if (!this.isRunning) return;
try {
await this.consumer.disconnect();
this.isRunning = false;
this.logger.log('[Referral-CDC] Consumer stopped');
} catch (error) {
this.logger.error('[Referral-CDC] Failed to stop consumer:', error);
}
}
private async handleMessage(payload: EachMessagePayload): Promise<void> {
const { topic, partition, message } = payload;
if (!message.value) {
this.logger.warn(`[Referral-CDC] Empty message from ${topic}:${partition}`);
return;
}
try {
const data = JSON.parse(message.value.toString()) as CdcReferralPayload;
const operation = data.__op;
const sourceTs = data.__source_ts_ms;
this.logger.debug(
`[Referral-CDC] Received ${operation} event for user ${data.account_sequence} ` +
`(ts: ${new Date(sourceTs).toISOString()})`
);
// Idempotency check
const eventId = `referral-cdc:${topic}:${partition}:${message.offset}`;
if (await this.isEventProcessed(eventId)) {
this.logger.debug(`[Referral-CDC] Event ${eventId} already processed, skipping`);
return;
}
// Process the CDC event
await this.processCdcEvent(data);
// Mark as processed
await this.markEventProcessed(eventId, `referral-cdc:${operation}`);
this.logger.log(
`[Referral-CDC] ✓ Processed ${operation} for user: ${data.account_sequence}`
);
} catch (error) {
this.logger.error(`[Referral-CDC] Failed to process message:`, error);
throw error; // let KafkaJS retry
}
}
private async processCdcEvent(data: CdcReferralPayload): Promise<void> {
const operation = data.__op;
const isDeleted = data.__deleted === 'true';
if (operation === 'd' || isDeleted) {
await this.handleDelete(data);
} else if (operation === 'c' || operation === 'r') {
await this.handleCreateOrSnapshot(data);
} else if (operation === 'u') {
await this.handleUpdate(data);
}
}
private async handleCreateOrSnapshot(data: CdcReferralPayload): Promise<void> {
const ancestorPath = this.parseAncestorPath(data.ancestor_path);
await this.prisma.referralQueryView.upsert({
where: { id: BigInt(data.relationship_id) },
create: {
id: BigInt(data.relationship_id),
userId: BigInt(data.user_id),
accountSequence: data.account_sequence,
referrerId: data.referrer_id ? BigInt(data.referrer_id) : null,
rootUserId: data.root_user_id ? BigInt(data.root_user_id) : null,
myReferralCode: data.my_referral_code,
usedReferralCode: data.used_referral_code || null,
ancestorPath,
depth: data.depth,
directReferralCount: data.direct_referral_count,
activeDirectCount: data.active_direct_count,
createdAt: new Date(data.created_at),
syncedAt: new Date(),
},
update: {
userId: BigInt(data.user_id),
accountSequence: data.account_sequence,
referrerId: data.referrer_id ? BigInt(data.referrer_id) : null,
rootUserId: data.root_user_id ? BigInt(data.root_user_id) : null,
myReferralCode: data.my_referral_code,
usedReferralCode: data.used_referral_code || null,
ancestorPath,
depth: data.depth,
directReferralCount: data.direct_referral_count,
activeDirectCount: data.active_direct_count,
syncedAt: new Date(),
},
});
this.logger.log(`[Referral-CDC] Created/Snapshot referral: ${data.account_sequence}`);
}
private async handleUpdate(data: CdcReferralPayload): Promise<void> {
const id = BigInt(data.relationship_id);
const exists = await this.prisma.referralQueryView.findUnique({
where: { id },
select: { id: true },
});
if (!exists) {
await this.handleCreateOrSnapshot(data);
return;
}
const ancestorPath = this.parseAncestorPath(data.ancestor_path);
await this.prisma.referralQueryView.update({
where: { id },
data: {
referrerId: data.referrer_id ? BigInt(data.referrer_id) : null,
rootUserId: data.root_user_id ? BigInt(data.root_user_id) : null,
usedReferralCode: data.used_referral_code || null,
ancestorPath,
depth: data.depth,
directReferralCount: data.direct_referral_count,
activeDirectCount: data.active_direct_count,
syncedAt: new Date(),
},
});
this.logger.log(`[Referral-CDC] Updated referral: ${data.account_sequence}`);
}
private async handleDelete(data: CdcReferralPayload): Promise<void> {
const id = BigInt(data.relationship_id);
try {
await this.prisma.referralQueryView.delete({
where: { id },
});
this.logger.log(`[Referral-CDC] Deleted referral: ${data.account_sequence}`);
} catch {
this.logger.warn(`[Referral-CDC] Referral not found for delete: ${data.account_sequence}`);
}
}
// ==================== Helper Methods ====================
private parseAncestorPath(path: string[] | null | undefined): bigint[] {
if (!path || !Array.isArray(path)) return [];
return path.map(id => BigInt(id));
}
private async isEventProcessed(eventId: string): Promise<boolean> {
const count = await this.prisma.processedEvent.count({
where: { eventId },
});
return count > 0;
}
private async markEventProcessed(eventId: string, eventType: string): Promise<void> {
await this.prisma.processedEvent.create({
data: {
eventId,
eventType,
processedAt: new Date(),
},
});
}
/**
* Get the current consumer status (running flag, topic, consumer group)
*/
getStatus(): { isRunning: boolean; topic: string; consumerGroup: string } {
return {
isRunning: this.isRunning,
topic: this.cdcTopic,
consumerGroup: this.consumerGroup,
};
}
}
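For local debugging, tailing the CDC topic outside the service confirms that the Debezium connector is producing flattened events. A minimal kafkajs sketch (broker address and group name are assumptions for local dev):

import { Kafka } from 'kafkajs';

// Standalone tail of the referral CDC topic for troubleshooting.
async function tailReferralCdc(): Promise<void> {
  const kafka = new Kafka({ clientId: 'referral-cdc-tail', brokers: ['localhost:9092'] });
  const consumer = kafka.consumer({ groupId: `referral-cdc-tail-${Date.now()}` });

  await consumer.connect();
  await consumer.subscribe({ topic: 'cdc.referral.public.referral_relationships', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      console.log(partition, message.offset, message.value?.toString());
    },
  });
}

tailReferralCdc().catch(console.error);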


@@ -230,60 +230,108 @@ up()
register_debezium_connectors() {
log_info "Registering Debezium connectors..."
# Check existing connectors
EXISTING=$(curl -s http://localhost:8083/connectors 2>/dev/null || echo "[]")
# Read database credentials from .env
source "$ENV_FILE"
# Register identity-postgres-connector
if echo "$EXISTING" | grep -q "identity-postgres-connector"; then
log_info "identity-postgres-connector already registered"
else
log_info "Registering identity-postgres-connector..."
IDENTITY_CONFIG='{
"name": "identity-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "'${POSTGRES_USER:-rwa_user}'",
"database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'",
"database.dbname": "rwa_identity",
"topic.prefix": "cdc.identity",
"table.include.list": "public.user_accounts",
"plugin.name": "pgoutput",
"publication.name": "debezium_identity_publication",
"publication.autocreate.mode": "filtered",
"slot.name": "debezium_identity_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"heartbeat.interval.ms": "10000",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}
}'
RESULT=$(curl -s -X POST \
-H "Content-Type: application/json" \
-d "$IDENTITY_CONFIG" \
"http://localhost:8083/connectors" 2>/dev/null || echo "failed")
if echo "$RESULT" | grep -q "identity-postgres-connector"; then
log_info "identity-postgres-connector registered successfully"
else
log_warn "Failed to register identity connector: $RESULT"
fi
fi
# Register referral-postgres-connector
if echo "$EXISTING" | grep -q "referral-postgres-connector"; then
log_info "referral-postgres-connector already registered"
else
log_info "Registering referral-postgres-connector..."
REFERRAL_CONFIG='{
"name": "referral-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "'${POSTGRES_USER:-rwa_user}'",
"database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'",
"database.dbname": "rwa_referral",
"topic.prefix": "cdc.referral",
"table.include.list": "public.referral_relationships",
"plugin.name": "pgoutput",
"publication.name": "debezium_referral_publication",
"publication.autocreate.mode": "filtered",
"slot.name": "debezium_referral_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"heartbeat.interval.ms": "10000",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}
}'
RESULT=$(curl -s -X POST \
-H "Content-Type: application/json" \
-d "$REFERRAL_CONFIG" \
"http://localhost:8083/connectors" 2>/dev/null || echo "failed")
if echo "$RESULT" | grep -q "referral-postgres-connector"; then
log_info "referral-postgres-connector registered successfully"
else
log_warn "Failed to register referral connector: $RESULT"
fi
fi
}
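After deployment, the Kafka Connect REST API reports per-connector health at /connectors/{name}/status. A quick TypeScript check (Node 18+ global fetch; localhost URL assumed) might look like:

// Print connector and task states for the two connectors registered above.
async function checkConnector(name: string): Promise<void> {
  const res = await fetch(`http://localhost:8083/connectors/${name}/status`);
  if (!res.ok) {
    console.error(`${name}: HTTP ${res.status}`);
    return;
  }
  const status = await res.json();
  console.log(`${name}: ${status.connector?.state}`, status.tasks?.map((t: { state: string }) => t.state));
}

async function main(): Promise<void> {
  for (const name of ['identity-postgres-connector', 'referral-postgres-connector']) {
    await checkConnector(name);
  }
}

main().catch(console.error);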


@@ -0,0 +1,41 @@
{
"name": "referral-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "${POSTGRES_USER:-rwa_user}",
"database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}",
"database.dbname": "rwa_referral",
"topic.prefix": "cdc.referral",
"table.include.list": "public.referral_relationships",
"plugin.name": "pgoutput",
"publication.name": "debezium_referral_publication",
"publication.autocreate.mode": "filtered",
"slot.name": "debezium_referral_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"heartbeat.interval.ms": "10000",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}
}
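Note that this file keeps shell-style ${VAR:-default} placeholders, which Kafka Connect itself does not expand, so a registration script presumably substitutes them before POSTing. A hedged TypeScript sketch (the file path and Connect URL are assumptions):

import { readFileSync } from 'node:fs';

// Expand ${VAR:-default} placeholders from the environment, then register the connector.
function expandEnv(template: string): string {
  return template.replace(/\$\{(\w+):-([^}]*)\}/g, (_, name, fallback) => process.env[name] ?? fallback);
}

async function registerReferralConnector(): Promise<void> {
  const raw = readFileSync('referral-postgres-connector.json', 'utf8'); // path is an assumption
  const res = await fetch('http://localhost:8083/connectors', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: expandEnv(raw),
  });
  console.log(res.status, await res.text());
}

registerReferralConnector().catch(console.error);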


@@ -43,8 +43,9 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" <<-EOSQL
END
\$\$;
-- Grant connect to identity and referral databases
GRANT CONNECT ON DATABASE rwa_identity TO debezium;
GRANT CONNECT ON DATABASE rwa_referral TO debezium;
EOSQL
# Grant schema permissions on rwa_identity
@@ -55,11 +56,16 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "rwa_identity" <<-E
-- Grant select on all tables (current and future)
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;
EOSQL
# Grant schema permissions on rwa_referral
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "rwa_referral" <<-EOSQL
-- Grant usage on public schema
GRANT USAGE ON SCHEMA public TO debezium;
-- Grant select on all tables (current and future)
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;
EOSQL
echo "PostgreSQL CDC configuration completed!"