diff --git a/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts index a4a4659f..9d8cc817 100644 --- a/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts @@ -46,18 +46,28 @@ export class UserSyncedHandler { private async handleCreate(data: any, sequenceNum: bigint): Promise { if (!data) return; + // 兼容不同的字段命名(CDC 使用 snake_case) + const userId = data.user_id ?? data.id; + const accountSequence = data.account_sequence ?? data.accountSequence; + const phone = data.phone_number ?? data.phone ?? null; + const status = data.status ?? 'ACTIVE'; + + if (!userId || !accountSequence) { + this.logger.warn(`Invalid user data: missing user_id or account_sequence`, { data }); + return; + } + await this.unitOfWork.executeInTransaction(async () => { // 保存同步的用户数据 await this.syncedDataRepository.upsertSyncedUser({ - originalUserId: BigInt(data.id), - accountSequence: data.account_sequence || data.accountSequence, - phone: data.phone || null, - status: data.status || 'ACTIVE', + originalUserId: BigInt(userId), + accountSequence, + phone, + status, sourceSequenceNum: sequenceNum, }); // 为用户创建算力账户(如果不存在) - const accountSequence = data.account_sequence || data.accountSequence; const existingAccount = await this.contributionAccountRepository.findByAccountSequence(accountSequence); if (!existingAccount) { @@ -67,26 +77,38 @@ export class UserSyncedHandler { } }); - this.logger.debug(`User synced: ${data.account_sequence || data.accountSequence}`); + this.logger.debug(`User synced: ${accountSequence}`); } private async handleUpdate(data: any, sequenceNum: bigint): Promise { if (!data) return; + // 兼容不同的字段命名(CDC 使用 snake_case) + const userId = data.user_id ?? data.id; + const accountSequence = data.account_sequence ?? data.accountSequence; + const phone = data.phone_number ?? data.phone ?? null; + const status = data.status ?? 'ACTIVE'; + + if (!userId || !accountSequence) { + this.logger.warn(`Invalid user update data: missing user_id or account_sequence`, { data }); + return; + } + await this.syncedDataRepository.upsertSyncedUser({ - originalUserId: BigInt(data.id), - accountSequence: data.account_sequence || data.accountSequence, - phone: data.phone || null, - status: data.status || 'ACTIVE', + originalUserId: BigInt(userId), + accountSequence, + phone, + status, sourceSequenceNum: sequenceNum, }); - this.logger.debug(`User updated: ${data.account_sequence || data.accountSequence}`); + this.logger.debug(`User updated: ${accountSequence}`); } private async handleDelete(data: any): Promise { if (!data) return; + const accountSequence = data.account_sequence ?? data.accountSequence; // 用户删除一般不处理,保留历史数据 - this.logger.debug(`User delete event received: ${data.account_sequence || data.accountSequence}`); + this.logger.debug(`User delete event received: ${accountSequence}`); } } diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index c1ba8a66..e6d75868 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -102,8 +102,11 @@ declare -A SERVICE_PORTS=( ["mining-wallet-service"]="3025" ) -# CDC Consumer Group -CDC_CONSUMER_GROUP="contribution-service-cdc-group" +# CDC Consumer Groups (all groups that need to be reset during full-reset) +CDC_CONSUMER_GROUPS=( + "contribution-service-cdc-group" + "auth-service-cdc-group" +) # Colors RED='\033[0;31m' @@ -614,8 +617,11 @@ services_logs() { sync_reset() { print_section "Resetting CDC Consumer Offsets" - echo -e "${YELLOW}This will reset the CDC consumer to read from the beginning.${NC}" - echo "Consumer Group: $CDC_CONSUMER_GROUP" + echo -e "${YELLOW}This will reset CDC consumers to read from the beginning.${NC}" + echo "Consumer Groups:" + for group in "${CDC_CONSUMER_GROUPS[@]}"; do + echo " - $group" + done echo "" read -p "Continue? (y/n): " confirm @@ -624,65 +630,71 @@ sync_reset() { return 1 fi - # Stop contribution-service first - log_step "Stopping contribution-service" + # Stop services that use CDC + log_step "Stopping CDC consumer services" service_stop "contribution-service" + service_stop "auth-service" - # Reset offsets + # Reset offsets for all consumer groups log_step "Resetting consumer group offsets" - local reset_success=false + for group in "${CDC_CONSUMER_GROUPS[@]}"; do + log_info "Resetting: $group" + local reset_success=false - # Try local kafka-consumer-groups.sh first - if command -v kafka-consumer-groups.sh &>/dev/null; then - kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \ - --group "$CDC_CONSUMER_GROUP" \ - --reset-offsets \ - --to-earliest \ - --all-topics \ - --execute 2>/dev/null && reset_success=true - fi + # Try local kafka-consumer-groups.sh first + if command -v kafka-consumer-groups.sh &>/dev/null; then + kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \ + --group "$group" \ + --reset-offsets \ + --to-earliest \ + --all-topics \ + --execute 2>/dev/null && reset_success=true + fi - # Try docker exec if local failed - if [ "$reset_success" = false ] && docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then - log_info "Using Docker container: $KAFKA_CONTAINER" - docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \ - --group "$CDC_CONSUMER_GROUP" \ - --reset-offsets \ - --to-earliest \ - --all-topics \ - --execute 2>&1 && reset_success=true - fi + # Try docker exec if local failed + if [ "$reset_success" = false ] && docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then + docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \ + --group "$group" \ + --reset-offsets \ + --to-earliest \ + --all-topics \ + --execute 2>&1 && reset_success=true + fi - if [ "$reset_success" = true ]; then - log_success "CDC consumer offsets reset to beginning" - else - log_warn "Could not reset offsets automatically" - fi + if [ "$reset_success" = true ]; then + log_success "Offsets reset for $group" + else + log_warn "Could not reset offsets for $group" + fi + done - log_info "Start contribution-service to begin syncing from the beginning" + log_info "Start services to begin syncing from the beginning" } sync_status() { print_section "CDC Sync Status" - echo -e "${BOLD}Consumer Group:${NC} $CDC_CONSUMER_GROUP" - echo "" + for group in "${CDC_CONSUMER_GROUPS[@]}"; do + echo -e "${BOLD}Consumer Group:${NC} $group" + echo "" - # Try local first, then docker - if command -v kafka-consumer-groups.sh &>/dev/null; then - kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \ - --group "$CDC_CONSUMER_GROUP" \ - --describe 2>/dev/null && return 0 - fi + # Try local first, then docker + if command -v kafka-consumer-groups.sh &>/dev/null; then + kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \ + --group "$group" \ + --describe 2>/dev/null && echo "" && continue + fi - if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then - docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \ - --group "$CDC_CONSUMER_GROUP" \ - --describe 2>&1 || log_warn "Could not get consumer group status" - else - log_warn "Kafka container '$KAFKA_CONTAINER' not found" - fi + if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then + docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \ + --group "$group" \ + --describe 2>&1 || log_warn "Could not get status for $group" + else + log_warn "Kafka container '$KAFKA_CONTAINER' not found" + fi + echo "" + done } # =========================================================================== @@ -727,28 +739,32 @@ full_reset() { db_migrate log_step "Step 5/6: Resetting CDC consumer offsets..." - # Try local kafka-consumer-groups.sh first, then docker exec - if command -v kafka-consumer-groups.sh &>/dev/null; then - kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \ - --group "$CDC_CONSUMER_GROUP" \ - --reset-offsets \ - --to-earliest \ - --all-topics \ - --execute 2>/dev/null && log_success "CDC offsets reset" || log_warn "Local kafka-consumer-groups.sh failed" - fi + # Reset all CDC consumer groups + for group in "${CDC_CONSUMER_GROUPS[@]}"; do + log_info "Resetting consumer group: $group" - # Try docker exec with the correct container name - if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then - log_info "Resetting offsets via Docker container: $KAFKA_CONTAINER" - docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \ - --group "$CDC_CONSUMER_GROUP" \ - --reset-offsets \ - --to-earliest \ - --all-topics \ - --execute 2>&1 || log_warn "Could not reset offsets via Docker" - else - log_warn "Kafka container '$KAFKA_CONTAINER' not found. Manual offset reset may be needed." - fi + # Try local kafka-consumer-groups.sh first + if command -v kafka-consumer-groups.sh &>/dev/null; then + kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \ + --group "$group" \ + --reset-offsets \ + --to-earliest \ + --all-topics \ + --execute 2>/dev/null && log_success "CDC offsets reset for $group" && continue + fi + + # Try docker exec with the correct container name + if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then + docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \ + --group "$group" \ + --reset-offsets \ + --to-earliest \ + --all-topics \ + --execute 2>&1 && log_success "CDC offsets reset for $group" || log_warn "Could not reset offsets for $group" + else + log_warn "Kafka container '$KAFKA_CONTAINER' not found. Manual offset reset may be needed." + fi + done log_step "Step 6/6: Starting 2.0 services..." for service in "${MINING_SERVICES[@]}"; do