package service

import (
	"context"
	"fmt"
	"sync"
	"time"

	"go.uber.org/zap"

	"github.com/genex/chain-indexer/internal/domain/entity"
	"github.com/genex/chain-indexer/internal/domain/event"
	"github.com/genex/chain-indexer/internal/domain/repository"
)

const (
	// pollInterval is the time between indexing cycles.
	pollInterval = 2 * time.Second
	// maxBatchSize is the maximum number of blocks indexed per cycle.
	maxBatchSize = 50
)

// IndexerService orchestrates block indexing from the blockchain node.
//
// It polls the chain client for new blocks, persists blocks and their
// transactions through the repositories, and publishes an event for each
// indexed block and transaction.
type IndexerService struct {
	logger      *zap.Logger
	blockRepo   repository.BlockRepository
	txRepo      repository.TransactionRepository
	publisher   event.EventPublisher
	chainClient repository.ChainClient

	// mu guards isRunning and stopCh.
	mu        sync.RWMutex
	isRunning bool
	// stopCh is closed by Stop to terminate the polling goroutine of the
	// current run. Start installs a fresh channel for every run.
	stopCh chan struct{}
}

// NewIndexerService creates a new IndexerService with all dependencies injected.
// The returned service is idle until Start is called.
func NewIndexerService(
	logger *zap.Logger,
	blockRepo repository.BlockRepository,
	txRepo repository.TransactionRepository,
	publisher event.EventPublisher,
	chainClient repository.ChainClient,
) *IndexerService {
	return &IndexerService{
		logger:      logger,
		blockRepo:   blockRepo,
		txRepo:      txRepo,
		publisher:   publisher,
		chainClient: chainClient,
		stopCh:      make(chan struct{}),
	}
}

// Start begins the block indexing loop in a background goroutine.
//
// Calling Start while the indexer is already running is a no-op, so at most
// one polling loop is launched per run. Start may be called again after Stop;
// each run gets its own stop channel because a closed channel cannot be
// reused.
//
// Stop does not wait for an in-flight indexing cycle, so a Start issued
// immediately after Stop can briefly overlap with the previous run's last
// cycle.
func (s *IndexerService) Start() {
	s.mu.Lock()
	if s.isRunning {
		s.mu.Unlock()
		s.logger.Warn("Chain indexer already running; ignoring Start")
		return
	}
	s.isRunning = true
	stopCh := make(chan struct{})
	s.stopCh = stopCh
	s.mu.Unlock()

	s.logger.Info("Chain indexer started")

	go s.pollLoop(stopCh)
}

// pollLoop runs one indexing cycle immediately and then one per pollInterval
// until stopCh is closed.
func (s *IndexerService) pollLoop(stopCh <-chan struct{}) {
	// Index immediately on start, then poll.
	s.indexNewBlocks()

	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			s.indexNewBlocks()
		}
	}
}

// Stop halts the indexing loop. It is safe to call Stop more than once or
// without a prior Start; only the first call after Start has an effect.
// Stop signals the loop and returns without waiting for it to exit.
func (s *IndexerService) Stop() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.isRunning {
		return
	}
	s.isRunning = false
	close(s.stopCh)
	s.logger.Info("Chain indexer stopped")
}

// GetLastHeight returns the height of the most recently indexed block.
// It returns 0 both when no block has been indexed yet and when the block
// repository lookup fails; callers that must tell these cases apart should
// not rely on this method.
func (s *IndexerService) GetLastHeight() int64 {
	height, err := s.lastIndexedHeight(context.Background())
	if err != nil {
		return 0
	}
	return height
}

// lastIndexedHeight returns the height of the latest block in the repository,
// or 0 with a nil error when the repository reports no block. Unlike
// GetLastHeight it surfaces repository errors to the caller.
func (s *IndexerService) lastIndexedHeight(ctx context.Context) (int64, error) {
	latest, err := s.blockRepo.FindLatest(ctx)
	if err != nil {
		return 0, err
	}
	if latest == nil {
		return 0, nil
	}
	return latest.Height, nil
}

// GetChainHeight returns the current head block number from the chain node.
// The lookup is bounded by a 5 second timeout. On failure a warning is logged
// and 0 is returned.
func (s *IndexerService) GetChainHeight() int64 {
	height, err := s.queryChainHeight()
	if err != nil {
		return 0
	}
	return height
}

// queryChainHeight asks the chain client for the latest block number with a
// 5 second timeout. It logs a warning and returns the error on failure.
func (s *IndexerService) queryChainHeight() (int64, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	height, err := s.chainClient.GetLatestBlockNumber(ctx)
	if err != nil {
		s.logger.Warn("Failed to get chain height", zap.Error(err))
		return 0, err
	}
	return height, nil
}

