Skip to content

Commit

Permalink
added ability to search for execution layer block hashes / numbers
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Aug 30, 2023
1 parent 80e44ed commit 0ca67fc
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 80 deletions.
8 changes: 4 additions & 4 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,15 +641,15 @@ func InsertUnfinalizedBlock(block *dbtypes.UnfinalizedBlock, tx *sqlx.Tx) error
return nil
}

func GetUnfinalizedBlockHeader() []*dbtypes.UnfinalizedBlockHeader {
blockRefs := []*dbtypes.UnfinalizedBlockHeader{}
func GetUnfinalizedBlocks() []*dbtypes.UnfinalizedBlock {
blockRefs := []*dbtypes.UnfinalizedBlock{}
err := ReaderDb.Select(&blockRefs, `
SELECT
root, slot, header
root, slot, header, block
FROM unfinalized_blocks
`)
if err != nil {
logger.Errorf("Error while fetching unfinalized block refs: %v", err)
logger.Errorf("Error while fetching unfinalized blocks: %v", err)
return nil
}
return blockRefs
Expand Down
6 changes: 0 additions & 6 deletions dbtypes/dbtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,6 @@ type UnfinalizedBlock struct {
Block string `db:"block"`
}

type UnfinalizedBlockHeader struct {
Root []byte `db:"root"`
Slot uint64 `db:"slot"`
Header string `db:"header"`
}

type UnfinalizedEpochDuty struct {
Epoch uint64 `db:"epoch"`
DependentRoot []byte `db:"dependent_root"`
Expand Down
8 changes: 8 additions & 0 deletions dbtypes/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ type SearchAheadSlotsResult []struct {
Orphaned bool `db:"orphaned"`
}

type SearchAheadExecBlocksResult []struct {
Slot uint64 `db:"slot"`
Root []byte `db:"root"`
ExecHash []byte `db:"eth_block_hash"`
ExecNumber uint64 `db:"eth_block_number"`
Orphaned bool `db:"orphaned"`
}

type SearchAheadGraffitiResult []struct {
Graffiti string `db:"graffiti"`
Count uint64 `db:"count"`
Expand Down
198 changes: 160 additions & 38 deletions handlers/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,15 @@ func SearchAhead(w http.ResponseWriter, r *http.Request) {
searchType := vars["type"]
urlArgs := r.URL.Query()
search := urlArgs.Get("q")
search = strings.Trim(search, " \t")
search = strings.Replace(search, "0x", "", -1)
search = strings.Replace(search, "0X", "", -1)
var err error
logger := logrus.WithField("searchType", searchType)
var result interface{}

indexer := services.GlobalBeaconService.GetIndexer()

switch searchType {
case "epochs":
dbres := &dbtypes.SearchAheadEpochsResult{}
Expand All @@ -124,61 +127,83 @@ func SearchAhead(w http.ResponseWriter, r *http.Request) {
result = model
}
case "slots":
if len(search) <= 1 {
if len(search) == 0 {
break
}
if !searchLikeRE.MatchString(search) {
break
}
if searchLikeRE.MatchString(search) {
if len(search) == 64 {
blockHash, err := hex.DecodeString(search)
if len(search) == 64 {
blockHash, err := hex.DecodeString(search)
if err != nil {
logger.Errorf("error parsing blockHash to int: %v", err)
http.Error(w, "Internal server error", http.StatusServiceUnavailable)
return
}

cachedBlock := indexer.GetCachedBlock(blockHash)
if cachedBlock == nil {
cachedBlock = indexer.GetCachedBlockByStateroot(blockHash)
}
if !cachedBlock.IsReady() {
cachedBlock = nil
}
if cachedBlock != nil {
header := cachedBlock.GetHeader()
result = &[]models.SearchAheadSlotsResult{
{
Slot: fmt.Sprintf("%v", uint64(header.Message.Slot)),
Root: cachedBlock.Root,
Orphaned: !cachedBlock.IsCanonical(indexer, nil),
},
}
} else {
dbres := &dbtypes.SearchAheadSlotsResult{}
err = db.ReaderDb.Select(dbres, `
SELECT slot, root, orphaned
FROM blocks
WHERE root = $1 OR
state_root = $1
ORDER BY slot LIMIT 1`, blockHash)
if err != nil {
logger.Errorf("error parsing blockHash to int: %v", err)
logger.Errorf("error reading block root: %v", err)
http.Error(w, "Internal server error", http.StatusServiceUnavailable)
return
}

cachedBlock := services.GlobalBeaconService.GetCachedBlockByBlockroot(blockHash)
if cachedBlock == nil {
cachedBlock = services.GlobalBeaconService.GetCachedBlockByStateroot(blockHash)
}
if cachedBlock != nil {
if len(*dbres) > 0 {
result = &[]models.SearchAheadSlotsResult{
{
Slot: fmt.Sprintf("%v", uint64(cachedBlock.Header.Message.Slot)),
Root: cachedBlock.Root,
Orphaned: cachedBlock.Orphaned,
Slot: fmt.Sprintf("%v", (*dbres)[0].Slot),
Root: (*dbres)[0].Root,
Orphaned: (*dbres)[0].Orphaned,
},
}
} else {
dbres := &dbtypes.SearchAheadSlotsResult{}
err = db.ReaderDb.Select(dbres, `
SELECT slot, root, orphaned
FROM blocks
WHERE root = $1 OR
state_root = $1
ORDER BY slot LIMIT 1`, blockHash)
if err != nil {
logger.Errorf("error reading block root: %v", err)
http.Error(w, "Internal server error", http.StatusServiceUnavailable)
return
}
if len(*dbres) > 0 {
result = &[]models.SearchAheadSlotsResult{
{
Slot: fmt.Sprintf("%v", (*dbres)[0].Slot),
Root: (*dbres)[0].Root,
Orphaned: (*dbres)[0].Orphaned,
},
}
}
}

}
} else if blockNumber, convertErr := strconv.ParseUint(search, 10, 32); convertErr == nil {
cachedBlocks := indexer.GetCachedBlocks(blockNumber)
if len(cachedBlocks) > 0 {
res := make([]*models.SearchAheadSlotsResult, 0)
for _, cachedBlock := range cachedBlocks {
if !cachedBlock.IsReady() {
continue
}
header := cachedBlock.GetHeader()
res = append(res, &models.SearchAheadSlotsResult{
Slot: fmt.Sprintf("%v", uint64(header.Message.Slot)),
Root: cachedBlock.Root,
Orphaned: !cachedBlock.IsCanonical(indexer, nil),
})
}
} else if _, convertErr := strconv.ParseInt(search, 10, 32); convertErr == nil {
result = res
} else {
dbres := &dbtypes.SearchAheadSlotsResult{}
err = db.ReaderDb.Select(dbres, `
SELECT slot, root, orphaned
FROM blocks
WHERE slot = $1
ORDER BY slot LIMIT 10`, search)
ORDER BY slot LIMIT 10`, blockNumber)
if err == nil {
model := make([]models.SearchAheadSlotsResult, len(*dbres))
for idx, entry := range *dbres {
Expand All @@ -192,6 +217,103 @@ func SearchAhead(w http.ResponseWriter, r *http.Request) {
}
}
}
case "execblocks":
if len(search) == 0 {
break
}
if !searchLikeRE.MatchString(search) {
break
}
if len(search) == 64 {
blockHash, err := hex.DecodeString(search)
if err != nil {
logger.Errorf("error parsing blockHash to int: %v", err)
http.Error(w, "Internal server error", http.StatusServiceUnavailable)
return
}

cachedBlocks := indexer.GetCachedBlocksByExecutionBlockHash(blockHash)
if len(cachedBlocks) > 0 {
res := make([]*models.SearchAheadExecBlocksResult, 0)
for idx, cachedBlock := range cachedBlocks {
if !cachedBlock.IsReady() {
continue
}
header := cachedBlock.GetHeader()
res[idx] = &models.SearchAheadExecBlocksResult{
Slot: fmt.Sprintf("%v", uint64(header.Message.Slot)),
Root: cachedBlock.Root,
ExecHash: cachedBlock.Refs.ExecutionHash,
ExecNumber: cachedBlock.Refs.ExecutionNumber,
Orphaned: !cachedBlock.IsCanonical(indexer, nil),
}
}
result = res
} else {
dbres := &dbtypes.SearchAheadExecBlocksResult{}
err = db.ReaderDb.Select(dbres, `
SELECT slot, root, eth_block_hash, eth_block_number, orphaned
FROM blocks
WHERE eth_block_hash = $1
ORDER BY slot LIMIT 10`, blockHash)
if err != nil {
logger.Errorf("error reading block: %v", err)
http.Error(w, "Internal server error", http.StatusServiceUnavailable)
return
}
if len(*dbres) > 0 {
result = &[]models.SearchAheadExecBlocksResult{
{
Slot: fmt.Sprintf("%v", (*dbres)[0].Slot),
Root: (*dbres)[0].Root,
ExecHash: (*dbres)[0].ExecHash,
ExecNumber: (*dbres)[0].ExecNumber,
Orphaned: (*dbres)[0].Orphaned,
},
}
}

}
} else if blockNumber, convertErr := strconv.ParseUint(search, 10, 32); convertErr == nil {
cachedBlocks := indexer.GetCachedBlocksByExecutionBlockNumber(blockNumber)
if len(cachedBlocks) > 0 {
res := make([]*models.SearchAheadExecBlocksResult, 0)
for _, cachedBlock := range cachedBlocks {
if !cachedBlock.IsReady() {
continue
}
header := cachedBlock.GetHeader()
res = append(res, &models.SearchAheadExecBlocksResult{
Slot: fmt.Sprintf("%v", uint64(header.Message.Slot)),
Root: cachedBlock.Root,
ExecHash: cachedBlock.Refs.ExecutionHash,
ExecNumber: cachedBlock.Refs.ExecutionNumber,
Orphaned: !cachedBlock.IsCanonical(indexer, nil),
})
}
result = res
} else {
dbres := &dbtypes.SearchAheadExecBlocksResult{}
err = db.ReaderDb.Select(dbres, `
SELECT slot, root, eth_block_hash, eth_block_number, orphaned
FROM blocks
WHERE eth_block_number = $1
ORDER BY slot LIMIT 10`, blockNumber)
if err == nil {
model := make([]models.SearchAheadExecBlocksResult, len(*dbres))
for idx, entry := range *dbres {
model[idx] = models.SearchAheadExecBlocksResult{
Slot: fmt.Sprintf("%v", entry.Slot),
Root: entry.Root,
ExecHash: entry.ExecHash,
ExecNumber: entry.ExecNumber,
Orphaned: entry.Orphaned,
}
}
result = model
}
}
}
case "graffiti":
graffiti := &dbtypes.SearchAheadGraffitiResult{}
err = db.ReaderDb.Select(graffiti, db.EngineQuery(map[dbtypes.DBEngineType]string{
Expand Down
18 changes: 13 additions & 5 deletions indexer/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,27 @@ func (cache *indexerCache) setLastValidators(epoch uint64, validators *rpctypes.
}

func (cache *indexerCache) loadStoredUnfinalizedCache() error {
blockHeaders := db.GetUnfinalizedBlockHeader()
for _, blockHeader := range blockHeaders {
blocks := db.GetUnfinalizedBlocks()
for _, block := range blocks {
var header rpctypes.SignedBeaconBlockHeader
err := json.Unmarshal([]byte(blockHeader.Header), &header)
err := json.Unmarshal([]byte(block.Header), &header)
if err != nil {
logger.Warnf("Error parsing unfinalized block header from db: %v", err)
continue
}
logger.Debugf("Restored unfinalized block header from db: %v", blockHeader.Slot)
cachedBlock, _ := cache.createOrGetCachedBlock(blockHeader.Root, blockHeader.Slot)
var body rpctypes.SignedBeaconBlock
err = json.Unmarshal([]byte(block.Block), &body)
if err != nil {
logger.Warnf("Error parsing unfinalized block body from db: %v", err)
continue
}
logger.Debugf("Restored unfinalized block header from db: %v", block.Slot)
cachedBlock, _ := cache.createOrGetCachedBlock(block.Root, block.Slot)
cachedBlock.mutex.Lock()
cachedBlock.header = &header
cachedBlock.block = &body
cachedBlock.isInDb = true
cachedBlock.parseBlockRefs()
cachedBlock.mutex.Unlock()
}
epochDuties := db.GetUnfinalizedEpochDutyRefs()
Expand Down
16 changes: 16 additions & 0 deletions indexer/cache_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type CacheBlock struct {
isInDb bool
header *rpctypes.SignedBeaconBlockHeader
block *rpctypes.SignedBeaconBlock
Refs struct {
ExecutionHash []byte
ExecutionNumber uint64
}

dbBlockMutex sync.Mutex
dbBlockCache *dbtypes.Block
Expand Down Expand Up @@ -100,6 +104,17 @@ func (block *CacheBlock) buildOrphanedBlock() *dbtypes.OrphanedBlock {
}
}

func (block *CacheBlock) parseBlockRefs() {
if block.block == nil {
return
}
execPayload := block.block.Message.Body.ExecutionPayload
if execPayload != nil {
block.Refs.ExecutionHash = execPayload.BlockHash
block.Refs.ExecutionNumber = uint64(execPayload.BlockNumber)
}
}

func (block *CacheBlock) GetParentRoot() []byte {
block.mutex.RLock()
defer block.mutex.RUnlock()
Expand Down Expand Up @@ -134,6 +149,7 @@ func (block *CacheBlock) GetBlockBody() *rpctypes.SignedBeaconBlock {
return nil
}
block.block = &blockBody
block.parseBlockRefs()

return block.block
}
Expand Down
Loading

0 comments on commit 0ca67fc

Please sign in to comment.