/**
 * Kafka cluster configuration.
 *
 * Supports multi-broker clusters via the comma-separated KAFKA_BROKERS
 * environment variable.
 *
 * `createKafkaConfig` sets `idempotent: true` by default. Note for reviewers:
 * producer idempotence de-duplicates retried sends; end-to-end exactly-once
 * processing presumably also needs transactions in the code that consumes
 * this config — confirm against the producer setup.
 */
export interface KafkaConfig {
  /** Broker addresses in `host:port` form. */
  brokers: string[];
  /** Client identifier supplied by the caller. */
  clientId: string;
  /** Consumer group id; omitted for clients that do not consume in a group. */
  groupId?: string;
  /** Enable SSL for production clusters */
  ssl?: boolean;
  /** SASL authentication for production */
  sasl?: {
    mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
    username: string;
    password: string;
  };
  /** Producer idempotency (default: true) */
  idempotent?: boolean;
  /** Consumer session timeout ms (default: 30000) */
  sessionTimeout?: number;
  /** Consumer heartbeat interval ms (default: 3000) */
  heartbeatInterval?: number;
  /** Max retry time ms (default: 30000) */
  maxRetryTime?: number;
  /** Number of retries (default: 5) */
  retries?: number;
}

/** Broker used when KAFKA_BROKERS is unset or contains no usable entry. */
const DEFAULT_KAFKA_BROKER = 'localhost:9092';

/** SASL mechanisms accepted by {@link KafkaConfig}. */
const SUPPORTED_SASL_MECHANISMS = ['plain', 'scram-sha-256', 'scram-sha-512'] as const;

type SaslMechanism = (typeof SUPPORTED_SASL_MECHANISMS)[number];

/** Mechanism used when KAFKA_SASL_MECHANISM is unset or blank. */
const DEFAULT_SASL_MECHANISM: SaslMechanism = 'scram-sha-512';

/** Narrows an arbitrary string to one of the supported SASL mechanisms. */
function isSaslMechanism(value: string): value is SaslMechanism {
  return (SUPPORTED_SASL_MECHANISMS as readonly string[]).includes(value);
}

/**
 * Splits a comma-separated broker list, trimming whitespace and dropping
 * empty entries (e.g. from a trailing comma). Falls back to the default
 * broker when nothing usable remains.
 */
function parseKafkaBrokers(raw: string | undefined): string[] {
  const brokers = (raw ?? '')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  return brokers.length > 0 ? brokers : [DEFAULT_KAFKA_BROKER];
}

/**
 * Resolves the SASL mechanism from its environment value. Matching is
 * case-insensitive; an unset or blank value yields the default mechanism.
 *
 * @throws Error when the value names a mechanism that is not supported.
 */
function parseSaslMechanism(raw: string | undefined): SaslMechanism {
  const normalized = raw?.trim().toLowerCase() ?? '';
  if (normalized === '') {
    return DEFAULT_SASL_MECHANISM;
  }
  if (isSaslMechanism(normalized)) {
    return normalized;
  }
  throw new Error(
    `Unsupported KAFKA_SASL_MECHANISM "${raw}"; expected one of: ${SUPPORTED_SASL_MECHANISMS.join(', ')}`,
  );
}

/**
 * Builds a {@link KafkaConfig} from the given identifiers and the process
 * environment.
 *
 * Environment variables read:
 * - `KAFKA_BROKERS` — comma-separated broker list (default `localhost:9092`).
 * - `KAFKA_SSL` — SSL is enabled only when the value is exactly `'true'`.
 * - `KAFKA_SASL_USERNAME` — when non-empty, a `sasl` section is added.
 * - `KAFKA_SASL_PASSWORD` — SASL password (default: empty string).
 * - `KAFKA_SASL_MECHANISM` — SASL mechanism (default `scram-sha-512`).
 *
 * @param clientId Client identifier copied into the config.
 * @param groupId Optional consumer group id copied into the config.
 * @returns A config populated with the default timeout and retry settings.
 * @throws Error when SASL is enabled and KAFKA_SASL_MECHANISM is unsupported.
 */
export function createKafkaConfig(
  clientId: string,
  groupId?: string,
): KafkaConfig {
  const config: KafkaConfig = {
    brokers: parseKafkaBrokers(process.env.KAFKA_BROKERS),
    clientId,
    groupId,
    idempotent: true,
    sessionTimeout: 30000,
    heartbeatInterval: 3000,
    maxRetryTime: 30000,
    retries: 5,
  };

  // Production SSL/SASL
  if (process.env.KAFKA_SSL === 'true') {
    config.ssl = true;
  }

  const username = process.env.KAFKA_SASL_USERNAME;
  if (username) {
    config.sasl = {
      mechanism: parseSaslMechanism(process.env.KAFKA_SASL_MECHANISM),
      username,
      password: process.env.KAFKA_SASL_PASSWORD ?? '',
    };
  }

  return config;
}