215 lines
5.4 KiB
Go
215 lines
5.4 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/genex/chain-indexer/internal/domain/entity"
|
|
"github.com/genex/chain-indexer/internal/domain/event"
|
|
"github.com/genex/chain-indexer/internal/domain/repository"
|
|
)
|
|
|
|
const (
|
|
// pollInterval is the time between indexing cycles.
|
|
pollInterval = 2 * time.Second
|
|
// maxBatchSize is the maximum number of blocks indexed per cycle.
|
|
maxBatchSize = 50
|
|
)
|
|
|
|
// IndexerService orchestrates block indexing from the blockchain node.
|
|
type IndexerService struct {
|
|
logger *zap.Logger
|
|
blockRepo repository.BlockRepository
|
|
txRepo repository.TransactionRepository
|
|
publisher event.EventPublisher
|
|
chainClient repository.ChainClient
|
|
|
|
mu sync.RWMutex
|
|
isRunning bool
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
// NewIndexerService creates a new IndexerService with all dependencies injected.
|
|
func NewIndexerService(
|
|
logger *zap.Logger,
|
|
blockRepo repository.BlockRepository,
|
|
txRepo repository.TransactionRepository,
|
|
publisher event.EventPublisher,
|
|
chainClient repository.ChainClient,
|
|
) *IndexerService {
|
|
return &IndexerService{
|
|
logger: logger,
|
|
blockRepo: blockRepo,
|
|
txRepo: txRepo,
|
|
publisher: publisher,
|
|
chainClient: chainClient,
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Start begins the block indexing loop.
|
|
func (s *IndexerService) Start() {
|
|
s.mu.Lock()
|
|
s.isRunning = true
|
|
s.mu.Unlock()
|
|
|
|
s.logger.Info("Chain indexer started")
|
|
|
|
go func() {
|
|
// Index immediately on start, then poll.
|
|
s.indexNewBlocks()
|
|
|
|
ticker := time.NewTicker(pollInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-s.stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
s.indexNewBlocks()
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Stop halts the indexing loop.
|
|
func (s *IndexerService) Stop() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if s.isRunning {
|
|
s.isRunning = false
|
|
close(s.stopCh)
|
|
s.logger.Info("Chain indexer stopped")
|
|
}
|
|
}
|
|
|
|
// GetLastHeight returns the height of the most recently indexed block.
|
|
func (s *IndexerService) GetLastHeight() int64 {
|
|
ctx := context.Background()
|
|
latest, err := s.blockRepo.FindLatest(ctx)
|
|
if err != nil || latest == nil {
|
|
return 0
|
|
}
|
|
return latest.Height
|
|
}
|
|
|
|
// GetChainHeight returns the current head block number from the chain node.
|
|
func (s *IndexerService) GetChainHeight() int64 {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
height, err := s.chainClient.GetLatestBlockNumber(ctx)
|
|
if err != nil {
|
|
s.logger.Warn("Failed to get chain height", zap.Error(err))
|
|
return 0
|
|
}
|
|
return height
|
|
}
|
|
|
|
// IsSynced returns true if the indexer has caught up with the chain head.
|
|
func (s *IndexerService) IsSynced() bool {
|
|
return s.GetLastHeight() >= s.GetChainHeight()
|
|
}
|
|
|
|
// GetRecentBlocks returns the N most recently indexed blocks.
|
|
func (s *IndexerService) GetRecentBlocks(limit int) []entity.Block {
|
|
ctx := context.Background()
|
|
blocks, err := s.blockRepo.FindRecent(ctx, limit)
|
|
if err != nil {
|
|
s.logger.Error("Failed to get recent blocks", zap.Error(err))
|
|
return nil
|
|
}
|
|
|
|
result := make([]entity.Block, len(blocks))
|
|
for i, b := range blocks {
|
|
result[i] = *b
|
|
}
|
|
return result
|
|
}
|
|
|
|
// indexNewBlocks fetches and indexes all blocks from lastHeight+1 up to the chain head.
|
|
func (s *IndexerService) indexNewBlocks() {
|
|
ctx := context.Background()
|
|
|
|
lastHeight := s.GetLastHeight()
|
|
|
|
chainHeight, err := s.chainClient.GetLatestBlockNumber(ctx)
|
|
if err != nil {
|
|
s.logger.Error("Failed to get latest block number from chain", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
if chainHeight <= lastHeight {
|
|
return // already up to date
|
|
}
|
|
|
|
// Cap batch size to avoid long-running cycles.
|
|
toHeight := chainHeight
|
|
if toHeight > lastHeight+maxBatchSize {
|
|
toHeight = lastHeight + maxBatchSize
|
|
}
|
|
|
|
indexed := int64(0)
|
|
for height := lastHeight + 1; height <= toHeight; height++ {
|
|
if err := s.indexBlock(ctx, height); err != nil {
|
|
s.logger.Error("Failed to index block",
|
|
zap.Int64("height", height),
|
|
zap.Error(err),
|
|
)
|
|
break // retry in the next cycle
|
|
}
|
|
indexed++
|
|
}
|
|
|
|
if indexed > 0 {
|
|
s.logger.Info("Indexed blocks",
|
|
zap.Int64("from", lastHeight+1),
|
|
zap.Int64("to", lastHeight+indexed),
|
|
zap.Int64("chainHead", chainHeight),
|
|
zap.Int64("lag", chainHeight-lastHeight-indexed),
|
|
)
|
|
}
|
|
}
|
|
|
|
// indexBlock fetches a single block from the chain, persists it, and publishes events.
|
|
func (s *IndexerService) indexBlock(ctx context.Context, height int64) error {
|
|
block, txs, err := s.chainClient.GetBlockByNumber(ctx, height)
|
|
if err != nil {
|
|
return fmt.Errorf("fetch block %d: %w", height, err)
|
|
}
|
|
|
|
// Persist block.
|
|
if err := s.blockRepo.SaveBlock(ctx, block); err != nil {
|
|
return fmt.Errorf("save block %d: %w", height, err)
|
|
}
|
|
|
|
// Persist transactions in batch.
|
|
if len(txs) > 0 {
|
|
if err := s.txRepo.SaveBatch(ctx, txs); err != nil {
|
|
return fmt.Errorf("save txs for block %d: %w", height, err)
|
|
}
|
|
}
|
|
|
|
// Publish block event.
|
|
blockEvt := event.NewBlockIndexedEvent(block.Height, block.Hash, block.TxCount, block.Timestamp)
|
|
if err := s.publisher.Publish(blockEvt); err != nil {
|
|
s.logger.Warn("Failed to publish block event", zap.Int64("height", height), zap.Error(err))
|
|
}
|
|
|
|
// Publish transaction events.
|
|
for _, tx := range txs {
|
|
txEvt := event.NewTransactionIndexedEvent(tx.Hash, tx.BlockHeight, tx.From, tx.To, tx.Amount, tx.Status)
|
|
if err := s.publisher.Publish(txEvt); err != nil {
|
|
s.logger.Warn("Failed to publish tx event", zap.String("txHash", tx.Hash), zap.Error(err))
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|