package kafka import ( "encoding/json" "fmt" "strings" "github.com/IBM/sarama" "github.com/genex/trading-service/internal/domain/event" ) // KafkaEventPublisher implements event.EventPublisher by publishing domain events to Kafka // using the IBM/sarama client. type KafkaEventPublisher struct { producer sarama.SyncProducer } // NewKafkaEventPublisher creates a new Kafka event publisher connected to the given brokers. func NewKafkaEventPublisher(brokers []string) (*KafkaEventPublisher, error) { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 3 config.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { return nil, fmt.Errorf("failed to create Kafka producer: %w", err) } return &KafkaEventPublisher{producer: producer}, nil } // Publish serializes a domain event to JSON and publishes it to the appropriate Kafka topic. // Topic is derived from the event name: "order.placed" → "trading.orders", // "trade.executed" → "trading.trades". func (p *KafkaEventPublisher) Publish(evt event.DomainEvent) error { payload, err := json.Marshal(evt) if err != nil { return fmt.Errorf("failed to marshal event %s: %w", evt.EventName(), err) } topic := resolveTopic(evt.EventName()) msg := &sarama.ProducerMessage{ Topic: topic, Key: sarama.StringEncoder(evt.EventName()), Value: sarama.ByteEncoder(payload), } _, _, err = p.producer.SendMessage(msg) if err != nil { return fmt.Errorf("failed to publish event %s to topic %s: %w", evt.EventName(), topic, err) } return nil } // Close shuts down the Kafka producer gracefully. func (p *KafkaEventPublisher) Close() error { if p.producer != nil { return p.producer.Close() } return nil } // resolveTopic maps event names to Kafka topics. func resolveTopic(eventName string) string { if strings.HasPrefix(eventName, "trade.") { return "trading.trades" } return "trading.orders" }