Skip to content

Commit

Permalink
Re-implement indexer with multi-endpoint support and reliable fork tr…
Browse files Browse the repository at this point in the history
…acking (#8)

rewrite indexer in a way that allows keeping track of forks across multiple clients
  • Loading branch information
pk910 authored Aug 23, 2023
1 parent 6dafd30 commit a2e66c8
Show file tree
Hide file tree
Showing 39 changed files with 2,825 additions and 1,293 deletions.
6 changes: 0 additions & 6 deletions config/default.config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,9 @@ beaconapi:

# indexer keeps track of the latest epochs in memory.
indexer:
# number of epochs to load on startup
prepopulateEpochs: 2

# max number of epochs to keep in memory
inMemoryEpochs: 3

# epoch processing delay (should be >= 2)
epochProcessingDelay: 2

# disable synchronizing and everything that writes to the db (indexer just maintains local cache)
disableIndexWriter: false

Expand Down
136 changes: 136 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,3 +556,139 @@ func GetSlotAssignmentsForSlots(firstSlot uint64, lastSlot uint64) []*dbtypes.Sl
}
return assignments
}

func GetBlockOrphanedRefs(blockRoots [][]byte) []*dbtypes.BlockOrphanedRef {
orphanedRefs := []*dbtypes.BlockOrphanedRef{}
if len(blockRoots) == 0 {
return orphanedRefs
}
var sql strings.Builder
fmt.Fprintf(&sql, `
SELECT
root, orphaned
FROM blocks
WHERE root in (`)
argIdx := 0
args := make([]any, len(blockRoots))
for i, root := range blockRoots {
if i > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "$%v", argIdx+1)
args[argIdx] = root
argIdx += 1
}
fmt.Fprintf(&sql, ")")
err := ReaderDb.Select(&orphanedRefs, sql.String(), args...)
if err != nil {
logger.Errorf("Error while fetching blocks: %v", err)
return nil
}
return orphanedRefs
}

func InsertUnfinalizedBlock(block *dbtypes.UnfinalizedBlock, tx *sqlx.Tx) error {
_, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: `
INSERT INTO unfinalized_blocks (
root, slot, header, block
) VALUES ($1, $2, $3, $4)
ON CONFLICT (root) DO NOTHING`,
dbtypes.DBEngineSqlite: `
INSERT OR IGNORE INTO unfinalized_blocks (
root, slot, header, block
) VALUES ($1, $2, $3, $4)`,
}),
block.Root, block.Slot, block.Header, block.Block)
if err != nil {
return err
}
return nil
}

func GetUnfinalizedBlockHeader() []*dbtypes.UnfinalizedBlockHeader {
blockRefs := []*dbtypes.UnfinalizedBlockHeader{}
err := ReaderDb.Select(&blockRefs, `
SELECT
root, slot, header
FROM unfinalized_blocks
`)
if err != nil {
logger.Errorf("Error while fetching unfinalized block refs: %v", err)
return nil
}
return blockRefs
}

func GetUnfinalizedBlock(root []byte) *dbtypes.UnfinalizedBlock {
block := dbtypes.UnfinalizedBlock{}
err := ReaderDb.Get(&block, `
SELECT root, slot, header, block
FROM unfinalized_blocks
WHERE root = $1
`, root)
if err != nil {
logger.Errorf("Error while fetching unfinalized block 0x%x: %v", root, err)
return nil
}
return &block
}

func InsertUnfinalizedEpochDuty(epochDuty *dbtypes.UnfinalizedEpochDuty, tx *sqlx.Tx) error {
_, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: `
INSERT INTO unfinalized_duties (
epoch, dependent_root, duties
) VALUES ($1, $2, $3)
ON CONFLICT (root) DO NOTHING`,
dbtypes.DBEngineSqlite: `
INSERT OR IGNORE INTO unfinalized_duties (
epoch, dependent_root, duties
) VALUES ($1, $2, $3)`,
}),
epochDuty.Epoch, epochDuty.DependentRoot, epochDuty.Duties)
if err != nil {
return err
}
return nil
}

