gcx/backend/services/chain-indexer/internal/application/service/indexer_service.go

145 lines
3.5 KiB
Go

package service
import (
"context"
"fmt"
"sync"
"time"
"go.uber.org/zap"
"github.com/genex/chain-indexer/internal/domain/entity"
"github.com/genex/chain-indexer/internal/domain/event"
"github.com/genex/chain-indexer/internal/domain/repository"
)
// IndexerService is the application service that orchestrates block indexing.
// It depends on domain repository and event publisher interfaces — not concrete
// implementations — following the Dependency Inversion Principle.
//
// The struct embeds a mutex by value, so it must be used through a pointer
// (as returned by NewIndexerService) and never copied.
type IndexerService struct {
// logger receives lifecycle messages and indexing errors.
logger *zap.Logger
// blockRepo persists new blocks and answers latest/recent block queries.
blockRepo repository.BlockRepository
// txRepo is injected by the constructor; no method shown in this file reads it.
txRepo repository.TransactionRepository
// publisher emits a block-indexed domain event after each saved block.
publisher event.EventPublisher
// mu guards isRunning and the closing of stopCh in Start and Stop.
mu sync.RWMutex
// isRunning is true between a call to Start and the matching call to Stop.
isRunning bool
// stopCh is closed by Stop to tell the background indexing goroutine to exit.
stopCh chan struct{}
}
// NewIndexerService builds an IndexerService from its injected collaborators:
// a logger, the block and transaction repositories, and the event publisher.
// The returned service is idle (not running) until Start is called; its stop
// channel is allocated here so that Stop has something to close.
func NewIndexerService(
	logger *zap.Logger,
	blockRepo repository.BlockRepository,
	txRepo repository.TransactionRepository,
	publisher event.EventPublisher,
) *IndexerService {
	svc := &IndexerService{stopCh: make(chan struct{})}
	svc.logger = logger
	svc.blockRepo = blockRepo
	svc.txRepo = txRepo
	svc.publisher = publisher
	return svc
}
// Start begins the mock block indexing loop. The loop runs in a background
// goroutine and attempts to index one block every 10 seconds until Stop is
// called; indexing failures are logged and the loop keeps going.
//
// Start is idempotent: calling it while the service is already running is a
// no-op, so at most one indexing goroutine is active at a time. Every run
// gets a fresh stop channel, which makes the sequence Start, Stop, Start
// safe: the restarted loop does not observe a channel closed by an earlier
// Stop, and a later Stop never closes the same channel twice.
func (s *IndexerService) Start() {
	s.mu.Lock()
	if s.isRunning {
		s.mu.Unlock()
		return
	}
	s.isRunning = true
	// The goroutine works on its own copy of the channel so it never reads
	// the struct field without holding the lock.
	stopCh := make(chan struct{})
	s.stopCh = stopCh
	s.mu.Unlock()

	s.logger.Info("Chain indexer started (mock mode)")

	go func() {
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-stopCh:
				return
			case <-ticker.C:
				if err := s.indexNextBlock(); err != nil {
					s.logger.Error("Failed to index block", zap.Error(err))
				}
			}
		}
	}()
}
// Stop signals the indexing loop to exit by closing the stop channel and
// marks the service as not running. Calling Stop on a service that is not
// running does nothing. Stop does not wait for the background goroutine to
// finish.
func (s *IndexerService) Stop() {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Guard: only a running service has an open stop channel to close.
	if !s.isRunning {
		return
	}

	close(s.stopCh)
	s.isRunning = false
	s.logger.Info("Chain indexer stopped")
}
// GetLastHeight reports the height of the latest block known to the block
// repository. It returns 0 when the repository returns an error or a nil
// block; the error itself is not surfaced to the caller.
func (s *IndexerService) GetLastHeight() int64 {
	newest, err := s.blockRepo.FindLatest(context.Background())
	if err == nil && newest != nil {
		return newest.Height
	}
	return 0
}
// GetRecentBlocks fetches up to limit recent blocks from the block repository
// and returns them by value. If the repository call fails, the error is
// logged and nil is returned; a successful call always yields a non-nil
// (possibly empty) slice.
func (s *IndexerService) GetRecentBlocks(limit int) []entity.Block {
	found, err := s.blockRepo.FindRecent(context.Background(), limit)
	if err != nil {
		s.logger.Error("Failed to get recent blocks", zap.Error(err))
		return nil
	}

	// The repository hands back pointers; callers of this method expect
	// values, so each block is copied into the output slice.
	out := make([]entity.Block, 0, len(found))
	for _, blk := range found {
		out = append(out, *blk)
	}
	return out
}
// indexNextBlock creates a mock block at the height following the latest
// stored block, persists it through the block repository, and publishes a
// block-indexed domain event.
//
// It returns an error when the latest block cannot be looked up, when the
// block entity cannot be constructed, or when the block cannot be saved. A
// failure to publish the event is only logged, because the block is already
// persisted at that point.
func (s *IndexerService) indexNextBlock() error {
	ctx := context.Background()

	// Look up the latest block directly rather than through GetLastHeight:
	// that helper maps repository errors to height 0, which would make a
	// transient lookup failure look like an empty chain and cause block 1 to
	// be generated and saved again.
	// NOTE(review): assumes FindLatest returns (nil, nil) for an empty store,
	// as the nil check in GetLastHeight suggests — confirm against the
	// repository implementation.
	latest, err := s.blockRepo.FindLatest(ctx)
	if err != nil {
		return fmt.Errorf("failed to find latest block: %w", err)
	}

	var lastHeight int64
	if latest != nil {
		lastHeight = latest.Height
	}
	nextHeight := lastHeight + 1

	// Create block via domain factory. The mock hash is the height rendered
	// as a zero-padded 64-digit decimal number with a "0x" prefix.
	block, err := entity.NewBlock(
		nextHeight,
		fmt.Sprintf("0x%064d", nextHeight),
		time.Now(),
		0,
	)
	if err != nil {
		return fmt.Errorf("failed to create block entity: %w", err)
	}

	// Persist through repository.
	if err := s.blockRepo.SaveBlock(ctx, block); err != nil {
		return fmt.Errorf("failed to save block: %w", err)
	}

	// Publish domain event.
	evt := event.NewBlockIndexedEvent(block.Height, block.Hash, block.TxCount, block.Timestamp)
	if err := s.publisher.Publish(evt); err != nil {
		// Non-fatal: don't fail the indexing operation.
		s.logger.Warn("Failed to publish block indexed event", zap.Error(err))
	}

	s.logger.Debug("Indexed mock block", zap.Int64("height", nextHeight))
	return nil
}