#!/bin/bash # # RWA Mining Ecosystem 2.0 - Deployment & Management Script # ========================================================== # # This script manages the Mining 2.0 ecosystem independently from the 1.0 system. # The 2.0 system is completely isolated and can be reset at any time without # affecting the 1.0 system. # # Usage: # ./deploy-mining.sh up [service] # Start all or specific service # ./deploy-mining.sh down [service] # Stop all or specific service # ./deploy-mining.sh restart [service] # Restart all or specific service # ./deploy-mining.sh status # Show 2.0 service status # ./deploy-mining.sh logs # View logs for specific service # ./deploy-mining.sh build [service] [--no-cache] # Build all or specific service # ./deploy-mining.sh rebuild [service] # Rebuild with --no-cache # # Database Management: # ./deploy-mining.sh db-create # Create 2.0 databases # ./deploy-mining.sh db-migrate # Run Prisma migrations # ./deploy-mining.sh db-reset # Drop and recreate 2.0 databases (DANGEROUS!) # ./deploy-mining.sh db-status # Show database status # # CDC & Sync: # ./deploy-mining.sh sync-reset # Reset CDC consumer offsets to beginning # ./deploy-mining.sh sync-status # Show CDC consumer group status # # Full Reset (for development/testing): # ./deploy-mining.sh full-reset # Complete reset: stop services, drop DBs, recreate, resync # # Health & Monitoring: # ./deploy-mining.sh health # Check health of all 2.0 services # ./deploy-mining.sh stats # Show system statistics # # Service Aliases: # contrib, contribution -> contribution-service # mining -> mining-service # trading -> trading-service # admin -> mining-admin-service # auth -> auth-service # wallet -> mining-wallet-service # set -e # =========================================================================== # Configuration # =========================================================================== SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" ENV_FILE="$SCRIPT_DIR/.env" COMPOSE_FILE="$SCRIPT_DIR/docker-compose.2.0.yml" # 2.0 Services MINING_SERVICES=( "contribution-service" "mining-service" "trading-service" "mining-admin-service" "auth-service" "mining-wallet-service" ) # Service Aliases declare -A SERVICE_ALIASES=( ["contrib"]="contribution-service" ["contribution"]="contribution-service" ["mining"]="mining-service" ["trading"]="trading-service" ["admin"]="mining-admin-service" ["auth"]="auth-service" ["wallet"]="mining-wallet-service" ) # 2.0 Databases MINING_DATABASES=( "rwa_contribution" "rwa_mining" "rwa_trading" "rwa_mining_admin" "rwa_auth" "rwa_mining_wallet" ) # Service to Database mapping declare -A SERVICE_DB=( ["contribution-service"]="rwa_contribution" ["mining-service"]="rwa_mining" ["trading-service"]="rwa_trading" ["mining-admin-service"]="rwa_mining_admin" ["auth-service"]="rwa_auth" ["mining-wallet-service"]="rwa_mining_wallet" ) # 2.0 Ports declare -A SERVICE_PORTS=( ["contribution-service"]="3020" ["mining-service"]="3021" ["trading-service"]="3022" ["mining-admin-service"]="3023" ["auth-service"]="3024" ["mining-wallet-service"]="3025" ) # CDC Consumer Groups (all groups that need to be reset during full-reset) CDC_CONSUMER_GROUPS=( "contribution-service-cdc-group" "auth-service-cdc-group" "mining-admin-service-cdc-group" ) # Debezium Outbox Connectors (for 2.0 service events -> mining-admin-service) # These connectors capture events from each service's outbox table OUTBOX_CONNECTORS=( "auth-outbox-connector" "contribution-outbox-connector" "mining-outbox-connector" "trading-outbox-connector" "mining-wallet-outbox-connector" ) # Debezium Connect URL (default port 8084 as mapped in docker-compose) DEBEZIUM_CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8084}" # Colors RED='\033[0;31m' GREEN='\033[0;32m' YELLOW='\033[1;33m' BLUE='\033[0;34m' CYAN='\033[0;36m' MAGENTA='\033[0;35m' NC='\033[0m' BOLD='\033[1m' # =========================================================================== # Logging Functions # =========================================================================== log_info() { echo -e "${GREEN}[INFO]${NC} $1"; } log_warn() { echo -e "${YELLOW}[WARN]${NC} $1"; } log_error() { echo -e "${RED}[ERROR]${NC} $1"; } log_step() { echo -e "${BLUE}[STEP]${NC} $1"; } log_success() { echo -e "${GREEN}[SUCCESS]${NC} $1"; } print_header() { echo "" echo -e "${CYAN}╔════════════════════════════════════════════════════════════╗${NC}" echo -e "${CYAN}║${NC} ${BOLD}RWA Mining Ecosystem 2.0 - Management Script${NC} ${CYAN}║${NC}" echo -e "${CYAN}╚════════════════════════════════════════════════════════════╝${NC}" echo "" } print_section() { echo "" echo -e "${MAGENTA}━━━ $1 ━━━${NC}" echo "" } # =========================================================================== # Helper Functions # =========================================================================== resolve_service_name() { local input="$1" # Check if it's an alias if [ -n "${SERVICE_ALIASES[$input]:-}" ]; then echo "${SERVICE_ALIASES[$input]}" return 0 fi # Check if it's a valid service name for service in "${MINING_SERVICES[@]}"; do if [ "$service" = "$input" ]; then echo "$service" return 0 fi done # Not found return 1 } validate_service() { local service="$1" for s in "${MINING_SERVICES[@]}"; do if [ "$s" = "$service" ]; then return 0 fi done return 1 } get_services_to_process() { local input="$1" if [ -z "$input" ]; then # Return all services echo "${MINING_SERVICES[@]}" else # Resolve and return single service local resolved resolved=$(resolve_service_name "$input") || { log_error "Unknown service: $input" echo "" echo "Available services:" for service in "${MINING_SERVICES[@]}"; do echo " - $service (port ${SERVICE_PORTS[$service]})" done echo "" echo "Aliases:" echo " - contrib, contribution -> contribution-service" echo " - mining -> mining-service" echo " - trading -> trading-service" echo " - admin -> mining-admin-service" exit 1 } echo "$resolved" fi } # =========================================================================== # Environment Loading # =========================================================================== load_env() { if [ -f "$ENV_FILE" ]; then set -a source "$ENV_FILE" set +a else log_warn "No .env file found, using defaults" fi # Set defaults (match docker-compose.yml settings) POSTGRES_HOST="${POSTGRES_HOST:-localhost}" POSTGRES_PORT="${POSTGRES_PORT:-5432}" POSTGRES_USER="${POSTGRES_USER:-rwa_user}" POSTGRES_PASSWORD="${POSTGRES_PASSWORD:-rwa_secure_password}" KAFKA_BROKERS="${KAFKA_BROKERS:-localhost:9092}" REDIS_HOST="${REDIS_HOST:-localhost}" REDIS_PORT="${REDIS_PORT:-6379}" # Docker container names (match docker-compose.yml container_name) POSTGRES_CONTAINER="${POSTGRES_CONTAINER:-rwa-postgres}" KAFKA_CONTAINER="${KAFKA_CONTAINER:-rwa-kafka}" } # =========================================================================== # Helper: Execute psql command (auto-detect Docker or local) # =========================================================================== run_psql() { local db="$1" local sql="$2" # Try docker exec first (for production Docker environment) if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${POSTGRES_CONTAINER}$"; then docker exec -e PGPASSWORD="$POSTGRES_PASSWORD" "$POSTGRES_CONTAINER" \ psql -h localhost -U "$POSTGRES_USER" -d "$db" -c "$sql" 2>/dev/null return $? fi # Fall back to local psql PGPASSWORD="$POSTGRES_PASSWORD" psql -h "$POSTGRES_HOST" -p "$POSTGRES_PORT" -U "$POSTGRES_USER" -d "$db" -c "$sql" 2>/dev/null return $? } # =========================================================================== # Database Functions # =========================================================================== db_create() { local target_service="$1" print_section "Creating 2.0 Databases" local dbs_to_create=() if [ -n "$target_service" ]; then dbs_to_create=("${SERVICE_DB[$target_service]}") else dbs_to_create=("${MINING_DATABASES[@]}") fi for db in "${dbs_to_create[@]}"; do log_step "Creating database: $db" if run_psql "postgres" "CREATE DATABASE $db;"; then log_success "Database $db created" else log_warn "Database $db already exists or creation failed" fi done log_success "Database creation completed" } db_drop() { local target_service="$1" print_section "Dropping 2.0 Databases" local dbs_to_drop=() if [ -n "$target_service" ]; then dbs_to_drop=("${SERVICE_DB[$target_service]}") else dbs_to_drop=("${MINING_DATABASES[@]}") fi for db in "${dbs_to_drop[@]}"; do log_step "Dropping database: $db" if run_psql "postgres" "DROP DATABASE IF EXISTS $db WITH (FORCE);"; then log_success "Database $db dropped" else log_warn "Failed to drop database $db" fi done log_success "Database drop completed" } db_migrate() { local target_service="$1" print_section "Running Prisma Migrations" local services_to_migrate=() if [ -n "$target_service" ]; then services_to_migrate=("$target_service") else services_to_migrate=("${MINING_SERVICES[@]}") fi for service in "${services_to_migrate[@]}"; do service_dir="$SCRIPT_DIR/$service" if [ -d "$service_dir/prisma" ]; then log_step "Migrating: $service" # Check if running in Docker environment (container exists) local container_name="rwa-${service}" if docker ps -a --format '{{.Names}}' 2>/dev/null | grep -q "^${container_name}$"; then # Run migration inside the container log_info "Running migration in container: $container_name" docker start "$container_name" 2>/dev/null || true sleep 2 docker exec "$container_name" npx prisma migrate deploy 2>/dev/null || \ docker exec "$container_name" npx prisma db push --accept-data-loss 2>/dev/null || { log_warn "Container migration failed, trying to build and run temporary container..." # Build and run a temporary container for migration docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" run --rm "$service" npx prisma migrate deploy 2>/dev/null || \ docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" run --rm "$service" npx prisma db push --accept-data-loss 2>/dev/null || { log_warn "Migration failed for $service" } } elif command -v npx &>/dev/null; then # Local development: use npx directly cd "$service_dir" npx prisma migrate deploy 2>/dev/null || npx prisma db push --accept-data-loss cd "$SCRIPT_DIR" else # No npx and no container - try docker compose run log_info "No local npx, using docker compose run for migration" docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" run --rm "$service" npx prisma migrate deploy 2>/dev/null || \ docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" run --rm "$service" npx prisma db push --accept-data-loss 2>/dev/null || { log_warn "Migration failed for $service" } fi fi done log_success "Migrations completed" } db_status() { print_section "2.0 Database Status" echo -e "${BOLD}Database${NC}\t\t${BOLD}Status${NC}\t\t${BOLD}Tables${NC}" echo "────────────────────────────────────────────────────" for db in "${MINING_DATABASES[@]}"; do if run_psql "$db" "SELECT 1" &>/dev/null; then # Get table count using run_psql helper local table_count if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${POSTGRES_CONTAINER}$"; then table_count=$(docker exec -e PGPASSWORD="$POSTGRES_PASSWORD" "$POSTGRES_CONTAINER" \ psql -h localhost -U "$POSTGRES_USER" -d "$db" -t -c "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public';" 2>/dev/null | tr -d ' ') else table_count=$(PGPASSWORD="$POSTGRES_PASSWORD" psql -h "$POSTGRES_HOST" -p "$POSTGRES_PORT" -U "$POSTGRES_USER" -d "$db" -t -c "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public';" 2>/dev/null | tr -d ' ') fi echo -e "${GREEN}$db${NC}\t${GREEN}UP${NC}\t\t$table_count tables" else echo -e "${RED}$db${NC}\t\t${RED}DOWN${NC}\t\t-" fi done } db_reset() { local target_service="$1" print_section "Resetting 2.0 Databases" local dbs_to_reset=() if [ -n "$target_service" ]; then dbs_to_reset=("${SERVICE_DB[$target_service]}") echo -e "${RED}${BOLD}WARNING: This will DELETE data for $target_service!${NC}" else dbs_to_reset=("${MINING_DATABASES[@]}") echo -e "${RED}${BOLD}WARNING: This will DELETE ALL 2.0 DATA!${NC}" fi echo "Affected databases:" for db in "${dbs_to_reset[@]}"; do echo " - $db" done echo "" read -p "Are you sure? Type 'yes' to confirm: " confirm if [ "$confirm" != "yes" ]; then log_warn "Aborted" return 1 fi db_drop "$target_service" db_create "$target_service" db_migrate "$target_service" log_success "Database reset completed" } # =========================================================================== # Single Service Functions # =========================================================================== service_start() { local service="$1" local port="${SERVICE_PORTS[$service]}" log_step "Starting: $service (port $port)" docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" up -d "$service" log_success "$service started" } service_stop() { local service="$1" log_step "Stopping: $service" docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop "$service" log_success "$service stopped" } service_restart() { local service="$1" log_step "Restarting: $service" docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" restart "$service" log_success "$service restarted" } service_build() { local service="$1" local no_cache="$2" log_step "Building: $service" # Use docker compose to build if [ "$no_cache" = "--no-cache" ] || [ "$no_cache" = "true" ]; then log_info "Building Docker image (no cache)..." docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" build --no-cache "$service" else log_info "Building Docker image..." docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" build "$service" fi log_success "$service built successfully" } service_rebuild() { local service="$1" local no_cache="$2" # 1. Build the service service_build "$service" "$no_cache" # 2. Stop the old service log_info "Stopping old $service..." service_stop "$service" # 3. Start the new service log_info "Starting new $service..." service_start "$service" log_success "$service rebuilt and restarted successfully" } # =========================================================================== # Batch Service Functions # =========================================================================== services_up() { local target="$1" print_section "Starting 2.0 Services" local services services=$(get_services_to_process "$target") [ -z "$services" ] && exit 1 for service in $services; do service_start "$service" done log_success "Service startup completed" } services_down() { local target="$1" print_section "Stopping 2.0 Services" local services services=$(get_services_to_process "$target") [ -z "$services" ] && exit 1 for service in $services; do service_stop "$service" done log_success "Services stopped" } services_restart() { local target="$1" print_section "Restarting 2.0 Services" local services services=$(get_services_to_process "$target") [ -z "$services" ] && exit 1 for service in $services; do service_restart "$service" done log_success "Services restarted" } services_build() { local target="$1" local no_cache="$2" print_section "Building 2.0 Services" local services services=$(get_services_to_process "$target") [ -z "$services" ] && exit 1 for service in $services; do service_build "$service" "$no_cache" done log_success "Build completed" } services_rebuild() { local target="$1" print_section "Rebuilding 2.0 Services" local services services=$(get_services_to_process "$target") [ -z "$services" ] && exit 1 for service in $services; do service_rebuild "$service" "--no-cache" done log_success "Rebuild completed" } services_status() { print_section "2.0 Service Status" echo -e "${BOLD}Service${NC}\t\t\t${BOLD}Port${NC}\t${BOLD}Status${NC}\t\t${BOLD}Health${NC}\t\t${BOLD}PID${NC}" echo "────────────────────────────────────────────────────────────────────────" for service in "${MINING_SERVICES[@]}"; do port="${SERVICE_PORTS[$service]}" pid="-" # Get PID if [ -f "/tmp/$service.pid" ]; then pid=$(cat /tmp/$service.pid 2>/dev/null || echo "-") fi # Check if port is listening if nc -z localhost "$port" 2>/dev/null; then status="${GREEN}RUNNING${NC}" # Get PID from port if not in file if [ "$pid" = "-" ]; then pid=$(lsof -t -i:$port 2>/dev/null | head -1 || echo "-") fi # Check health endpoint health_response=$(curl -s -o /dev/null -w "%{http_code}" "http://localhost:$port/health" 2>/dev/null || echo "000") if [ "$health_response" = "200" ]; then health="${GREEN}HEALTHY${NC}" else health="${YELLOW}UNKNOWN${NC}" fi else status="${RED}STOPPED${NC}" health="${RED}-${NC}" pid="-" fi echo -e "$service\t$port\t$status\t$health\t\t$pid" done } services_logs() { local service="$1" local lines="${2:-100}" if [ -z "$service" ]; then log_error "Please specify a service name" echo "" echo "Usage: $0 logs [lines]" echo "" echo "Available services:" for svc in "${MINING_SERVICES[@]}"; do echo " - $svc" done exit 1 fi local resolved resolved=$(resolve_service_name "$service") || { log_error "Unknown service: $service" exit 1 } docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" logs -f --tail="$lines" "$resolved" } # =========================================================================== # CDC / Sync Functions # =========================================================================== sync_reset() { print_section "Resetting CDC Consumer Offsets" echo -e "${YELLOW}This will reset CDC consumers to read from the beginning.${NC}" echo "Consumer Groups:" for group in "${CDC_CONSUMER_GROUPS[@]}"; do echo " - $group" done echo "" read -p "Continue? (y/n): " confirm if [ "$confirm" != "y" ]; then log_warn "Aborted" return 1 fi # Stop services that use CDC log_step "Stopping CDC consumer services" service_stop "contribution-service" service_stop "auth-service" # Wait for consumer groups to become inactive log_step "Waiting for Kafka consumers to become inactive..." log_info "Waiting 20 seconds for consumer group session timeout..." sleep 20 # Reset offsets for all consumer groups with retry logic log_step "Resetting consumer group offsets" for group in "${CDC_CONSUMER_GROUPS[@]}"; do log_info "Resetting: $group" local reset_success=false local retry_count=0 local max_retries=3 while [ "$reset_success" = false ] && [ $retry_count -lt $max_retries ]; do # Try local kafka-consumer-groups.sh first if command -v kafka-consumer-groups.sh &>/dev/null; then if kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \ --group "$group" \ --reset-offsets \ --to-earliest \ --all-topics \ --execute 2>&1 | grep -q "NEW-OFFSET"; then reset_success=true fi fi # Try docker exec if local failed if [ "$reset_success" = false ] && docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then if docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \ --group "$group" \ --reset-offsets \ --to-earliest \ --all-topics \ --execute 2>&1 | grep -q "NEW-OFFSET"; then reset_success=true fi fi if [ "$reset_success" = false ]; then retry_count=$((retry_count + 1)) if [ $retry_count -lt $max_retries ]; then log_warn "Consumer group still active, waiting 10s (retry $retry_count/$max_retries)..." sleep 10 fi fi done if [ "$reset_success" = true ]; then log_success "Offsets reset for $group" else log_warn "Could not reset offsets for $group after $max_retries attempts" fi done log_info "Start services to begin syncing from the beginning" log_info "Run: ./deploy-mining.sh up contribution-service && ./deploy-mining.sh up auth-service" } sync_status() { print_section "CDC Sync Status" for group in "${CDC_CONSUMER_GROUPS[@]}"; do echo -e "${BOLD}Consumer Group:${NC} $group" echo "" # Try local first, then docker if command -v kafka-consumer-groups.sh &>/dev/null; then kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \ --group "$group" \ --describe 2>/dev/null && echo "" && continue fi if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \ --group "$group" \ --describe 2>&1 || log_warn "Could not get status for $group" else log_warn "Kafka container '$KAFKA_CONTAINER' not found" fi echo "" done } # =========================================================================== # Debezium Outbox Connector Functions # =========================================================================== # Register all outbox connectors with Debezium Connect register_outbox_connectors() { print_section "Registering Outbox Connectors" local scripts_dir="$SCRIPT_DIR/scripts/debezium" local connect_url="$DEBEZIUM_CONNECT_URL" # Check if Debezium Connect is available if ! curl -s "$connect_url" &>/dev/null; then log_warn "Debezium Connect not available at $connect_url" log_info "Outbox connectors will not be registered. You may need to register them manually." return 1 fi for connector in "${OUTBOX_CONNECTORS[@]}"; do local config_file="$scripts_dir/${connector}.json" if [ ! -f "$config_file" ]; then log_warn "Config file not found: $config_file" continue fi log_info "Registering connector: $connector" # Check if connector already exists local existing existing=$(curl -s "$connect_url/connectors/$connector" 2>/dev/null) if echo "$existing" | grep -q '"name"'; then # Connector exists, update it log_info "Updating existing connector: $connector" # Extract just the config part for PUT request local config_only config_only=$(cat "$config_file" | sed 's/.*"config"://' | sed 's/}$//') # Use envsubst to replace environment variables, then update local result result=$(cat "$config_file" | envsubst | curl -s -X PUT \ -H "Content-Type: application/json" \ -d @- \ "$connect_url/connectors/$connector/config" 2>/dev/null) if echo "$result" | grep -q '"name"'; then log_success "Updated connector: $connector" else log_warn "Failed to update connector $connector: $result" fi else # Connector doesn't exist, create it # Replace environment variables in the config file local result result=$(cat "$config_file" | envsubst | curl -s -X POST \ -H "Content-Type: application/json" \ -d @- \ "$connect_url/connectors" 2>/dev/null) if echo "$result" | grep -q '"name"'; then log_success "Registered connector: $connector" else log_warn "Failed to register connector $connector: $result" fi fi done log_success "Outbox connector registration completed" } # Delete all outbox connectors delete_outbox_connectors() { print_section "Deleting Outbox Connectors" local connect_url="$DEBEZIUM_CONNECT_URL" if ! curl -s "$connect_url" &>/dev/null; then log_warn "Debezium Connect not available at $connect_url" return 1 fi for connector in "${OUTBOX_CONNECTORS[@]}"; do log_info "Deleting connector: $connector" curl -s -X DELETE "$connect_url/connectors/$connector" &>/dev/null log_success "Deleted connector: $connector" done } # Show outbox connector status outbox_status() { print_section "Outbox Connector Status" local connect_url="$DEBEZIUM_CONNECT_URL" if ! curl -s "$connect_url" &>/dev/null; then log_warn "Debezium Connect not available at $connect_url" return 1 fi for connector in "${OUTBOX_CONNECTORS[@]}"; do echo -e "${BOLD}Connector:${NC} $connector" local status status=$(curl -s "$connect_url/connectors/$connector/status" 2>/dev/null) if echo "$status" | grep -q '"state"'; then local state state=$(echo "$status" | grep -o '"state":"[^"]*"' | head -1 | cut -d'"' -f4) if [ "$state" = "RUNNING" ]; then echo -e " Status: ${GREEN}$state${NC}" else echo -e " Status: ${RED}$state${NC}" fi else echo -e " Status: ${RED}NOT REGISTERED${NC}" fi echo "" done } # =========================================================================== # Full Reset Function # =========================================================================== full_reset() { print_section "Full 2.0 System Reset" echo -e "${RED}${BOLD}╔════════════════════════════════════════════════════════════╗${NC}" echo -e "${RED}${BOLD}║ WARNING: This will completely reset the 2.0 system! ║${NC}" echo -e "${RED}${BOLD}║ ║${NC}" echo -e "${RED}${BOLD}║ - Stop all 2.0 services ║${NC}" echo -e "${RED}${BOLD}║ - Drop all 2.0 databases ║${NC}" echo -e "${RED}${BOLD}║ - Recreate databases ║${NC}" echo -e "${RED}${BOLD}║ - Run migrations ║${NC}" echo -e "${RED}${BOLD}║ - Reset CDC consumer offsets ║${NC}" echo -e "${RED}${BOLD}║ - Restart services (will sync from 1.0) ║${NC}" echo -e "${RED}${BOLD}║ ║${NC}" echo -e "${RED}${BOLD}║ This will NOT affect the 1.0 system in any way. ║${NC}" echo -e "${RED}${BOLD}╚════════════════════════════════════════════════════════════╝${NC}" echo "" read -p "Type 'RESET' to confirm: " confirm if [ "$confirm" != "RESET" ]; then log_warn "Aborted" return 1 fi echo "" log_step "Step 1/18: Stopping 2.0 services..." for service in "${MINING_SERVICES[@]}"; do service_stop "$service" done log_step "Step 2/18: Waiting for Kafka consumers to become inactive..." log_info "Waiting 15 seconds for consumer group session timeout..." sleep 15 log_step "Step 3/18: Resetting CDC consumer offsets..." # Reset offsets BEFORE migrations (which may start containers) for group in "${CDC_CONSUMER_GROUPS[@]}"; do log_info "Resetting consumer group: $group" local reset_success=false local retry_count=0 local max_retries=3 while [ "$reset_success" = false ] && [ $retry_count -lt $max_retries ]; do # Try docker exec with the correct container name if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then if docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \ --group "$group" \ --reset-offsets \ --to-earliest \ --all-topics \ --execute 2>&1 | grep -q "NEW-OFFSET"; then log_success "CDC offsets reset for $group" reset_success=true else retry_count=$((retry_count + 1)) if [ $retry_count -lt $max_retries ]; then log_warn "Consumer group still active, waiting 10s (retry $retry_count/$max_retries)..." sleep 10 fi fi else log_warn "Kafka container '$KAFKA_CONTAINER' not found" break fi done if [ "$reset_success" = false ]; then log_warn "Could not reset offsets for $group after $max_retries attempts" fi done log_step "Step 4/18: Deleting Debezium outbox connectors, offsets, and Kafka topics..." # Delete connectors BEFORE dropping databases to release replication slots local connectors=("auth-outbox-connector" "contribution-outbox-connector" "mining-outbox-connector" "trading-outbox-connector" "mining-wallet-outbox-connector") for connector in "${connectors[@]}"; do log_info "Deleting connector: $connector" curl -s -X DELETE "http://localhost:8084/connectors/$connector" 2>/dev/null || true done log_info "Waiting 5 seconds for connectors to be fully removed..." sleep 5 # Delete connector offsets from Kafka Connect internal storage # This is CRITICAL: without this, Debezium will skip initial snapshot on re-registration # because it thinks the snapshot was already completed (offset exists in connect-offsets topic) # # Reference: https://debezium.io/documentation/faq/#how_to_remove_committed_offsets_for_a_connector # Strategy 1: Kafka Connect 3.6+ REST API: DELETE /connectors//offsets (after connector deleted) # Strategy 2: Kafka Connect 3.5-: Send tombstone messages to connect-offsets topic via kafkacat log_info "Deleting connector offsets from Kafka Connect internal storage..." # Strategy 1: Try REST API method (Kafka Connect 3.6+) # Note: Connector must be deleted first (which we did above) local rest_api_worked=false for connector in "${connectors[@]}"; do log_info "Attempting to delete offset via REST API: $connector" local delete_result delete_result=$(curl -s -w "\n%{http_code}" -X DELETE "http://localhost:8084/connectors/$connector/offsets" 2>/dev/null) local http_code=$(echo "$delete_result" | tail -1) if [ "$http_code" = "200" ] || [ "$http_code" = "204" ]; then log_success "Deleted offset via REST API: $connector" rest_api_worked=true else log_warn "REST API offset deletion returned HTTP $http_code for $connector" fi done # Strategy 2: Always try tombstone method as primary approach # The offset topic name is configured via OFFSET_STORAGE_TOPIC env var in Debezium Connect # Default is "debezium_offsets" (not "connect-offsets"!) local offset_topic="debezium_offsets" log_info "Sending tombstones to offset topic: $offset_topic" # The offset key format is: ["connector-name",{"server":"topic.prefix"}] for connector in "${connectors[@]}"; do # Map connector name to topic prefix local topic_prefix="" case "$connector" in "auth-outbox-connector") topic_prefix="cdc.auth" ;; "contribution-outbox-connector") topic_prefix="cdc.contribution" ;; "mining-outbox-connector") topic_prefix="cdc.mining" ;; "trading-outbox-connector") topic_prefix="cdc.trading" ;; "mining-wallet-outbox-connector") topic_prefix="cdc.mining-wallet" ;; esac local offset_key="[\"$connector\",{\"server\":\"$topic_prefix\"}]" log_info "Sending tombstone for: $connector (key: $offset_key)" # Send tombstone (NULL value) using kafka-console-producer # Use null.marker to mark __NULL__ as NULL value # Format: key\t__NULL__ with parse.key=true and null.marker=__NULL__ local tombstone_sent=false # Use kafka-console-producer which is available in Kafka container # --property parse.key=true: Enable key parsing # --property key.separator=: Use literal tab as key-value separator # --property null.marker=__NULL__: Treat __NULL__ as null value (tombstone) # Note: Must use printf to properly pass tab character through SSH/docker if printf '%s\t%s\n' "$offset_key" "__NULL__" | docker exec -i "$KAFKA_CONTAINER" kafka-console-producer \ --bootstrap-server localhost:9092 \ --topic "$offset_topic" \ --property parse.key=true \ --property "key.separator= " \ --property "null.marker=__NULL__" 2>/dev/null; then log_success "Sent tombstone via kafka-console-producer for: $connector" tombstone_sent=true fi if [ "$tombstone_sent" = false ]; then log_warn "Could not send tombstone for $connector" fi done # Wait for offset changes to be processed log_info "Waiting 5 seconds for offset deletions to be processed..." sleep 5 # Delete Kafka outbox topics to clear old messages # This is critical: old messages in Kafka will corrupt the sync if not cleared log_info "Deleting Kafka outbox topics to clear old messages..." local outbox_topics=("cdc.auth.outbox" "cdc.contribution.outbox" "cdc.mining.outbox" "cdc.trading.outbox" "cdc.mining-wallet.outbox") for topic in "${outbox_topics[@]}"; do if docker exec "$KAFKA_CONTAINER" kafka-topics --bootstrap-server localhost:9092 --delete --topic "$topic" 2>/dev/null; then log_success "Deleted Kafka topic: $topic" else log_warn "Could not delete Kafka topic: $topic (may not exist)" fi done log_step "Step 5/18: Dropping 2.0 databases..." db_drop log_step "Step 6/18: Creating 2.0 databases..." db_create log_step "Step 7/18: Running migrations..." db_migrate # Stop any containers that were started during migration log_step "Step 8/18: Stopping containers and resetting CDC offsets again..." log_info "Migration may have started CDC consumers, stopping them now..." for service in "${MINING_SERVICES[@]}"; do docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop "$service" 2>/dev/null || true done log_info "Waiting 20 seconds for consumer groups to become inactive..." sleep 20 # Reset CDC offsets again after migration for group in "${CDC_CONSUMER_GROUPS[@]}"; do log_info "Resetting consumer group: $group" local reset_success=false local retry_count=0 local max_retries=3 while [ "$reset_success" = false ] && [ $retry_count -lt $max_retries ]; do if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then if docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \ --group "$group" \ --reset-offsets \ --to-earliest \ --all-topics \ --execute 2>&1 | grep -q "NEW-OFFSET"; then log_success "CDC offsets reset for $group" reset_success=true else retry_count=$((retry_count + 1)) if [ $retry_count -lt $max_retries ]; then log_warn "Consumer group still active, waiting 10s (retry $retry_count/$max_retries)..." sleep 10 fi fi else log_warn "Kafka container '$KAFKA_CONTAINER' not found" break fi done if [ "$reset_success" = false ]; then log_warn "Could not reset offsets for $group after $max_retries attempts" fi done # 清空 processed_cdc_events 表(因为 migration 时可能已经消费了一些消息) # 这是事务性幂等消费的关键:重置 Kafka offset 后必须同时清空幂等记录 log_info "Truncating processed_cdc_events tables to allow re-consumption..." for db in "rwa_contribution" "rwa_auth"; do if run_psql "$db" "TRUNCATE TABLE processed_cdc_events;" 2>/dev/null; then log_success "Truncated processed_cdc_events in $db" else log_warn "Could not truncate processed_cdc_events in $db (table may not exist yet)" fi done # 清空 mining-admin-service 的幂等表(processed_events) log_info "Truncating processed_events in rwa_mining_admin..." if run_psql "rwa_mining_admin" "TRUNCATE TABLE processed_events;" 2>/dev/null; then log_success "Truncated processed_events in rwa_mining_admin" else log_warn "Could not truncate processed_events in rwa_mining_admin (table may not exist yet)" fi log_step "Step 9/18: Starting 2.0 services..." for service in "${MINING_SERVICES[@]}"; do service_start "$service" done log_step "Step 10/18: Waiting for contribution-service CDC sync to complete..." log_info "Waiting for contribution-service to complete CDC sync (user_accounts -> referral_relationships -> planting_orders)..." # 等待 contribution-service 的 CDC 顺序同步完成 # 通过 /health/cdc-sync API 检查同步状态 local max_wait=600 # 最多等待 10 分钟 local wait_count=0 local sync_completed=false local cdc_sync_url="http://localhost:3020/api/v2/health/cdc-sync" while [ "$wait_count" -lt "$max_wait" ] && [ "$sync_completed" = false ]; do # 调用 API 检查同步状态 local sync_status sync_status=$(curl -s "$cdc_sync_url" 2>/dev/null || echo '{}') if echo "$sync_status" | grep -q '"allPhasesCompleted":true'; then sync_completed=true log_success "CDC sync completed - all phases finished" else # 显示当前状态 local is_running local sequential_mode is_running=$(echo "$sync_status" | grep -o '"isRunning":[^,}]*' | cut -d':' -f2) sequential_mode=$(echo "$sync_status" | grep -o '"sequentialMode":[^,}]*' | cut -d':' -f2) if [ "$is_running" = "true" ] && [ "$sequential_mode" = "true" ]; then log_info "CDC sync in progress (sequential mode)... (waited ${wait_count}s)" elif [ "$is_running" = "true" ]; then log_info "CDC consumer running... (waited ${wait_count}s)" else log_info "Waiting for CDC consumer to start... (waited ${wait_count}s)" fi sleep 5 wait_count=$((wait_count + 5)) fi done if [ "$sync_completed" = false ]; then log_warn "CDC sync did not complete within ${max_wait}s, proceeding anyway..." log_info "You may need to wait longer or check: curl $cdc_sync_url" fi log_step "Step 11/18: Registering Debezium outbox connectors..." # Register outbox connectors AFTER services are running and have synced data # This ensures outbox_events tables exist and contain data to be captured register_outbox_connectors || log_warn "Some connectors may not be registered" log_step "Step 12/18: Publishing legacy users to mining-admin-service..." # 调用 auth-service API 发布所有旧用户事件到 outbox # 这样 mining-admin-service 才能通过 Debezium 收到用户数据 local publish_url="http://localhost:3024/api/v2/admin/legacy-users/publish-all" local publish_result publish_result=$(curl -s -X POST "$publish_url" 2>/dev/null || echo '{"error": "curl failed"}') if echo "$publish_result" | grep -q '"success":true'; then local published_count published_count=$(echo "$publish_result" | grep -o '"publishedCount":[0-9]*' | grep -o '[0-9]*') log_success "Published $published_count legacy user events to outbox" else log_warn "Failed to publish legacy users: $publish_result" log_info "You may need to manually call: curl -X POST $publish_url" fi log_step "Step 13/18: Waiting for connectors to start capturing..." log_info "Waiting 10 seconds for Debezium connectors to initialize..." sleep 10 log_step "Step 14/18: Publishing contribution data to mining-admin-service..." # 调用 contribution-service API 发布所有算力账户事件到 outbox local contrib_publish_url="http://localhost:3020/api/v2/admin/contribution-accounts/publish-all" local contrib_result contrib_result=$(curl -s -X POST "$contrib_publish_url" 2>/dev/null || echo '{"error": "curl failed"}') if echo "$contrib_result" | grep -q '"success":true'; then local contrib_count contrib_count=$(echo "$contrib_result" | grep -o '"publishedCount":[0-9]*' | grep -o '[0-9]*') log_success "Published $contrib_count contribution account events to outbox" else log_warn "Failed to publish contribution data: $contrib_result" log_info "You may need to manually call: curl -X POST $contrib_publish_url" fi log_step "Step 15/18: Publishing referral relationships to mining-admin-service..." # 调用 contribution-service API 发布所有推荐关系事件到 outbox local referral_publish_url="http://localhost:3020/api/v2/admin/referrals/publish-all" local referral_result referral_result=$(curl -s -X POST "$referral_publish_url" 2>/dev/null || echo '{"error": "curl failed"}') if echo "$referral_result" | grep -q '"success":true'; then local referral_count referral_count=$(echo "$referral_result" | grep -o '"publishedCount":[0-9]*' | grep -o '[0-9]*') log_success "Published $referral_count referral events to outbox" else log_warn "Failed to publish referral data: $referral_result" log_info "You may need to manually call: curl -X POST $referral_publish_url" fi log_step "Step 16/18: Publishing adoption records to mining-admin-service..." # 调用 contribution-service API 发布所有认种记录事件到 outbox local adoption_publish_url="http://localhost:3020/api/v2/admin/adoptions/publish-all" local adoption_result adoption_result=$(curl -s -X POST "$adoption_publish_url" 2>/dev/null || echo '{"error": "curl failed"}') if echo "$adoption_result" | grep -q '"success":true'; then local adoption_count adoption_count=$(echo "$adoption_result" | grep -o '"publishedCount":[0-9]*' | grep -o '[0-9]*') log_success "Published $adoption_count adoption events to outbox" else log_warn "Failed to publish adoption data: $adoption_result" log_info "You may need to manually call: curl -X POST $adoption_publish_url" fi # NOTE: contribution-records/publish-all is NOT called here because: # - Contribution records are already published to outbox when calculated by contribution-service # - Debezium automatically captures outbox_events and sends to Kafka # - Calling publish-all again would cause duplicate records in mining-admin-service # - See: contribution-calculation.service.ts -> publishContributionRecordEvents() log_step "Step 17/18: Publishing network progress to mining-admin-service..." # 调用 contribution-service API 发布全网进度事件到 outbox local progress_publish_url="http://localhost:3020/api/v2/admin/network-progress/publish" local progress_result progress_result=$(curl -s -X POST "$progress_publish_url" 2>/dev/null || echo '{"error": "curl failed"}') if echo "$progress_result" | grep -q '"success":true'; then log_success "Published network progress to outbox" else log_warn "Failed to publish network progress: $progress_result" log_info "You may need to manually call: curl -X POST $progress_publish_url" fi log_step "Step 18/18: Waiting for mining-admin-service to sync all data..." # 等待 mining-admin-service 消费 outbox 事件 log_info "Waiting 15 seconds for mining-admin-service to sync all data..." sleep 15 echo "" echo -e "${GREEN}${BOLD}╔════════════════════════════════════════════════════════════╗${NC}" echo -e "${GREEN}${BOLD}║ Full reset completed successfully! ║${NC}" echo -e "${GREEN}${BOLD}║ ║${NC}" echo -e "${GREEN}${BOLD}║ The 2.0 system has synced all data from 1.0 via CDC. ║${NC}" echo -e "${GREEN}${BOLD}║ Monitor with: ./deploy-mining.sh logs contribution-service║${NC}" echo -e "${GREEN}${BOLD}╚════════════════════════════════════════════════════════════╝${NC}" } # =========================================================================== # Health Check Function # =========================================================================== health_check() { print_section "2.0 System Health Check" local all_healthy=true # Check databases echo -e "${BOLD}Databases:${NC}" for db in "${MINING_DATABASES[@]}"; do if run_psql "$db" "SELECT 1" &>/dev/null; then echo -e " ${GREEN}✓${NC} $db" else echo -e " ${RED}✗${NC} $db" all_healthy=false fi done echo "" echo -e "${BOLD}Services:${NC}" for service in "${MINING_SERVICES[@]}"; do port="${SERVICE_PORTS[$service]}" health_response=$(curl -s -o /dev/null -w "%{http_code}" "http://localhost:$port/health" 2>/dev/null || echo "000") if [ "$health_response" = "200" ]; then echo -e " ${GREEN}✓${NC} $service (port $port)" else echo -e " ${RED}✗${NC} $service (port $port)" all_healthy=false fi done echo "" echo -e "${BOLD}Infrastructure:${NC}" # Kafka if nc -z ${KAFKA_BROKERS%%:*} ${KAFKA_BROKERS##*:} 2>/dev/null; then echo -e " ${GREEN}✓${NC} Kafka ($KAFKA_BROKERS)" else echo -e " ${RED}✗${NC} Kafka ($KAFKA_BROKERS)" all_healthy=false fi # Redis if nc -z "$REDIS_HOST" "$REDIS_PORT" 2>/dev/null; then echo -e " ${GREEN}✓${NC} Redis ($REDIS_HOST:$REDIS_PORT)" else echo -e " ${RED}✗${NC} Redis ($REDIS_HOST:$REDIS_PORT)" all_healthy=false fi echo "" if [ "$all_healthy" = true ]; then echo -e "${GREEN}${BOLD}All systems healthy!${NC}" else echo -e "${RED}${BOLD}Some systems are unhealthy!${NC}" return 1 fi } # =========================================================================== # Statistics Function # =========================================================================== show_stats() { print_section "2.0 System Statistics" local stats_sql=" SELECT tablename AS table, pg_size_pretty(pg_total_relation_size(schemaname || '.' || tablename)) AS size FROM pg_tables WHERE schemaname = 'public' ORDER BY pg_total_relation_size(schemaname || '.' || tablename) DESC LIMIT 10; " for db in "${MINING_DATABASES[@]}"; do echo -e "${BOLD}Database: $db${NC}" if run_psql "$db" "SELECT 1" &>/dev/null; then # Get table stats using Docker or local psql if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${POSTGRES_CONTAINER}$"; then docker exec -e PGPASSWORD="$POSTGRES_PASSWORD" "$POSTGRES_CONTAINER" \ psql -h localhost -U "$POSTGRES_USER" -d "$db" -t -c "$stats_sql" 2>/dev/null || echo " Could not get table stats" else PGPASSWORD="$POSTGRES_PASSWORD" psql -h "$POSTGRES_HOST" -p "$POSTGRES_PORT" -U "$POSTGRES_USER" -d "$db" -t -c "$stats_sql" 2>/dev/null || echo " Could not get table stats" fi else echo " Database not available" fi echo "" done } # =========================================================================== # Help Function # =========================================================================== show_help() { print_header echo "Usage: $0 [service] [options]" echo "" echo -e "${BOLD}Service Management:${NC}" echo " up [service] Start all or specific service" echo " down [service] Stop all or specific service" echo " restart [service] Restart all or specific service" echo " status Show all service status" echo " logs [lines] View logs for specific service" echo " build [service] [--no-cache] Build all or specific service" echo " rebuild [service] Rebuild with --no-cache" echo "" echo -e "${BOLD}Database Management:${NC}" echo " db-create [service] Create databases (all or for specific service)" echo " db-migrate [service] Run Prisma migrations" echo " db-reset [service] Drop and recreate databases ${RED}(DANGEROUS!)${NC}" echo " db-status Show database status" echo "" echo -e "${BOLD}CDC / Sync Management:${NC}" echo " sync-reset Reset CDC consumer to read from beginning" echo " sync-status Show CDC consumer group status" echo " outbox-register Register all Debezium outbox connectors" echo " outbox-status Show outbox connector status" echo " outbox-delete Delete all outbox connectors" echo "" echo -e "${BOLD}Full Reset:${NC}" echo " full-reset Complete system reset ${RED}(DANGEROUS!)${NC}" echo " Drops DBs, resets CDC, restarts services" echo "" echo -e "${BOLD}Health & Monitoring:${NC}" echo " health Check health of all 2.0 components" echo " stats Show system statistics" echo "" echo -e "${BOLD}2.0 Services:${NC}" for service in "${MINING_SERVICES[@]}"; do echo " - $service (port ${SERVICE_PORTS[$service]})" done echo "" echo -e "${BOLD}Service Aliases:${NC}" echo " contrib, contribution -> contribution-service" echo " mining -> mining-service" echo " trading -> trading-service" echo " admin -> mining-admin-service" echo " auth -> auth-service" echo " wallet -> mining-wallet-service" echo "" echo -e "${BOLD}Examples:${NC}" echo " $0 up # Start all services" echo " $0 up mining # Start only mining-service" echo " $0 restart contrib # Restart contribution-service" echo " $0 build trading --no-cache # Rebuild trading-service" echo " $0 logs admin 200 # Show last 200 lines of admin logs" echo " $0 db-reset mining # Reset only mining-service database" echo "" echo -e "${YELLOW}Note: The 2.0 system is completely isolated from 1.0.${NC}" echo -e "${YELLOW}Any reset operation will NOT affect the 1.0 system.${NC}" } # =========================================================================== # Main # =========================================================================== main() { load_env case "${1:-}" in # Service commands up) services_up "$2" ;; down) services_down "$2" ;; restart) services_restart "$2" ;; status) print_header services_status ;; logs) services_logs "$2" "$3" ;; build) services_build "$2" "$3" ;; rebuild) services_rebuild "$2" "$3" ;; # Database commands db-create) if [ -n "$2" ]; then resolved=$(resolve_service_name "$2") && db_create "$resolved" else db_create fi ;; db-migrate) if [ -n "$2" ]; then resolved=$(resolve_service_name "$2") && db_migrate "$resolved" else db_migrate fi ;; db-reset) if [ -n "$2" ]; then resolved=$(resolve_service_name "$2") && db_reset "$resolved" else db_reset fi ;; db-status) print_header db_status ;; # Sync commands sync-reset) sync_reset ;; sync-status) sync_status ;; # Outbox connector commands outbox-register) print_header register_outbox_connectors ;; outbox-status) print_header outbox_status ;; outbox-delete) print_header delete_outbox_connectors ;; # Full reset full-reset) print_header full_reset ;; # Health & monitoring health) print_header health_check ;; stats) print_header show_stats ;; # Help help|--help|-h|"") show_help ;; *) log_error "Unknown command: $1" echo "" show_help exit 1 ;; esac } main "$@"