Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-implement indexer with multi-endpoint support and reliable fork tracking #8

Merged
merged 18 commits into from
Aug 23, 2023
6 changes: 0 additions & 6 deletions config/default.config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,9 @@ beaconapi:

# indexer keeps track of the latest epochs in memory.
indexer:
# number of epochs to load on startup
prepopulateEpochs: 2

# max number of epochs to keep in memory
inMemoryEpochs: 3

# epoch processing delay (should be >= 2)
epochProcessingDelay: 2

# disable synchronizing and everything that writes to the db (indexer just maintains local cache)
disableIndexWriter: false

Expand Down
136 changes: 136 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,3 +556,139 @@ func GetSlotAssignmentsForSlots(firstSlot uint64, lastSlot uint64) []*dbtypes.Sl
}
return assignments
}

func GetBlockOrphanedRefs(blockRoots [][]byte) []*dbtypes.BlockOrphanedRef {
orphanedRefs := []*dbtypes.BlockOrphanedRef{}
if len(blockRoots) == 0 {
return orphanedRefs
}
var sql strings.Builder
fmt.Fprintf(&sql, `
SELECT
root, orphaned
FROM blocks
WHERE root in (`)
argIdx := 0
args := make([]any, len(blockRoots))
for i, root := range blockRoots {
if i > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "$%v", argIdx+1)
args[argIdx] = root
argIdx += 1
}
fmt.Fprintf(&sql, ")")
err := ReaderDb.Select(&orphanedRefs, sql.String(), args...)
if err != nil {
logger.Errorf("Error while fetching blocks: %v", err)
return nil
}
return orphanedRefs
}

func InsertUnfinalizedBlock(block *dbtypes.UnfinalizedBlock, tx *sqlx.Tx) error {
_, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: `
INSERT INTO unfinalized_blocks (
root, slot, header, block
) VALUES ($1, $2, $3, $4)
ON CONFLICT (root) DO NOTHING`,
dbtypes.DBEngineSqlite: `
INSERT OR IGNORE INTO unfinalized_blocks (
root, slot, header, block
) VALUES ($1, $2, $3, $4)`,
}),
block.Root, block.Slot, block.Header, block.Block)
if err != nil {
return err
}
return nil
}

func GetUnfinalizedBlockHeader() []*dbtypes.UnfinalizedBlockHeader {
blockRefs := []*dbtypes.UnfinalizedBlockHeader{}
err := ReaderDb.Select(&blockRefs, `
SELECT
root, slot, header
FROM unfinalized_blocks
`)
if err != nil {
logger.Errorf("Error while fetching unfinalized block refs: %v", err)
return nil
}
return blockRefs
}

func GetUnfinalizedBlock(root []byte) *dbtypes.UnfinalizedBlock {
block := dbtypes.UnfinalizedBlock{}
err := ReaderDb.Get(&block, `
SELECT root, slot, header, block
FROM unfinalized_blocks
WHERE root = $1
`, root)
if err != nil {
logger.Errorf("Error while fetching unfinalized block 0x%x: %v", root, err)
return nil
}
return &block
}

func InsertUnfinalizedEpochDuty(epochDuty *dbtypes.UnfinalizedEpochDuty, tx *sqlx.Tx) error {
_, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: `
INSERT INTO unfinalized_duties (
epoch, dependent_root, duties
) VALUES ($1, $2, $3)
ON CONFLICT (root) DO NOTHING`,
dbtypes.DBEngineSqlite: `
INSERT OR IGNORE INTO unfinalized_duties (
epoch, dependent_root, duties
) VALUES ($1, $2, $3)`,
}),
epochDuty.Epoch, epochDuty.DependentRoot, epochDuty.Duties)
if err != nil {
return err
}
return nil
}

