gcx/backend/services/chain-indexer/internal/application/service/indexer_service.go

215 lines
5.4 KiB
Go

package service
import (
"context"
"fmt"
"sync"
"time"
"go.uber.org/zap"
"github.com/genex/chain-indexer/internal/domain/entity"
"github.com/genex/chain-indexer/internal/domain/event"
"github.com/genex/chain-indexer/internal/domain/repository"
)
const (
	// pollInterval is the period of the ticker that drives the indexing loop:
	// after the initial cycle run by Start, one cycle is triggered per tick.
	pollInterval = 2 * time.Second
	// maxBatchSize caps how many blocks a single indexing cycle will attempt,
	// so one cycle cannot run arbitrarily long when the indexer is far behind.
	maxBatchSize = 50
)
// IndexerService orchestrates block indexing from the blockchain node.
//
// It polls the chain client for new blocks, persists blocks and their
// transactions through the repositories, and publishes an event for each
// indexed block and transaction. The struct holds a mutex and must not be
// copied; use it through the pointer returned by NewIndexerService.
type IndexerService struct {
	// logger receives lifecycle, progress, and error messages.
	logger *zap.Logger
	// blockRepo stores indexed blocks and answers latest/recent queries.
	blockRepo repository.BlockRepository
	// txRepo stores the transactions of indexed blocks in batches.
	txRepo repository.TransactionRepository
	// publisher emits block-indexed and transaction-indexed events.
	publisher event.EventPublisher
	// chainClient reads the head block number and block contents from the node.
	chainClient repository.ChainClient
	// mu guards isRunning and the closing of stopCh.
	mu sync.RWMutex
	// isRunning is set by Start and cleared by Stop.
	isRunning bool
	// stopCh is closed by Stop to tell the indexing goroutine to exit.
	stopCh chan struct{}
}
// NewIndexerService wires the given logger, repositories, event publisher,
// and chain client into a ready-to-start IndexerService. The returned service
// is idle until Start is called.
func NewIndexerService(
	logger *zap.Logger,
	blockRepo repository.BlockRepository,
	txRepo repository.TransactionRepository,
	publisher event.EventPublisher,
	chainClient repository.ChainClient,
) *IndexerService {
	svc := new(IndexerService)

	svc.logger = logger
	svc.blockRepo = blockRepo
	svc.txRepo = txRepo
	svc.publisher = publisher
	svc.chainClient = chainClient

	// Closed by Stop to terminate the polling goroutine.
	svc.stopCh = make(chan struct{})

	return svc
}
// Start launches the background indexing loop and returns immediately.
//
// The loop runs one indexing cycle right away and then one cycle per
// pollInterval tick until Stop is called. Calling Start while the service is
// already running is a no-op, so at most one loop is started per Start/Stop
// pair. Every successful Start allocates a fresh stop channel, which allows
// the service to be started again after Stop: reusing the already-closed
// channel would end the new loop after its first cycle and make the next
// Stop panic on a double close.
func (s *IndexerService) Start() {
	s.mu.Lock()
	if s.isRunning {
		s.mu.Unlock()
		return
	}
	s.isRunning = true
	// The goroutine captures its own channel so that a later restart cannot
	// swap the field out from under a loop that is still winding down.
	stopCh := make(chan struct{})
	s.stopCh = stopCh
	s.mu.Unlock()

	s.logger.Info("Chain indexer started")

	go func() {
		// Index immediately on start, then poll.
		s.indexNewBlocks()

		ticker := time.NewTicker(pollInterval)
		defer ticker.Stop()

		for {
			select {
			case <-stopCh:
				return
			case <-ticker.C:
				s.indexNewBlocks()
			}
		}
	}()
}
// Stop signals the indexing loop to exit by closing the stop channel.
//
// It is a no-op when the service is not running, which also keeps the
// channel from being closed twice. Stop does not wait for the loop goroutine
// to finish; a cycle that is already in progress runs to completion.
func (s *IndexerService) Stop() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.isRunning {
		return
	}

	s.isRunning = false
	close(s.stopCh)
	s.logger.Info("Chain indexer stopped")
}
// GetLastHeight returns the height of the most recently indexed block.
//
// It returns 0 both when the repository reports no latest block and when the
// lookup fails. A failed lookup is logged as a warning, mirroring
// GetChainHeight, so that a storage problem can be told apart from an empty
// store in the logs instead of being swallowed silently.
func (s *IndexerService) GetLastHeight() int64 {
	latest, err := s.blockRepo.FindLatest(context.Background())
	if err != nil {
		s.logger.Warn("Failed to get last indexed height", zap.Error(err))
		return 0
	}
	if latest == nil {
		// Nothing has been indexed yet.
		return 0
	}
	return latest.Height
}
// GetChainHeight asks the chain node for its current head block number.
//
// The request is bounded by a five-second timeout. If the node cannot be
// queried, a warning is logged and 0 is returned.
func (s *IndexerService) GetChainHeight() int64 {
	const nodeTimeout = 5 * time.Second

	reqCtx, release := context.WithTimeout(context.Background(), nodeTimeout)
	defer release()

	head, queryErr := s.chainClient.GetLatestBlockNumber(reqCtx)
	if queryErr != nil {
		s.logger.Warn("Failed to get chain height", zap.Error(queryErr))
		return 0
	}

	return head
}
// IsSynced reports whether the indexer has caught up with the chain head.
//
// The chain head is queried directly (bounded by a five-second timeout)
// rather than through GetChainHeight: that helper maps a failed node query to
// 0, which would make every indexed height compare as "synced" while the node
// is unreachable. When the head cannot be determined, IsSynced logs a warning
// and returns false.
func (s *IndexerService) IsSynced() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	chainHeight, err := s.chainClient.GetLatestBlockNumber(ctx)
	if err != nil {
		s.logger.Warn("Cannot determine sync status: chain height unavailable", zap.Error(err))
		return false
	}

	return s.GetLastHeight() >= chainHeight
}
// GetRecentBlocks fetches up to limit of the most recently indexed blocks
// from the block repository and returns them as values, so callers receive
// copies rather than the repository's pointers.
//
// If the repository query fails, the error is logged and nil is returned.
func (s *IndexerService) GetRecentBlocks(limit int) []entity.Block {
	stored, err := s.blockRepo.FindRecent(context.Background(), limit)
	if err != nil {
		s.logger.Error("Failed to get recent blocks", zap.Error(err))
		return nil
	}

	// Pre-sized so the result is a non-nil slice even when nothing is stored.
	out := make([]entity.Block, 0, len(stored))
	for idx := range stored {
		out = append(out, *stored[idx])
	}
	return out
}
// indexNewBlocks runs one indexing cycle: it indexes the blocks that follow
// the last stored height, up to the chain head and at most maxBatchSize
// blocks per call.
//
// The last indexed height is read straight from the block repository instead
// of through GetLastHeight, because that getter maps a lookup failure to 0.
// Treating a transient storage error as "nothing indexed yet" would restart
// indexing from block 1 and re-save and re-publish already indexed blocks, so
// the cycle is skipped instead and retried on the next tick.
//
// NOTE(review): this assumes FindLatest reports an empty store as (nil, nil)
// rather than as an error — confirm against the repository implementation.
//
// A block that fails to index ends the cycle early; the next cycle resumes
// from whatever height the repository then reports as latest.
func (s *IndexerService) indexNewBlocks() {
	ctx := context.Background()

	latest, err := s.blockRepo.FindLatest(ctx)
	if err != nil {
		s.logger.Error("Failed to get last indexed block", zap.Error(err))
		return
	}
	var lastHeight int64
	if latest != nil {
		lastHeight = latest.Height
	}

	chainHeight, err := s.chainClient.GetLatestBlockNumber(ctx)
	if err != nil {
		s.logger.Error("Failed to get latest block number from chain", zap.Error(err))
		return
	}
	if chainHeight <= lastHeight {
		return // already up to date
	}

	// Cap batch size to avoid long-running cycles.
	toHeight := chainHeight
	if toHeight > lastHeight+maxBatchSize {
		toHeight = lastHeight + maxBatchSize
	}

	indexed := int64(0)
	for height := lastHeight + 1; height <= toHeight; height++ {
		if err := s.indexBlock(ctx, height); err != nil {
			s.logger.Error("Failed to index block",
				zap.Int64("height", height),
				zap.Error(err),
			)
			break // retry in the next cycle
		}
		indexed++
	}

	if indexed > 0 {
		s.logger.Info("Indexed blocks",
			zap.Int64("from", lastHeight+1),
			zap.Int64("to", lastHeight+indexed),
			zap.Int64("chainHead", chainHeight),
			zap.Int64("lag", chainHeight-lastHeight-indexed),
		)
	}
}
// indexBlock processes the block at the given height: it fetches the block
// and its transactions from the chain client, stores the block, stores the
// transactions (when there are any), and then publishes one event for the
// block followed by one event per transaction.
//
// A fetch or storage failure is returned wrapped with the step and height.
// Publishing is best-effort: a failed publish is logged as a warning and does
// not fail the block.
//
// NOTE(review): the block and its transactions are saved by two separate
// repository calls, so a SaveBatch failure leaves the block stored without
// its transactions — confirm how the next cycle recovers from that.
func (s *IndexerService) indexBlock(ctx context.Context, height int64) error {
	blk, transactions, fetchErr := s.chainClient.GetBlockByNumber(ctx, height)
	if fetchErr != nil {
		return fmt.Errorf("fetch block %d: %w", height, fetchErr)
	}

	// Storage: block first, then its transactions as a single batch.
	saveErr := s.blockRepo.SaveBlock(ctx, blk)
	if saveErr != nil {
		return fmt.Errorf("save block %d: %w", height, saveErr)
	}
	if len(transactions) != 0 {
		batchErr := s.txRepo.SaveBatch(ctx, transactions)
		if batchErr != nil {
			return fmt.Errorf("save txs for block %d: %w", height, batchErr)
		}
	}

	// Notification: failures here are logged only.
	blockIndexed := event.NewBlockIndexedEvent(blk.Height, blk.Hash, blk.TxCount, blk.Timestamp)
	pubErr := s.publisher.Publish(blockIndexed)
	if pubErr != nil {
		s.logger.Warn("Failed to publish block event", zap.Int64("height", height), zap.Error(pubErr))
	}

	for i := range transactions {
		tx := transactions[i]
		txIndexed := event.NewTransactionIndexedEvent(tx.Hash, tx.BlockHeight, tx.From, tx.To, tx.Amount, tx.Status)
		txPubErr := s.publisher.Publish(txIndexed)
		if txPubErr != nil {
			s.logger.Warn("Failed to publish tx event", zap.String("txHash", tx.Hash), zap.Error(txPubErr))
		}
	}

	return nil
}