func GetUnfinalizedEpochDutyRefs() []*dbtypes.UnfinalizedEpochDutyRef {
dutyRefs := []*dbtypes.UnfinalizedEpochDutyRef{}
err := ReaderDb.Select(&dutyRefs, `
SELECT
epoch, dependent_root
FROM unfinalized_duties
`)
if err != nil {
logger.Errorf("Error while fetching unfinalized duty refs: %v", err)
return nil
}
return dutyRefs
}

func GetUnfinalizedDuty(epoch uint64, dependentRoot []byte) *dbtypes.UnfinalizedEpochDuty {
epochDuty := dbtypes.UnfinalizedEpochDuty{}
err := ReaderDb.Get(&epochDuty, `
SELECT epoch, dependent_root, duties
FROM unfinalized_duties
WHERE epoch = $1 AND dependent_root = $2
`, epoch, dependentRoot)
if err != nil {
logger.Errorf("Error while fetching unfinalized duty %v/0x%x: %v", epoch, dependentRoot, err)
return nil
}
return &epochDuty
}

func DeleteUnfinalizedBefore(slot uint64, tx *sqlx.Tx) error {
_, err := tx.Exec(`DELETE FROM unfinalized_blocks WHERE slot < $1`, slot)
if err != nil {
return err
}
_, err = tx.Exec(`DELETE FROM unfinalized_duties WHERE epoch < $1`, utils.EpochOfSlot(slot))
if err != nil {
return err
}
return nil
}
2 changes: 1 addition & 1 deletion db/schema/pgsql/20230720234842_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ CREATE INDEX IF NOT EXISTS "blocks_graffiti_idx"

CREATE INDEX IF NOT EXISTS "blocks_slot_idx"
ON public."blocks"
("root" ASC NULLS LAST);
("slot" ASC NULLS LAST);

CREATE INDEX IF NOT EXISTS "blocks_state_root_idx"
ON public."blocks"
Expand Down
29 changes: 29 additions & 0 deletions db/schema/pgsql/20230820050910_indexer-cache.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE IF NOT EXISTS public."unfinalized_blocks"
(
"root" bytea NOT NULL,
"slot" bigint NOT NULL,
"header" text COLLATE pg_catalog."default" NOT NULL,
"block" text COLLATE pg_catalog."default" NOT NULL,
CONSTRAINT "unfinalized_blocks_pkey" PRIMARY KEY ("root")
);

CREATE INDEX IF NOT EXISTS "unfinalized_blocks_slot_idx"
ON public."unfinalized_blocks"
("slot" ASC NULLS LAST);

CREATE TABLE IF NOT EXISTS public."unfinalized_duties"
(
"epoch" bigint NOT NULL,
"dependent_root" bytea NOT NULL,
"duties" bytea NOT NULL,
CONSTRAINT "unfinalized_duties_pkey" PRIMARY KEY ("epoch", "dependent_root")
);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
SELECT 'NOT SUPPORTED';
-- +goose StatementEnd
2 changes: 1 addition & 1 deletion db/schema/sqlite/20230720234842_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ CREATE INDEX IF NOT EXISTS "blocks_graffiti_idx"

CREATE INDEX IF NOT EXISTS "blocks_slot_idx"
ON "blocks"
("root" ASC);
("slot" ASC);

CREATE INDEX IF NOT EXISTS "blocks_state_root_idx"
ON "blocks"
Expand Down
29 changes: 29 additions & 0 deletions db/schema/sqlite/20230820050910_indexer-cache.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE IF NOT EXISTS "unfinalized_blocks"
(
"root" BLOB NOT NULL,
"slot" bigint NOT NULL,
"header" text NOT NULL,
"block" text NOT NULL,
CONSTRAINT "unfinalized_blocks_pkey" PRIMARY KEY ("root")
);

CREATE INDEX IF NOT EXISTS "unfinalized_blocks_slot_idx"
ON "unfinalized_blocks"
("slot" ASC);