func GetUnfinalizedEpochDutyRefs() []*dbtypes.UnfinalizedEpochDutyRef {
dutyRefs := []*dbtypes.UnfinalizedEpochDutyRef{}
err := ReaderDb.Select(&dutyRefs, `
SELECT
epoch, dependent_root
FROM unfinalized_duties
`)
if err != nil {
logger.Errorf("Error while fetching unfinalized duty refs: %v", err)
return nil
}
return dutyRefs
}

func GetUnfinalizedDuty(epoch uint64, dependentRoot []byte) *dbtypes.UnfinalizedEpochDuty {
epochDuty := dbtypes.UnfinalizedEpochDuty{}
err := ReaderDb.Get(&epochDuty, `
SELECT epoch, dependent_root, duties
FROM unfinalized_duties
WHERE epoch = $1 AND dependent_root = $2
`, epoch, dependentRoot)
if err != nil {
logger.Errorf("Error while fetching unfinalized duty %v/0x%x: %v", epoch, dependentRoot, err)
return nil
}
return &epochDuty
}

func DeleteUnfinalizedBefore(slot uint64, tx *sqlx.Tx) error {
_, err := tx.Exec(`DELETE FROM unfinalized_blocks WHERE slot < $1`, slot)
if err != nil {
return err
}
_, err = tx.Exec(`DELETE FROM unfinalized_duties WHERE epoch < $1`, utils.EpochOfSlot(slot))
if err != nil {
return err
}
return nil
}
2 changes: 1 addition & 1 deletion db/schema/pgsql/20230720234842_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ CREATE INDEX IF NOT EXISTS "blocks_graffiti_idx"

CREATE INDEX IF NOT EXISTS "blocks_slot_idx"
ON public."blocks"
("root" ASC NULLS LAST);
("slot" ASC NULLS LAST);

CREATE INDEX IF NOT EXISTS "blocks_state_root_idx"
ON public."blocks"
Expand Down
29 changes: 29 additions & 0 deletions db/schema/pgsql/20230820050910_indexer-cache.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE IF NOT EXISTS public."unfinalized_blocks"
(
"root" bytea NOT NULL,
"slot" bigint NOT NULL,
"header" text COLLATE pg_catalog."default" NOT NULL,
"block" text COLLATE pg_catalog."default" NOT NULL,
CONSTRAINT "unfinalized_blocks_pkey" PRIMARY KEY ("root")
);

CREATE INDEX IF NOT EXISTS "unfinalized_blocks_slot_idx"
ON public."unfinalized_blocks"
("slot" ASC NULLS LAST);

CREATE TABLE IF NOT EXISTS public."unfinalized_duties"
(
"epoch" bigint NOT NULL,
"dependent_root" bytea NOT NULL,
"duties" bytea NOT NULL,
CONSTRAINT "unfinalized_duties_pkey" PRIMARY KEY ("epoch", "dependent_root")
);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
SELECT 'NOT SUPPORTED';
-- +goose StatementEnd
2 changes: 1 addition & 1 deletion db/schema/sqlite/20230720234842_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ CREATE INDEX IF NOT EXISTS "blocks_graffiti_idx"

CREATE INDEX IF NOT EXISTS "blocks_slot_idx"
ON "blocks"
("root" ASC);
("slot" ASC);

CREATE INDEX IF NOT EXISTS "blocks_state_root_idx"
ON "blocks"
Expand Down
29 changes: 29 additions & 0 deletions db/schema/sqlite/20230820050910_indexer-cache.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE IF NOT EXISTS "unfinalized_blocks"
(
"root" BLOB NOT NULL,
"slot" bigint NOT NULL,
"header" text NOT NULL,
"block" text NOT NULL,
CONSTRAINT "unfinalized_blocks_pkey" PRIMARY KEY ("root")
);

CREATE INDEX IF NOT EXISTS "unfinalized_blocks_slot_idx"
ON "unfinalized_blocks"
("slot" ASC);

