gcx/backend/services/chain-indexer/internal/infrastructure/kafka/event_publisher.go

74 lines
2.0 KiB
Go

package kafka
import (
"encoding/json"
"fmt"
"strings"
"github.com/IBM/sarama"
"github.com/genex/chain-indexer/internal/domain/event"
)
// Compile-time check: KafkaEventPublisher implements event.EventPublisher.
// Assigning a nil *KafkaEventPublisher to a blank variable of the interface
// type makes the build fail if the method set ever stops matching.
var _ event.EventPublisher = (*KafkaEventPublisher)(nil)
// KafkaEventPublisher implements event.EventPublisher by publishing domain events
// to Kafka using the IBM/sarama client.
//
// It wraps a single synchronous producer: Publish sends messages through it and
// Close shuts it down. NewKafkaEventPublisher creates the producer and stores it
// in the struct.
type KafkaEventPublisher struct {
// producer is the sarama synchronous producer that Publish sends through.
producer sarama.SyncProducer
}
// NewKafkaEventPublisher builds a KafkaEventPublisher backed by a synchronous
// sarama producer for the given broker addresses.
//
// The producer configuration requires acknowledgement via sarama.WaitForAll,
// allows up to three retries per send, and enables success reporting. If sarama
// cannot create the producer, the returned publisher is nil and the error wraps
// the underlying cause.
func NewKafkaEventPublisher(brokers []string) (*KafkaEventPublisher, error) {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true
	cfg.Producer.Retry.Max = 3
	cfg.Producer.RequiredAcks = sarama.WaitForAll

	syncProducer, err := sarama.NewSyncProducer(brokers, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create Kafka producer: %w", err)
	}

	publisher := &KafkaEventPublisher{producer: syncProducer}
	return publisher, nil
}
// Publish serializes evt to JSON and synchronously sends it to the Kafka topic
// that resolveTopic selects for the event's name.
//
// The event name is used as the message key and the JSON document as the
// message value. Publish returns an error when the publisher has no producer,
// when evt is nil, when evt cannot be marshalled, or when the producer fails to
// send the message; marshal and send errors wrap the underlying cause.
//
// NOTE(review): keying by event name means a key-hashing partitioner would
// route every event of one type to the same partition — confirm this is the
// intended ordering/throughput trade-off.
func (p *KafkaEventPublisher) Publish(evt event.DomainEvent) error {
	// Close tolerates a missing producer, so Publish reports one as an error
	// instead of panicking on a nil dereference.
	if p == nil || p.producer == nil {
		return fmt.Errorf("kafka event publisher is not initialized")
	}
	// A nil interface marshals to "null" without error, and the EventName call
	// below would then panic; reject it up front.
	if evt == nil {
		return fmt.Errorf("cannot publish nil event")
	}

	// Read the name once; it feeds the topic, the key, and the error messages.
	name := evt.EventName()

	payload, err := json.Marshal(evt)
	if err != nil {
		return fmt.Errorf("failed to marshal event %s: %w", name, err)
	}

	topic := resolveTopic(name)
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder(name),
		Value: sarama.ByteEncoder(payload),
	}

	// The partition and offset returned by SendMessage are not used here.
	if _, _, err := p.producer.SendMessage(msg); err != nil {
		return fmt.Errorf("failed to publish event %s to topic %s: %w", name, topic, err)
	}
	return nil
}
// Close shuts down the underlying Kafka producer and returns the error, if any,
// reported by the producer's own Close. When the publisher holds no producer,
// Close does nothing and returns nil.
func (p *KafkaEventPublisher) Close() error {
	if p.producer == nil {
		return nil
	}
	return p.producer.Close()
}
// resolveTopic returns the Kafka topic for eventName. Names that begin with
// "chain.block." are routed to "chain.blocks"; every other name, including the
// empty string, is routed to "chain.transactions".
func resolveTopic(eventName string) string {
	const (
		blockPrefix  = "chain.block."
		blockTopic   = "chain.blocks"
		defaultTopic = "chain.transactions"
	)

	switch {
	case strings.HasPrefix(eventName, blockPrefix):
		return blockTopic
	default:
		return defaultTopic
	}
}