CREATE TABLE IF NOT EXISTS "unfinalized_duties"
(
"epoch" bigint NOT NULL,
"dependent_root" BLOB NOT NULL,
"duties" BLOB NOT NULL,
CONSTRAINT "unfinalized_duties_pkey" PRIMARY KEY ("epoch", "dependent_root")
);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
SELECT 'NOT SUPPORTED';
-- +goose StatementEnd
29 changes: 29 additions & 0 deletions dbtypes/dbtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ type Block struct {
SyncParticipation float32 `db:"sync_participation"`
}

type BlockOrphanedRef struct {
Root []byte `db:"root"`
Orphaned bool `db:"orphaned"`
}

type Epoch struct {
Epoch uint64 `db:"epoch"`
ValidatorCount uint64 `db:"validator_count"`
Expand Down Expand Up @@ -60,3 +65,27 @@ type SlotAssignment struct {
Slot uint64 `db:"slot"`
Proposer uint64 `db:"proposer"`
}

type UnfinalizedBlock struct {
Root []byte `db:"root"`
Slot uint64 `db:"slot"`
Header string `db:"header"`
Block string `db:"block"`
}

type UnfinalizedBlockHeader struct {
Root []byte `db:"root"`
Slot uint64 `db:"slot"`
Header string `db:"header"`
}

type UnfinalizedEpochDuty struct {
Epoch uint64 `db:"epoch"`
DependentRoot []byte `db:"dependent_root"`
Duties []byte `db:"duties"`
}

type UnfinalizedEpochDutyRef struct {
Epoch uint64 `db:"epoch"`
DependentRoot []byte `db:"dependent_root"`
}
7 changes: 2 additions & 5 deletions handlers/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func buildEpochPageData(epoch uint64) (*models.EpochPageData, time.Duration) {
return nil, -1
}

finalizedHead, _ := services.GlobalBeaconService.GetFinalizedBlockHead()
finalizedEpoch, _ := services.GlobalBeaconService.GetFinalizedEpoch()
slotAssignments, syncedEpochs := services.GlobalBeaconService.GetProposerAssignments(epoch, epoch)

nextEpoch := epoch + 1
Expand All @@ -90,10 +90,7 @@ func buildEpochPageData(epoch uint64) (*models.EpochPageData, time.Duration) {
NextEpoch: nextEpoch,
Ts: utils.EpochToTime(epoch),
Synchronized: syncedEpochs[epoch],
}

if finalizedHead != nil {
pageData.Finalized = uint64(finalizedHead.Data.Header.Message.Slot) >= lastSlot
Finalized: finalizedEpoch >= int64(epoch),
}

dbEpochs := services.GlobalBeaconService.GetDbEpochs(epoch, 1)
Expand Down
8 changes: 3 additions & 5 deletions handlers/epochs.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func buildEpochsPageData(firstEpoch uint64, pageSize uint64) (*models.EpochsPage
}
pageData.LastPageEpoch = pageSize - 1

finalizedHead, _ := services.GlobalBeaconService.GetFinalizedBlockHead()
finalizedEpoch, _ := services.GlobalBeaconService.GetFinalizedEpoch()
epochLimit := pageSize

// load epochs
Expand All @@ -102,10 +102,8 @@ func buildEpochsPageData(firstEpoch uint64, pageSize uint64) (*models.EpochsPage
allFinalized := true
for epochIdx := int64(firstEpoch); epochIdx >= 0 && epochCount < epochLimit; epochIdx-- {
epoch := uint64(epochIdx)
finalized := false
if finalizedHead != nil && uint64(finalizedHead.Data.Header.Message.Slot) >= epoch*utils.Config.Chain.Config.SlotsPerEpoch {
finalized = true
} else {
finalized := finalizedEpoch >= epochIdx
if !finalized {
allFinalized = false
}
epochData := &models.EpochsPageDataEpoch{
Expand Down
25 changes: 6 additions & 19 deletions handlers/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Index(w http.ResponseWriter, r *http.Request) {

func getIndexPageData() *models.IndexPageData {
pageData := &models.IndexPageData{}
pageCacheKey := fmt.Sprintf("index")
pageCacheKey := "index"
pageData = services.GlobalFrontendCache.ProcessCachedPage(pageCacheKey, true, pageData, func(pageCall *services.FrontendCacheProcessingPage) interface{} {
pageData, cacheTimeout := buildIndexPageData()
pageCall.CacheTimeout = cacheTimeout
Expand All @@ -62,24 +62,15 @@ func buildIndexPageData() (*models.IndexPageData, time.Duration) {
currentEpoch = 0
}
currentSlot := utils.TimeToSlot(uint64(now.Unix()))
if currentSlot < 0 {
currentSlot = 0
}
currentSlotIndex := (currentSlot % utils.Config.Chain.Config.SlotsPerEpoch) + 1

finalizedHead, _ := services.GlobalBeaconService.GetFinalizedBlockHead()
var finalizedSlot uint64
if finalizedHead != nil {
finalizedSlot = uint64(finalizedHead.Data.Header.Message.Slot)
} else {
finalizedSlot = 0
}
finalizedEpoch, _ := services.GlobalBeaconService.GetFinalizedEpoch()

syncState := dbtypes.IndexerSyncState{}
db.GetExplorerState("indexer.syncstate", &syncState)
var isSynced bool
if currentEpoch > int64(utils.Config.Indexer.EpochProcessingDelay) {
isSynced = syncState.Epoch >= uint64(currentEpoch-int64(utils.Config.Indexer.EpochProcessingDelay))
if finalizedEpoch >= 1 {
isSynced = syncState.Epoch >= uint64(finalizedEpoch-1)
} else {
isSynced = true
}
Expand All @@ -89,7 +80,7 @@ func buildIndexPageData() (*models.IndexPageData, time.Duration) {
DepositContract: utils.Config.Chain.Config.DepositContractAddress,
ShowSyncingMessage: !isSynced,
CurrentEpoch: uint64(currentEpoch),
CurrentFinalizedEpoch: utils.EpochOfSlot(finalizedSlot),
CurrentFinalizedEpoch: finalizedEpoch,
CurrentSlot: currentSlot,
CurrentSlotIndex: currentSlotIndex,
CurrentScheduledCount: utils.Config.Chain.Config.SlotsPerEpoch - currentSlotIndex,
Expand Down Expand Up @@ -176,18 +167,14 @@ func buildIndexPageData() (*models.IndexPageData, time.Duration) {
if epochData == nil {
continue
}
finalized := false
if finalizedHead != nil && uint64(finalizedHead.Data.Header.Message.Slot) >= epochData.Epoch*utils.Config.Chain.Config.SlotsPerEpoch {
finalized = true
}
voteParticipation := float64(1)
if epochData.Eligible > 0 {
voteParticipation = float64(epochData.VotedTarget) * 100.0 / float64(epochData.Eligible)
}
pageData.RecentEpochs = append(pageData.RecentEpochs, &models.IndexPageDataEpochs{
Epoch: epochData.Epoch,
Ts: utils.EpochToTime(epochData.Epoch),
Finalized: finalized,
Finalized: finalizedEpoch >= int64(epochData.Epoch),
EligibleEther: epochData.Eligible,
TargetVoted: epochData.VotedTarget,
HeadVoted: epochData.VotedHead,
Expand Down
4 changes: 2 additions & 2 deletions handlers/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func SearchAhead(w http.ResponseWriter, r *http.Request) {
if cachedBlock != nil {
result = &[]models.SearchAheadSlotsResult{
{
Slot: fmt.Sprintf("%v", uint64(cachedBlock.Header.Data.Header.Message.Slot)),
Root: cachedBlock.Header.Data.Root,
Slot: fmt.Sprintf("%v", uint64(cachedBlock.Header.Message.Slot)),
Root: cachedBlock.Root,
Orphaned: cachedBlock.Orphaned,
},
}
Expand Down
Loading

0 comments on commit a2e66c8

Please sign in to comment.