CREATE TABLE IF NOT EXISTS "unfinalized_duties"
(
"epoch" bigint NOT NULL,
"dependent_root" BLOB NOT NULL,
"duties" BLOB NOT NULL,
CONSTRAINT "unfinalized_duties_pkey" PRIMARY KEY ("epoch", "dependent_root")
);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
SELECT 'NOT SUPPORTED';
-- +goose StatementEnd
29 changes: 29 additions & 0 deletions dbtypes/dbtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ type Block struct {
SyncParticipation float32 `db:"sync_participation"`
}

type BlockOrphanedRef struct {
Root []byte `db:"root"`
Orphaned bool `db:"orphaned"`
}

type Epoch struct {
Epoch uint64 `db:"epoch"`
ValidatorCount uint64 `db:"validator_count"`
Expand Down Expand Up @@ -60,3 +65,27 @@ type SlotAssignment struct {
Slot uint64 `db:"slot"`
Proposer uint64 `db:"proposer"`
}

type UnfinalizedBlock struct {
Root []byte `db:"root"`
Slot uint64 `db:"slot"`
Header string `db:"header"`
Block string `db:"block"`
}

type UnfinalizedBlockHeader struct {
Root []byte `db:"root"`
Slot uint64 `db:"slot"`
Header string `db:"header"`
}

type UnfinalizedEpochDuty struct {
Epoch uint64 `db:"epoch"`
DependentRoot []byte `db:"dependent_root"`
Duties []byte `db:"duties"`
}

type UnfinalizedEpochDutyRef struct {
Epoch uint64 `db:"epoch"`
DependentRoot []byte `db:"dependent_root"`
}
7 changes: 2 additions & 5 deletions handlers/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func buildEpochPageData(epoch uint64) (*models.EpochPageData, time.Duration) {
return nil, -1
}

finalizedHead, _ := services.GlobalBeaconService.GetFinalizedBlockHead()
finalizedEpoch, _ := services.GlobalBeaconService.GetFinalizedEpoch()
slotAssignments, syncedEpochs := services.GlobalBeaconService.GetProposerAssignments(epoch, epoch)

nextEpoch := epoch + 1
Expand All @@ -90,10 +90,7 @@ func buildEpochPageData(epoch uint64) (*models.EpochPageData, time.Duration) {
NextEpoch: nextEpoch,
Ts: utils.EpochToTime(epoch),
Synchronized: syncedEpochs[epoch],
}

if finalizedHead != nil {
pageData.Finalized = uint64(finalizedHead.Data.Header.Message.Slot) >= lastSlot
Finalized: finalizedEpoch >= int64(epoch),
}

dbEpochs := services.GlobalBeaconService.GetDbEpochs(epoch, 1)
Expand Down
8 changes: 3 additions & 5 deletions handlers/epochs.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func buildEpochsPageData(firstEpoch uint64, pageSize uint64) (*models.EpochsPage
}
pageData.LastPageEpoch = pageSize - 1

finalizedHead, _ := services.GlobalBeaconService.GetFinalizedBlockHead()
finalizedEpoch, _ := services.GlobalBeaconService.GetFinalizedEpoch()
epochLimit := pageSize

// load epochs
Expand All @@ -102,10 +102,8 @@ func buildEpochsPageData(firstEpoch uint64, pageSize uint64) (*models.EpochsPage
allFinalized := true
for epochIdx := int64(firstEpoch); epochIdx >= 0 && epochCount < epochLimit; epochIdx-- {
epoch := uint64(epochIdx)
finalized := false
if finalizedHead != nil && uint64(finalizedHead.Data.Header.Message.Slot) >= epoch*utils.Config.Chain.Config.SlotsPerEpoch {
finalized = true
} else {
finalized := finalizedEpoch >= epochIdx
if !finalized {
allFinalized = false
}
epochData := &models.EpochsPageDataEpoch{
Expand Down
25 changes: 6 additions & 19 deletions handlers/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Index(w http.ResponseWriter, r *http.Request) {

func getIndexPageData() *models.IndexPageData {
pageData := &models.IndexPageData{}
pageCacheKey := fmt.Sprintf("index")
pageCacheKey := "index"
pageData = services.GlobalFrontendCache.ProcessCachedPage(pageCacheKey, true, pageData, func(pageCall *services.FrontendCacheProcessingPage) interface{} {
pageData, cacheTimeout := buildIndexPageData()
pageCall.CacheTimeout = cacheTimeout
Expand All @@ -62,24 +62,15 @@ func buildIndexPageData() (*models.IndexPageData, time.Duration) {
currentEpoch = 0
}
currentSlot := utils.TimeToSlot(uint64(now.Unix()))
if currentSlot < 0 {
currentSlot = 0
}
currentSlotIndex := (currentSlot % utils.Config.Chain.Config.SlotsPerEpoch) + 1

finalizedHead, _ := services.GlobalBeaconService.GetFinalizedBlockHead()
var finalizedSlot uint64
if finalizedHead != nil {
finalizedSlot = uint64(finalizedHead.Data.Header.Message.Slot)
} else {
finalizedSlot = 0
}
finalizedEpoch, _ := services.GlobalBeaconService.GetFinalizedEpoch()

syncState := dbtypes.IndexerSyncState{}
db.GetExplorerState("indexer.syncstate", &syncState)
var isSynced bool
if currentEpoch > int64(utils.Config.Indexer.EpochProcessingDelay) {
isSynced = syncState.Epoch >= uint64(currentEpoch-int64(utils.Config.Indexer.EpochProcessingDelay))
if finalizedEpoch >= 1 {
isSynced = syncState.Epoch >= uint64(finalizedEpoch-1)
} else {
isSynced = true
}
Expand All @@ -89,7 +80,7 @@ func buildIndexPageData() (*models.IndexPageData, time.Duration) {
DepositContract: utils.Config.Chain.Config.DepositContractAddress,
ShowSyncingMessage: !isSynced,
CurrentEpoch: uint64(currentEpoch),
CurrentFinalizedEpoch: utils.EpochOfSlot(finalizedSlot),
CurrentFinalizedEpoch: finalizedEpoch,
CurrentSlot: currentSlot,
CurrentSlotIndex: currentSlotIndex,
CurrentScheduledCount: utils.Config.Chain.Config.SlotsPerEpoch - currentSlotIndex,
Expand Down Expand Up @@ -176,18 +167,14 @@ func buildIndexPageData() (*models.IndexPageData, time.Duration) {
if epochData == nil {
continue
}
finalized := false
if finalizedHead != nil && uint64(finalizedHead.Data.Header.Message.Slot) >= epochData.Epoch*utils.Config.Chain.Config.SlotsPerEpoch {
finalized = true
}
voteParticipation := float64(1)
if epochData.Eligible > 0 {
voteParticipation = float64(epochData.VotedTarget) * 100.0 / float64(epochData.Eligible)
}
pageData.RecentEpochs = append(pageData.RecentEpochs, &models.IndexPageDataEpochs{
Epoch: epochData.Epoch,
Ts: utils.EpochToTime(epochData.Epoch),
Finalized: finalized,
Finalized: finalizedEpoch >= int64(epochData.Epoch),
EligibleEther: epochData.Eligible,
TargetVoted: epochData.VotedTarget,
HeadVoted: epochData.VotedHead,
Expand Down
4 changes: 2 additions & 2 deletions handlers/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func SearchAhead(w http.ResponseWriter, r *http.Request) {
if cachedBlock != nil {
result = &[]models.SearchAheadSlotsResult{
{
Slot: fmt.Sprintf("%v", uint64(cachedBlock.Header.Data.Header.Message.Slot)),
Root: cachedBlock.Header.Data.Root,
Slot: fmt.Sprintf("%v", uint64(cachedBlock.Header.Message.Slot)),
Root: cachedBlock.Root,
Orphaned: cachedBlock.Orphaned,
},
}
Expand Down
Loading