// IsSynced returns true if the indexer has caught up with the chain head.
// It returns false when either the last indexed height or the chain height
// cannot be determined, so an unreachable node or repository is never
// reported as "synced".
func (s *IndexerService) IsSynced() bool {
	lastHeight, err := s.lastIndexedHeight(context.Background())
	if err != nil {
		s.logger.Warn("Failed to get last indexed height", zap.Error(err))
		return false
	}

	chainHeight, err := s.queryChainHeight()
	if err != nil {
		// Already logged by queryChainHeight.
		return false
	}
	return lastHeight >= chainHeight
}

// GetRecentBlocks returns the N most recently indexed blocks as reported by
// the block repository. The blocks are copied by value into the returned
// slice. On repository failure the error is logged and nil is returned.
func (s *IndexerService) GetRecentBlocks(limit int) []entity.Block {
	ctx := context.Background()

	blocks, err := s.blockRepo.FindRecent(ctx, limit)
	if err != nil {
		s.logger.Error("Failed to get recent blocks", zap.Error(err))
		return nil
	}

	result := make([]entity.Block, len(blocks))
	for i, b := range blocks {
		result[i] = *b
	}
	return result
}

// indexNewBlocks fetches and indexes blocks from lastHeight+1 towards the
// chain head, processing at most maxBatchSize blocks per call. It stops at the
// first block that fails so that the same height is retried on the next cycle.
func (s *IndexerService) indexNewBlocks() {
	ctx := context.Background()

	// Read the last indexed height with error reporting: treating a failed
	// lookup as height 0 would restart indexing from block 1.
	lastHeight, err := s.lastIndexedHeight(ctx)
	if err != nil {
		s.logger.Error("Failed to get last indexed height", zap.Error(err))
		return
	}

	chainHeight, err := s.chainClient.GetLatestBlockNumber(ctx)
	if err != nil {
		s.logger.Error("Failed to get latest block number from chain", zap.Error(err))
		return
	}

	if chainHeight <= lastHeight {
		return // already up to date
	}

	// Cap batch size to avoid long-running cycles.
	toHeight := chainHeight
	if toHeight > lastHeight+maxBatchSize {
		toHeight = lastHeight + maxBatchSize
	}

	indexed := int64(0)
	for height := lastHeight + 1; height <= toHeight; height++ {
		if err := s.indexBlock(ctx, height); err != nil {
			s.logger.Error("Failed to index block",
				zap.Int64("height", height),
				zap.Error(err),
			)
			break // retry in the next cycle
		}
		indexed++
	}

	if indexed > 0 {
		s.logger.Info("Indexed blocks",
			zap.Int64("from", lastHeight+1),
			zap.Int64("to", lastHeight+indexed),
			zap.Int64("chainHead", chainHeight),
			zap.Int64("lag", chainHeight-lastHeight-indexed),
		)
	}
}

// indexBlock fetches a single block from the chain, persists it, and publishes events.
//
// Fetch and persistence failures are returned wrapped with the block height.
// Publishing is best-effort: a failed publish is logged as a warning and does
// not fail the block.
//
// NOTE(review): the block is saved before its transactions. If SaveBatch fails
// after SaveBlock succeeded, the next cycle presumably sees this height as
// already indexed and never retries its transactions — confirm whether the
// repositories make this atomic or otherwise recover from it.
func (s *IndexerService) indexBlock(ctx context.Context, height int64) error {
	block, txs, err := s.chainClient.GetBlockByNumber(ctx, height)
	if err != nil {
		return fmt.Errorf("fetch block %d: %w", height, err)
	}

	// Persist block.
	if err := s.blockRepo.SaveBlock(ctx, block); err != nil {
		return fmt.Errorf("save block %d: %w", height, err)
	}

	// Persist transactions in batch.
	if len(txs) > 0 {
		if err := s.txRepo.SaveBatch(ctx, txs); err != nil {
			return fmt.Errorf("save txs for block %d: %w", height, err)
		}
	}

	// Publish block event.
	blockEvt := event.NewBlockIndexedEvent(block.Height, block.Hash, block.TxCount, block.Timestamp)
	if err := s.publisher.Publish(blockEvt); err != nil {
		s.logger.Warn("Failed to publish block event", zap.Int64("height", height), zap.Error(err))
	}

	// Publish transaction events.
	for _, tx := range txs {
		txEvt := event.NewTransactionIndexedEvent(tx.Hash, tx.BlockHeight, tx.From, tx.To, tx.Amount, tx.Status)
		if err := s.publisher.Publish(txEvt); err != nil {
			s.logger.Warn("Failed to publish tx event", zap.String("txHash", tx.Hash), zap.Error(err))
		}
	}

	return nil
}