From 47a6ac16da6de62e589b655bde7936dcdb0eb073 Mon Sep 17 00:00:00 2001 From: a Date: Mon, 4 Dec 2023 17:13:52 -0600 Subject: [PATCH] [beacon handler] framework (#8851) adds a two indexes to the validators cache creates beaconhttp package with many utilities for beacon http endpoint (future support for ssz is baked in) started on some validator endpoints --- cl/beacon/beaconhttp/api.go | 105 +++++++ cl/beacon/handler/blocks.go | 81 +++--- cl/beacon/handler/config.go | 12 +- cl/beacon/handler/duties_proposer.go | 11 +- cl/beacon/handler/format.go | 130 ++++----- cl/beacon/handler/genesis.go | 9 +- cl/beacon/handler/handler.go | 83 +++--- cl/beacon/handler/headers.go | 41 +-- cl/beacon/handler/pool.go | 20 +- cl/beacon/handler/states.go | 51 ++-- cl/beacon/middleware.go | 16 -- cl/beacon/router.go | 57 +++- cl/beacon/rw.go | 42 +++ cl/beacon/validatorapi/endpoints.go | 249 +++++++++++++++++ cl/beacon/validatorapi/handler.go | 89 ++++++ .../forkchoice/fork_graph/fork_graph_disk.go | 258 ++++++++---------- .../fork_graph/fork_graph_disk_fs.go | 153 +++++++++++ cl/phase1/forkchoice/fork_graph/interface.go | 4 + cl/phase1/forkchoice/forkchoice.go | 20 +- cl/phase1/forkchoice/forkchoice_slot.go | 1 + cl/phase1/forkchoice/interface.go | 8 +- cl/phase1/stages/clstages.go | 2 +- cmd/caplin/caplin1/run.go | 11 +- turbo/debug/flags.go | 4 +- 24 files changed, 1060 insertions(+), 397 deletions(-) create mode 100644 cl/beacon/beaconhttp/api.go create mode 100644 cl/beacon/rw.go create mode 100644 cl/beacon/validatorapi/endpoints.go create mode 100644 cl/beacon/validatorapi/handler.go create mode 100644 cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go create mode 100644 cl/phase1/forkchoice/forkchoice_slot.go diff --git a/cl/beacon/beaconhttp/api.go b/cl/beacon/beaconhttp/api.go new file mode 100644 index 00000000000..b0c3d94c385 --- /dev/null +++ b/cl/beacon/beaconhttp/api.go @@ -0,0 +1,105 @@ +package beaconhttp + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "reflect" + + "github.com/ledgerwatch/erigon-lib/types/ssz" + "github.com/ledgerwatch/erigon/cl/phase1/forkchoice/fork_graph" + "github.com/ledgerwatch/log/v3" +) + +var _ error = EndpointError{} +var _ error = (*EndpointError)(nil) + +type EndpointError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func WrapEndpointError(err error) *EndpointError { + e := &EndpointError{} + if errors.As(err, e) { + return e + } + if errors.Is(err, fork_graph.ErrStateNotFound) { + return NewEndpointError(http.StatusNotFound, "Could not find beacon state") + } + return NewEndpointError(http.StatusInternalServerError, err.Error()) +} + +func NewEndpointError(code int, message string) *EndpointError { + return &EndpointError{ + Code: code, + Message: message, + } +} + +func (e EndpointError) Error() string { + return fmt.Sprintf("Code %d: %s", e.Code, e.Message) +} + +func (e *EndpointError) WriteTo(w http.ResponseWriter) { + w.WriteHeader(e.Code) + encErr := json.NewEncoder(w).Encode(e) + if encErr != nil { + log.Error("beaconapi failed to write json error", "err", encErr) + } +} + +type EndpointHandler[T any] interface { + Handle(r *http.Request) (T, error) +} + +type EndpointHandlerFunc[T any] func(r *http.Request) (T, error) + +func (e EndpointHandlerFunc[T]) Handle(r *http.Request) (T, error) { + return e(r) +} + +func HandleEndpointFunc[T any](h EndpointHandlerFunc[T]) http.HandlerFunc { + return HandleEndpoint[T](h) +} + +func HandleEndpoint[T any](h EndpointHandler[T]) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ans, err := h.Handle(r) + if err != nil { + log.Error("beacon api request error", "err", err) + endpointError := WrapEndpointError(err) + endpointError.WriteTo(w) + return + } + // TODO: ssz handler + // TODO: potentially add a context option to buffer these + contentType := r.Header.Get("Accept") + switch contentType { + case "application/octet-stream": + sszMarshaler, ok := any(ans).(ssz.Marshaler) + if !ok { + NewEndpointError(http.StatusBadRequest, "This endpoint does not support SSZ response").WriteTo(w) + return + } + // TODO: we should probably figure out some way to stream this in the future :) + encoded, err := sszMarshaler.EncodeSSZ(nil) + if err != nil { + WrapEndpointError(err).WriteTo(w) + return + } + w.Write(encoded) + case "application/json", "": + w.Header().Add("content-type", "application/json") + err := json.NewEncoder(w).Encode(ans) + if err != nil { + // this error is fatal, log to console + log.Error("beaconapi failed to encode json", "type", reflect.TypeOf(ans), "err", err) + } + default: + http.Error(w, "content type must be application/json or application/octet-stream", http.StatusBadRequest) + + } + }) +} diff --git a/cl/beacon/handler/blocks.go b/cl/beacon/handler/blocks.go index 8cebe048247..cabe88addca 100644 --- a/cl/beacon/handler/blocks.go +++ b/cl/beacon/handler/blocks.go @@ -7,6 +7,7 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/cl/beacon/beaconhttp" "github.com/ledgerwatch/erigon/cl/cltypes" "github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies" ) @@ -22,12 +23,12 @@ type getHeadersRequest struct { ParentRoot *libcommon.Hash `json:"root,omitempty"` } -func (a *ApiHandler) rootFromBlockId(ctx context.Context, tx kv.Tx, blockId *segmentID) (root libcommon.Hash, httpStatusErr int, err error) { +func (a *ApiHandler) rootFromBlockId(ctx context.Context, tx kv.Tx, blockId *segmentID) (root libcommon.Hash, err error) { switch { case blockId.head(): root, _, err = a.forkchoiceStore.GetHead() if err != nil { - return libcommon.Hash{}, http.StatusInternalServerError, err + return libcommon.Hash{}, err } case blockId.finalized(): root = a.forkchoiceStore.FinalizedCheckpoint().BlockRoot() @@ -36,134 +37,122 @@ func (a *ApiHandler) rootFromBlockId(ctx context.Context, tx kv.Tx, blockId *seg case blockId.genesis(): root, err = beacon_indicies.ReadCanonicalBlockRoot(tx, 0) if err != nil { - return libcommon.Hash{}, http.StatusInternalServerError, err + return libcommon.Hash{}, err } if root == (libcommon.Hash{}) { - return libcommon.Hash{}, http.StatusNotFound, fmt.Errorf("genesis block not found") + return libcommon.Hash{}, beaconhttp.NewEndpointError(http.StatusNotFound, "genesis block not found") } case blockId.getSlot() != nil: root, err = beacon_indicies.ReadCanonicalBlockRoot(tx, *blockId.getSlot()) if err != nil { - return libcommon.Hash{}, http.StatusInternalServerError, err + return libcommon.Hash{}, err } if root == (libcommon.Hash{}) { - return libcommon.Hash{}, http.StatusNotFound, fmt.Errorf("block not found %d", *blockId.getSlot()) + return libcommon.Hash{}, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("block not found %d", *blockId.getSlot())) } case blockId.getRoot() != nil: // first check if it exists root = *blockId.getRoot() default: - return libcommon.Hash{}, http.StatusInternalServerError, fmt.Errorf("cannot parse block id") + return libcommon.Hash{}, beaconhttp.NewEndpointError(http.StatusInternalServerError, "cannot parse block id") } return } -func (a *ApiHandler) getBlock(r *http.Request) *beaconResponse { - +func (a *ApiHandler) getBlock(r *http.Request) (*beaconResponse, error) { ctx := r.Context() - tx, err := a.indiciesDB.BeginRo(ctx) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } defer tx.Rollback() blockId, err := blockIdFromRequest(r) if err != nil { - return newCriticalErrorResponse(err) - + return nil, err } - root, httpStatus, err := a.rootFromBlockId(ctx, tx, blockId) + root, err := a.rootFromBlockId(ctx, tx, blockId) if err != nil { - return newApiErrorResponse(httpStatus, err.Error()) + return nil, err } blk, err := a.blockReader.ReadBlockByRoot(ctx, tx, root) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } if blk == nil { - return newApiErrorResponse(http.StatusNotFound, fmt.Sprintf("block not found %x", root)) + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("block not found %x", root)) } // Check if the block is canonical var canonicalRoot libcommon.Hash canonicalRoot, err = beacon_indicies.ReadCanonicalBlockRoot(tx, blk.Block.Slot) if err != nil { - return newCriticalErrorResponse(err) + return nil, beaconhttp.WrapEndpointError(err) } return newBeaconResponse(blk). withFinalized(root == canonicalRoot && blk.Block.Slot <= a.forkchoiceStore.FinalizedSlot()). - withVersion(blk.Version()) + withVersion(blk.Version()), nil } -func (a *ApiHandler) getBlockAttestations(r *http.Request) *beaconResponse { +func (a *ApiHandler) getBlockAttestations(r *http.Request) (*beaconResponse, error) { ctx := r.Context() - tx, err := a.indiciesDB.BeginRo(ctx) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } defer tx.Rollback() - blockId, err := blockIdFromRequest(r) if err != nil { - return newApiErrorResponse(http.StatusBadRequest, err.Error()) + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error()) } - - root, httpStatus, err := a.rootFromBlockId(ctx, tx, blockId) + root, err := a.rootFromBlockId(ctx, tx, blockId) if err != nil { - return newApiErrorResponse(httpStatus, err.Error()) + return nil, err } - blk, err := a.blockReader.ReadBlockByRoot(ctx, tx, root) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } if blk == nil { - return newApiErrorResponse(http.StatusNotFound, fmt.Sprintf("block not found %x", root)) + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("block not found %x", root)) } // Check if the block is canonical canonicalRoot, err := beacon_indicies.ReadCanonicalBlockRoot(tx, blk.Block.Slot) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } - return newBeaconResponse(blk.Block.Body.Attestations).withFinalized(root == canonicalRoot && blk.Block.Slot <= a.forkchoiceStore.FinalizedSlot()). - withVersion(blk.Version()) + withVersion(blk.Version()), nil } -func (a *ApiHandler) getBlockRoot(r *http.Request) *beaconResponse { +func (a *ApiHandler) getBlockRoot(r *http.Request) (*beaconResponse, error) { ctx := r.Context() - tx, err := a.indiciesDB.BeginRo(ctx) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } defer tx.Rollback() - blockId, err := blockIdFromRequest(r) if err != nil { - return newApiErrorResponse(http.StatusBadRequest, err.Error()) + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error()) } - root, httpStatus, err := a.rootFromBlockId(ctx, tx, blockId) + root, err := a.rootFromBlockId(ctx, tx, blockId) if err != nil { - return newApiErrorResponse(httpStatus, err.Error()) + return nil, err } - // check if the root exist slot, err := beacon_indicies.ReadBlockSlotByBlockRoot(tx, root) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } if slot == nil { - return newApiErrorResponse(http.StatusNotFound, fmt.Sprintf("block not found %x", root)) + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("block not found %x", root)) } // Check if the block is canonical var canonicalRoot libcommon.Hash canonicalRoot, err = beacon_indicies.ReadCanonicalBlockRoot(tx, *slot) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } - - return newBeaconResponse(struct{ Root libcommon.Hash }{Root: root}).withFinalized(canonicalRoot == root && *slot <= a.forkchoiceStore.FinalizedSlot()) + return newBeaconResponse(struct{ Root libcommon.Hash }{Root: root}).withFinalized(canonicalRoot == root && *slot <= a.forkchoiceStore.FinalizedSlot()), nil } diff --git a/cl/beacon/handler/config.go b/cl/beacon/handler/config.go index d6ca4093238..b0e8972c2d8 100644 --- a/cl/beacon/handler/config.go +++ b/cl/beacon/handler/config.go @@ -9,20 +9,20 @@ import ( "github.com/ledgerwatch/erigon/cl/cltypes" ) -func (a *ApiHandler) getSpec(r *http.Request) *beaconResponse { - return newBeaconResponse(a.beaconChainCfg) +func (a *ApiHandler) getSpec(r *http.Request) (*beaconResponse, error) { + return newBeaconResponse(a.beaconChainCfg), nil } -func (a *ApiHandler) getDepositContract(r *http.Request) *beaconResponse { +func (a *ApiHandler) getDepositContract(r *http.Request) (*beaconResponse, error) { return newBeaconResponse(struct { ChainId uint64 `json:"chain_id"` DepositContract string `json:"address"` - }{ChainId: a.beaconChainCfg.DepositChainID, DepositContract: a.beaconChainCfg.DepositContractAddress}) + }{ChainId: a.beaconChainCfg.DepositChainID, DepositContract: a.beaconChainCfg.DepositContractAddress}), nil } -func (a *ApiHandler) getForkSchedule(r *http.Request) *beaconResponse { +func (a *ApiHandler) getForkSchedule(r *http.Request) (*beaconResponse, error) { response := []cltypes.Fork{} // create first response (unordered and incomplete) for currentVersion, epoch := range a.beaconChainCfg.ForkVersionSchedule { @@ -43,5 +43,5 @@ func (a *ApiHandler) getForkSchedule(r *http.Request) *beaconResponse { response[i].PreviousVersion = previousVersion previousVersion = response[i].CurrentVersion } - return newBeaconResponse(response) + return newBeaconResponse(response), nil } diff --git a/cl/beacon/handler/duties_proposer.go b/cl/beacon/handler/duties_proposer.go index 8425b3093d8..609a8292c41 100644 --- a/cl/beacon/handler/duties_proposer.go +++ b/cl/beacon/handler/duties_proposer.go @@ -6,6 +6,7 @@ import ( "net/http" "sync" + "github.com/ledgerwatch/erigon/cl/beacon/beaconhttp" shuffling2 "github.com/ledgerwatch/erigon/cl/phase1/core/state/shuffling" libcommon "github.com/ledgerwatch/erigon-lib/common" @@ -17,22 +18,22 @@ type proposerDuties struct { Slot uint64 `json:"slot"` } -func (a *ApiHandler) getDutiesProposer(r *http.Request) *beaconResponse { +func (a *ApiHandler) getDutiesProposer(r *http.Request) (*beaconResponse, error) { epoch, err := epochFromRequest(r) if err != nil { - return newApiErrorResponse(http.StatusBadRequest, err.Error()) + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error()) } if epoch < a.forkchoiceStore.FinalizedCheckpoint().Epoch() { - return newApiErrorResponse(http.StatusBadRequest, "invalid epoch") + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, "invalid epoch") } // We need to compute our duties state, cancel := a.syncedData.HeadState() defer cancel() if state == nil { - return newApiErrorResponse(http.StatusInternalServerError, "beacon node is syncing") + return nil, beaconhttp.NewEndpointError(http.StatusInternalServerError, "beacon node is syncing") } @@ -88,6 +89,6 @@ func (a *ApiHandler) getDutiesProposer(r *http.Request) *beaconResponse { } wg.Wait() - return newBeaconResponse(duties).withFinalized(false).withVersion(a.beaconChainCfg.GetCurrentStateVersion(epoch)) + return newBeaconResponse(duties).withFinalized(false).withVersion(a.beaconChainCfg.GetCurrentStateVersion(epoch)), nil } diff --git a/cl/beacon/handler/format.go b/cl/beacon/handler/format.go index aec7d6a56d5..f2ea28495cb 100644 --- a/cl/beacon/handler/format.go +++ b/cl/beacon/handler/format.go @@ -1,19 +1,16 @@ package handler import ( - "encoding/json" "fmt" "net/http" "regexp" "strconv" - "strings" - "time" "github.com/go-chi/chi/v5" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/types/ssz" + "github.com/ledgerwatch/erigon/cl/beacon/beaconhttp" "github.com/ledgerwatch/erigon/cl/clparams" - "github.com/ledgerwatch/log/v3" ) type apiError struct { @@ -26,8 +23,26 @@ type beaconResponse struct { Finalized *bool `json:"finalized,omitempty"` Version *clparams.StateVersion `json:"version,omitempty"` ExecutionOptimistic *bool `json:"execution_optimistic,omitempty"` - apiError *apiError - internalError error +} + +func (b *beaconResponse) EncodeSSZ(xs []byte) ([]byte, error) { + marshaler, ok := b.Data.(ssz.Marshaler) + if !ok { + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, "This endpoint does not support SSZ response") + } + encoded, err := marshaler.EncodeSSZ(nil) + if err != nil { + return nil, err + } + return encoded, nil +} + +func (b *beaconResponse) EncodingSizeSSZ() int { + marshaler, ok := b.Data.(ssz.Marshaler) + if !ok { + return 9 + } + return marshaler.EncodingSizeSSZ() } func newBeaconResponse(data any) *beaconResponse { @@ -53,65 +68,50 @@ func (r *beaconResponse) withVersion(version clparams.StateVersion) (out *beacon return r } -func newCriticalErrorResponse(err error) *beaconResponse { - return &beaconResponse{ - internalError: err, - } -} - -func newApiErrorResponse(code int, msg string) *beaconResponse { - return &beaconResponse{ - apiError: &apiError{ - code: code, - err: fmt.Errorf(msg), - }, - } -} - -// In case of it being a json we need to also expose finalization, version, etc... -type beaconHandlerFn func(r *http.Request) *beaconResponse - -func beaconHandlerWrapper(fn beaconHandlerFn, supportSSZ bool) func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - accept := r.Header.Get("Accept") - isSSZ := !strings.Contains(accept, "application/json") && strings.Contains(accept, "application/stream-octect") - start := time.Now() - defer func() { - log.Debug("[Beacon API] finished", "method", r.Method, "path", r.URL.Path, "duration", time.Since(start)) - }() - - resp := fn(r) - if resp.internalError != nil { - http.Error(w, resp.internalError.Error(), http.StatusInternalServerError) - log.Debug("[Beacon API] failed", "method", r.Method, "err", resp.internalError.Error(), "ssz", isSSZ) - return - } - - if resp.apiError != nil { - http.Error(w, resp.apiError.err.Error(), resp.apiError.code) - log.Debug("[Beacon API] failed", "method", r.Method, "err", resp.apiError.err.Error(), "ssz", isSSZ) - return - } - - if isSSZ && supportSSZ { - data := resp.Data - // SSZ encoding - encoded, err := data.(ssz.Marshaler).EncodeSSZ(nil) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - log.Debug("[Beacon API] failed", "method", r.Method, "err", err, "accepted", accept) - return - } - w.Header().Set("Content-Type", "application/octet-stream") - w.Write(encoded) - return - } - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(resp); err != nil { - log.Warn("[Beacon API] failed", "method", r.Method, "err", err, "ssz", isSSZ) - } - } -} +//// In case of it being a json we need to also expose finalization, version, etc... +//type beaconHandlerFn func(r *http.Request) *beaconResponse +// +//func beaconHandlerWrapper(fn beaconHandlerFn, supportSSZ bool) func(w http.ResponseWriter, r *http.Request) { +// return func(w http.ResponseWriter, r *http.Request) { +// accept := r.Header.Get("Accept") +// isSSZ := !strings.Contains(accept, "application/json") && strings.Contains(accept, "application/stream-octect") +// start := time.Now() +// defer func() { +// log.Debug("[Beacon API] finished", "method", r.Method, "path", r.URL.Path, "duration", time.Since(start)) +// }() +// +// resp := fn(r) +// if resp.internalError != nil { +// http.Error(w, resp.internalError.Error(), http.StatusInternalServerError) +// log.Debug("[Beacon API] failed", "method", r.Method, "err", resp.internalError.Error(), "ssz", isSSZ) +// return +// } +// +// if resp.apiError != nil { +// http.Error(w, resp.apiError.err.Error(), resp.apiError.code) +// log.Debug("[Beacon API] failed", "method", r.Method, "err", resp.apiError.err.Error(), "ssz", isSSZ) +// return +// } +// +// if isSSZ && supportSSZ { +// data := resp.Data +// // SSZ encoding +// encoded, err := data.(ssz.Marshaler).EncodeSSZ(nil) +// if err != nil { +// http.Error(w, err.Error(), http.StatusInternalServerError) +// log.Debug("[Beacon API] failed", "method", r.Method, "err", err, "accepted", accept) +// return +// } +// w.Header().Set("Content-Type", "application/octet-stream") +// w.Write(encoded) +// return +// } +// w.Header().Set("Content-Type", "application/json") +// if err := json.NewEncoder(w).Encode(resp); err != nil { +// log.Warn("[Beacon API] failed", "method", r.Method, "err", err, "ssz", isSSZ) +// } +// } +//} type chainTag int diff --git a/cl/beacon/handler/genesis.go b/cl/beacon/handler/genesis.go index 2286d3ae020..5cbb8668a93 100644 --- a/cl/beacon/handler/genesis.go +++ b/cl/beacon/handler/genesis.go @@ -5,6 +5,7 @@ import ( "github.com/ledgerwatch/erigon-lib/common" libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/cl/beacon/beaconhttp" "github.com/ledgerwatch/erigon/cl/fork" ) @@ -14,19 +15,19 @@ type genesisReponse struct { GenesisForkVersion libcommon.Bytes4 `json:"genesis_fork_version,omitempty"` } -func (a *ApiHandler) getGenesis(r *http.Request) *beaconResponse { +func (a *ApiHandler) getGenesis(r *http.Request) (*beaconResponse, error) { if a.genesisCfg == nil { - return newApiErrorResponse(http.StatusNotFound, "Genesis Config is missing") + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, "Genesis Config is missing") } digest, err := fork.ComputeForkDigest(a.beaconChainCfg, a.genesisCfg) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } return newBeaconResponse(&genesisReponse{ GenesisTime: a.genesisCfg.GenesisTime, GenesisValidatorRoot: a.genesisCfg.GenesisValidatorRoot, GenesisForkVersion: digest, - }) + }), nil } diff --git a/cl/beacon/handler/handler.go b/cl/beacon/handler/handler.go index 407fee6965e..b6703bb7b88 100644 --- a/cl/beacon/handler/handler.go +++ b/cl/beacon/handler/handler.go @@ -6,6 +6,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/cl/beacon/beaconhttp" "github.com/ledgerwatch/erigon/cl/beacon/synced_data" "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/persistence" @@ -38,74 +39,74 @@ func (a *ApiHandler) init() { // otterscn specific ones are commented as such r.Route("/eth", func(r chi.Router) { r.Route("/v1", func(r chi.Router) { - r.Get("/events", nil) + r.Get("/events", http.NotFound) r.Route("/config", func(r chi.Router) { - r.Get("/spec", beaconHandlerWrapper(a.getSpec, false)) - r.Get("/deposit_contract", beaconHandlerWrapper(a.getDepositContract, false)) - r.Get("/fork_schedule", beaconHandlerWrapper(a.getForkSchedule, false)) + r.Get("/spec", beaconhttp.HandleEndpointFunc(a.getSpec)) + r.Get("/deposit_contract", beaconhttp.HandleEndpointFunc(a.getDepositContract)) + r.Get("/fork_schedule", beaconhttp.HandleEndpointFunc(a.getForkSchedule)) }) r.Route("/beacon", func(r chi.Router) { r.Route("/headers", func(r chi.Router) { - r.Get("/", beaconHandlerWrapper(a.getHeaders, false)) - r.Get("/{block_id}", beaconHandlerWrapper(a.getHeader, false)) + r.Get("/", beaconhttp.HandleEndpointFunc(a.getHeaders)) + r.Get("/{block_id}", beaconhttp.HandleEndpointFunc(a.getHeader)) }) r.Route("/blocks", func(r chi.Router) { - r.Post("/", nil) - r.Get("/{block_id}", beaconHandlerWrapper(a.getBlock, true)) - r.Get("/{block_id}/attestations", beaconHandlerWrapper(a.getBlockAttestations, true)) - r.Get("/{block_id}/root", beaconHandlerWrapper(a.getBlockRoot, false)) + r.Post("/", http.NotFound) + r.Get("/{block_id}", beaconhttp.HandleEndpointFunc(a.getBlock)) + r.Get("/{block_id}/attestations", beaconhttp.HandleEndpointFunc(a.getBlockAttestations)) + r.Get("/{block_id}/root", beaconhttp.HandleEndpointFunc(a.getBlockRoot)) }) - r.Get("/genesis", beaconHandlerWrapper(a.getGenesis, false)) - r.Post("/binded_blocks", nil) + r.Get("/genesis", beaconhttp.HandleEndpointFunc(a.getGenesis)) + r.Post("/binded_blocks", http.NotFound) r.Route("/pool", func(r chi.Router) { - r.Post("/attestations", nil) - r.Get("/voluntary_exits", beaconHandlerWrapper(a.poolVoluntaryExits, false)) - r.Get("/attester_slashings", beaconHandlerWrapper(a.poolAttesterSlashings, false)) - r.Get("/proposer_slashings", beaconHandlerWrapper(a.poolProposerSlashings, false)) - r.Get("/bls_to_execution_changes", beaconHandlerWrapper(a.poolBlsToExecutionChanges, false)) - r.Get("/attestations", beaconHandlerWrapper(a.poolAttestations, false)) - r.Post("/sync_committees", nil) + r.Post("/attestations", http.NotFound) + r.Get("/voluntary_exits", beaconhttp.HandleEndpointFunc(a.poolVoluntaryExits)) + r.Get("/attester_slashings", beaconhttp.HandleEndpointFunc(a.poolAttesterSlashings)) + r.Get("/proposer_slashings", beaconhttp.HandleEndpointFunc(a.poolProposerSlashings)) + r.Get("/bls_to_execution_changes", beaconhttp.HandleEndpointFunc(a.poolBlsToExecutionChanges)) + r.Get("/attestations", beaconhttp.HandleEndpointFunc(a.poolAttestations)) + r.Post("/sync_committees", http.NotFound) }) - r.Get("/node/syncing", nil) + r.Get("/node/syncing", http.NotFound) r.Route("/states", func(r chi.Router) { - r.Get("/head/validators/{index}", nil) // otterscan - r.Get("/head/committees", nil) // otterscan + r.Get("/head/validators/{index}", http.NotFound) // otterscan + r.Get("/head/committees", http.NotFound) // otterscan r.Route("/{state_id}", func(r chi.Router) { - r.Get("/validators", nil) - r.Get("/root", beaconHandlerWrapper(a.getStateRoot, false)) - r.Get("/fork", beaconHandlerWrapper(a.getStateFork, false)) - r.Get("/validators/{id}", nil) + r.Get("/validators", http.NotFound) + r.Get("/root", beaconhttp.HandleEndpointFunc(a.getStateRoot)) + r.Get("/fork", beaconhttp.HandleEndpointFunc(a.getStateFork)) + r.Get("/validators/{id}", http.NotFound) }) }) }) r.Route("/validator", func(r chi.Router) { r.Route("/duties", func(r chi.Router) { - r.Post("/attester/{epoch}", nil) - r.Get("/proposer/{epoch}", beaconHandlerWrapper(a.getDutiesProposer, false)) - r.Post("/sync/{epoch}", nil) + r.Post("/attester/{epoch}", http.NotFound) + r.Get("/proposer/{epoch}", beaconhttp.HandleEndpointFunc(a.getDutiesProposer)) + r.Post("/sync/{epoch}", http.NotFound) }) - r.Get("/blinded_blocks/{slot}", nil) - r.Get("/attestation_data", nil) - r.Get("/aggregate_attestation", nil) - r.Post("/aggregate_and_proofs", nil) - r.Post("/beacon_committee_subscriptions", nil) - r.Post("/sync_committee_subscriptions", nil) - r.Get("/sync_committee_contribution", nil) - r.Post("/contribution_and_proofs", nil) - r.Post("/prepare_beacon_proposer", nil) + r.Get("/blinded_blocks/{slot}", http.NotFound) + r.Get("/attestation_data", http.NotFound) + r.Get("/aggregate_attestation", http.NotFound) + r.Post("/aggregate_and_proofs", http.NotFound) + r.Post("/beacon_committee_subscriptions", http.NotFound) + r.Post("/sync_committee_subscriptions", http.NotFound) + r.Get("/sync_committee_contribution", http.NotFound) + r.Post("/contribution_and_proofs", http.NotFound) + r.Post("/prepare_beacon_proposer", http.NotFound) }) }) r.Route("/v2", func(r chi.Router) { r.Route("/debug", func(r chi.Router) { r.Route("/beacon", func(r chi.Router) { - r.Get("/states/{state_id}", beaconHandlerWrapper(a.getFullState, true)) + r.Get("/states/{state_id}", beaconhttp.HandleEndpointFunc(a.getFullState)) }) }) r.Route("/beacon", func(r chi.Router) { - r.Get("/blocks/{block_id}", beaconHandlerWrapper(a.getBlock, true)) //otterscan + r.Get("/blocks/{block_id}", beaconhttp.HandleEndpointFunc(a.getBlock)) //otterscan }) r.Route("/validator", func(r chi.Router) { - r.Post("/blocks/{slot}", nil) + r.Post("/blocks/{slot}", http.NotFound) }) }) }) diff --git a/cl/beacon/handler/headers.go b/cl/beacon/handler/headers.go index 563026aec9c..e6b18607115 100644 --- a/cl/beacon/handler/headers.go +++ b/cl/beacon/handler/headers.go @@ -5,24 +5,25 @@ import ( "net/http" libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/cl/beacon/beaconhttp" "github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies" ) -func (a *ApiHandler) getHeaders(r *http.Request) *beaconResponse { +func (a *ApiHandler) getHeaders(r *http.Request) (*beaconResponse, error) { ctx := r.Context() querySlot, err := uint64FromQueryParams(r, "slot") if err != nil { - return newApiErrorResponse(http.StatusBadRequest, err.Error()) + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error()) } queryParentHash, err := hashFromQueryParams(r, "parent_root") if err != nil { - return newApiErrorResponse(http.StatusBadRequest, err.Error()) + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error()) } tx, err := a.indiciesDB.BeginRo(ctx) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } defer tx.Rollback() var candidates []libcommon.Hash @@ -34,7 +35,7 @@ func (a *ApiHandler) getHeaders(r *http.Request) *beaconResponse { // get all blocks with this parent slot, err = beacon_indicies.ReadBlockSlotByBlockRoot(tx, *queryParentHash) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } if slot == nil { break @@ -44,13 +45,13 @@ func (a *ApiHandler) getHeaders(r *http.Request) *beaconResponse { } potentialRoot, err = beacon_indicies.ReadCanonicalBlockRoot(tx, *slot+1) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } candidates = append(candidates, potentialRoot) case queryParentHash == nil && querySlot != nil: potentialRoot, err = beacon_indicies.ReadCanonicalBlockRoot(tx, *querySlot) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } candidates = append(candidates, potentialRoot) case queryParentHash == nil && querySlot == nil: @@ -60,7 +61,7 @@ func (a *ApiHandler) getHeaders(r *http.Request) *beaconResponse { } potentialRoot, err = beacon_indicies.ReadCanonicalBlockRoot(tx, headSlot) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } candidates = append(candidates, potentialRoot) } @@ -69,7 +70,7 @@ func (a *ApiHandler) getHeaders(r *http.Request) *beaconResponse { for _, root := range candidates { signedHeader, err := a.blockReader.ReadHeaderByRoot(ctx, tx, root) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } if signedHeader == nil || (queryParentHash != nil && signedHeader.Header.ParentRoot != *queryParentHash) || (querySlot != nil && signedHeader.Header.Slot != *querySlot) { continue @@ -77,7 +78,7 @@ func (a *ApiHandler) getHeaders(r *http.Request) *beaconResponse { canonicalRoot, err := beacon_indicies.ReadCanonicalBlockRoot(tx, signedHeader.Header.Slot) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } headers = append(headers, &headerResponse{ Root: root, @@ -85,37 +86,37 @@ func (a *ApiHandler) getHeaders(r *http.Request) *beaconResponse { Header: signedHeader, }) } - return newBeaconResponse(headers) + return newBeaconResponse(headers), nil } -func (a *ApiHandler) getHeader(r *http.Request) *beaconResponse { +func (a *ApiHandler) getHeader(r *http.Request) (*beaconResponse, error) { ctx := r.Context() tx, err := a.indiciesDB.BeginRo(ctx) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } defer tx.Rollback() blockId, err := blockIdFromRequest(r) if err != nil { - return newApiErrorResponse(http.StatusBadRequest, err.Error()) + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error()) } - root, httpStatus, err := a.rootFromBlockId(ctx, tx, blockId) + root, err := a.rootFromBlockId(ctx, tx, blockId) if err != nil { - return newApiErrorResponse(httpStatus, err.Error()) + return nil, err } signedHeader, err := a.blockReader.ReadHeaderByRoot(ctx, tx, root) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } if signedHeader == nil { - return newApiErrorResponse(http.StatusNotFound, fmt.Sprintf("block not found %x", root)) + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("block not found %x", root)) } var canonicalRoot libcommon.Hash canonicalRoot, err = beacon_indicies.ReadCanonicalBlockRoot(tx, signedHeader.Header.Slot) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } version := a.beaconChainCfg.GetCurrentStateVersion(signedHeader.Header.Slot / a.beaconChainCfg.SlotsPerEpoch) @@ -124,5 +125,5 @@ func (a *ApiHandler) getHeader(r *http.Request) *beaconResponse { Root: root, Canonical: canonicalRoot == root, Header: signedHeader, - }).withFinalized(canonicalRoot == root && signedHeader.Header.Slot <= a.forkchoiceStore.FinalizedSlot()).withVersion(version) + }).withFinalized(canonicalRoot == root && signedHeader.Header.Slot <= a.forkchoiceStore.FinalizedSlot()).withVersion(version), nil } diff --git a/cl/beacon/handler/pool.go b/cl/beacon/handler/pool.go index 97661e16a80..66614f904f2 100644 --- a/cl/beacon/handler/pool.go +++ b/cl/beacon/handler/pool.go @@ -4,22 +4,22 @@ import ( "net/http" ) -func (a *ApiHandler) poolVoluntaryExits(r *http.Request) *beaconResponse { - return newBeaconResponse(a.operationsPool.VoluntaryExistsPool.Raw()) +func (a *ApiHandler) poolVoluntaryExits(r *http.Request) (*beaconResponse, error) { + return newBeaconResponse(a.operationsPool.VoluntaryExistsPool.Raw()), nil } -func (a *ApiHandler) poolAttesterSlashings(r *http.Request) *beaconResponse { - return newBeaconResponse(a.operationsPool.AttesterSlashingsPool.Raw()) +func (a *ApiHandler) poolAttesterSlashings(r *http.Request) (*beaconResponse, error) { + return newBeaconResponse(a.operationsPool.AttesterSlashingsPool.Raw()), nil } -func (a *ApiHandler) poolProposerSlashings(r *http.Request) *beaconResponse { - return newBeaconResponse(a.operationsPool.ProposerSlashingsPool.Raw()) +func (a *ApiHandler) poolProposerSlashings(r *http.Request) (*beaconResponse, error) { + return newBeaconResponse(a.operationsPool.ProposerSlashingsPool.Raw()), nil } -func (a *ApiHandler) poolBlsToExecutionChanges(r *http.Request) *beaconResponse { - return newBeaconResponse(a.operationsPool.BLSToExecutionChangesPool.Raw()) +func (a *ApiHandler) poolBlsToExecutionChanges(r *http.Request) (*beaconResponse, error) { + return newBeaconResponse(a.operationsPool.BLSToExecutionChangesPool.Raw()), nil } -func (a *ApiHandler) poolAttestations(r *http.Request) *beaconResponse { - return newBeaconResponse(a.operationsPool.AttestationsPool.Raw()) +func (a *ApiHandler) poolAttestations(r *http.Request) (*beaconResponse, error) { + return newBeaconResponse(a.operationsPool.AttestationsPool.Raw()), nil } diff --git a/cl/beacon/handler/states.go b/cl/beacon/handler/states.go index f773e5da37d..389a6d4140a 100644 --- a/cl/beacon/handler/states.go +++ b/cl/beacon/handler/states.go @@ -7,6 +7,7 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/cl/beacon/beaconhttp" "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/cltypes" "github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies" @@ -68,30 +69,30 @@ func previousVersion(v clparams.StateVersion) clparams.StateVersion { return v - 1 } -func (a *ApiHandler) getStateFork(r *http.Request) *beaconResponse { +func (a *ApiHandler) getStateFork(r *http.Request) (*beaconResponse, error) { ctx := r.Context() tx, err := a.indiciesDB.BeginRo(ctx) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } defer tx.Rollback() blockId, err := stateIdFromRequest(r) if err != nil { - return newApiErrorResponse(http.StatusBadRequest, err.Error()) + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error()) } root, httpStatus, err := a.rootFromStateId(ctx, tx, blockId) if err != nil { - return newApiErrorResponse(httpStatus, err.Error()) + return nil, beaconhttp.NewEndpointError(httpStatus, err.Error()) } slot, err := beacon_indicies.ReadBlockSlotByBlockRoot(tx, root) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } if slot == nil { - return newApiErrorResponse(http.StatusNotFound, err.Error()) + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, err.Error()) } epoch := *slot / a.beaconChainCfg.SlotsPerEpoch stateVersion := a.beaconChainCfg.GetCurrentStateVersion(epoch) @@ -102,78 +103,78 @@ func (a *ApiHandler) getStateFork(r *http.Request) *beaconResponse { PreviousVersion: utils.Uint32ToBytes4(previousVersion), CurrentVersion: utils.Uint32ToBytes4(currentVersion), Epoch: epoch, - }) + }), nil } -func (a *ApiHandler) getStateRoot(r *http.Request) *beaconResponse { +func (a *ApiHandler) getStateRoot(r *http.Request) (*beaconResponse, error) { ctx := r.Context() tx, err := a.indiciesDB.BeginRo(ctx) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } defer tx.Rollback() blockId, err := stateIdFromRequest(r) if err != nil { - return newApiErrorResponse(http.StatusBadRequest, err.Error()) + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error()) } root, httpStatus, err := a.rootFromStateId(ctx, tx, blockId) if err != nil { - return newApiErrorResponse(httpStatus, err.Error()) + return nil, beaconhttp.NewEndpointError(httpStatus, err.Error()) } stateRoot, err := beacon_indicies.ReadStateRootByBlockRoot(ctx, tx, root) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } if stateRoot == (libcommon.Hash{}) { - return newApiErrorResponse(http.StatusNotFound, fmt.Sprintf("could not read block header: %x", root)) + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("could not read block header: %x", root)) } slot, err := beacon_indicies.ReadBlockSlotByBlockRoot(tx, root) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } if slot == nil { - return newApiErrorResponse(http.StatusNotFound, fmt.Sprintf("could not read block header: %x", root)) + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("could not read block header: %x", root)) } canonicalRoot, err := beacon_indicies.ReadCanonicalBlockRoot(tx, *slot) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } - return newBeaconResponse(&rootResponse{Root: stateRoot}).withFinalized(canonicalRoot == root && *slot <= a.forkchoiceStore.FinalizedSlot()) + return newBeaconResponse(&rootResponse{Root: stateRoot}).withFinalized(canonicalRoot == root && *slot <= a.forkchoiceStore.FinalizedSlot()), nil } -func (a *ApiHandler) getFullState(r *http.Request) *beaconResponse { +func (a *ApiHandler) getFullState(r *http.Request) (*beaconResponse, error) { ctx := r.Context() tx, err := a.indiciesDB.BeginRo(ctx) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } defer tx.Rollback() blockId, err := stateIdFromRequest(r) if err != nil { - return newApiErrorResponse(http.StatusBadRequest, err.Error()) + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error()) } root, httpStatus, err := a.rootFromStateId(ctx, tx, blockId) if err != nil { - return newApiErrorResponse(httpStatus, err.Error()) + return nil, beaconhttp.NewEndpointError(httpStatus, err.Error()) } blockRoot, err := beacon_indicies.ReadBlockRootByStateRoot(tx, root) if err != nil { - return newCriticalErrorResponse(err) + return nil, err } - state, err := a.forkchoiceStore.GetFullState(blockRoot, true) + state, err := a.forkchoiceStore.GetStateAtBlockRoot(blockRoot, true) if err != nil { - return newApiErrorResponse(http.StatusBadRequest, err.Error()) + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error()) } - return newBeaconResponse(state).withFinalized(false).withVersion(state.Version()) + return newBeaconResponse(state).withFinalized(false).withVersion(state.Version()), nil } diff --git a/cl/beacon/middleware.go b/cl/beacon/middleware.go index b9f66a4ab33..519aebf0527 100644 --- a/cl/beacon/middleware.go +++ b/cl/beacon/middleware.go @@ -1,17 +1 @@ package beacon - -import ( - "net/http" -) - -func newBeaconMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - contentType := r.Header.Get("Content-Type") - if contentType != "application/json" && contentType != "" { - http.Error(w, "Content-Type header must be application/json", http.StatusUnsupportedMediaType) - return - } - - next.ServeHTTP(w, r) - }) -} diff --git a/cl/beacon/router.go b/cl/beacon/router.go index 089c58c12b7..3fb927f0d33 100644 --- a/cl/beacon/router.go +++ b/cl/beacon/router.go @@ -1,20 +1,56 @@ package beacon import ( - "fmt" "net" "net/http" + "strings" + "github.com/go-chi/chi/v5" "github.com/ledgerwatch/erigon/cl/beacon/beacon_router_configuration" "github.com/ledgerwatch/erigon/cl/beacon/handler" + "github.com/ledgerwatch/erigon/cl/beacon/validatorapi" "github.com/ledgerwatch/log/v3" ) -func ListenAndServe(api *handler.ApiHandler, routerCfg beacon_router_configuration.RouterConfiguration) { +type LayeredBeaconHandler struct { + ValidatorApi *validatorapi.ValidatorApiHandler + ArchiveApi *handler.ApiHandler +} + +func ListenAndServe(beaconHandler *LayeredBeaconHandler, routerCfg beacon_router_configuration.RouterConfiguration) error { listener, err := net.Listen(routerCfg.Protocol, routerCfg.Address) - fmt.Println(routerCfg.Address, routerCfg.Protocol) + if err != nil { + return err + } + defer listener.Close() + mux := chi.NewRouter() + // enforce json content type + mux.Use(func(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + contentType := r.Header.Get("Content-Type") + if len(contentType) > 0 && !strings.EqualFold(contentType, "application/json") { + http.Error(w, "Content-Type header must be application/json", http.StatusUnsupportedMediaType) + return + } + h.ServeHTTP(w, r) + }) + }) + // layered handling - 404 on first handler falls back to the second + mux.HandleFunc("/eth/*", func(w http.ResponseWriter, r *http.Request) { + nfw := ¬FoundNoWriter{rw: w} + beaconHandler.ValidatorApi.ServeHTTP(nfw, r) + if nfw.code == 404 || nfw.code == 0 { + beaconHandler.ArchiveApi.ServeHTTP(w, r) + } + }) + mux.HandleFunc("/validator/*", func(w http.ResponseWriter, r *http.Request) { + http.StripPrefix("/validator", beaconHandler.ValidatorApi).ServeHTTP(w, r) + }) + mux.HandleFunc("/archive/*", func(w http.ResponseWriter, r *http.Request) { + http.StripPrefix("/archive", beaconHandler.ArchiveApi).ServeHTTP(w, r) + }) server := &http.Server{ - Handler: newBeaconMiddleware(api), + Handler: mux, ReadTimeout: routerCfg.ReadTimeTimeout, IdleTimeout: routerCfg.IdleTimeout, WriteTimeout: routerCfg.IdleTimeout, @@ -25,5 +61,18 @@ func ListenAndServe(api *handler.ApiHandler, routerCfg beacon_router_configurati if err := server.Serve(listener); err != nil { log.Warn("[Beacon API] failed to start serving", "addr", routerCfg.Address, "err", err) + return err } + return nil +} + +func newBeaconMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + contentType := r.Header.Get("Content-Type") + if contentType != "application/json" && contentType != "" { + http.Error(w, "Content-Type header must be application/json", http.StatusUnsupportedMediaType) + return + } + next.ServeHTTP(w, r) + }) } diff --git a/cl/beacon/rw.go b/cl/beacon/rw.go new file mode 100644 index 00000000000..33a74b2fb7e --- /dev/null +++ b/cl/beacon/rw.go @@ -0,0 +1,42 @@ +package beacon + +import ( + "net/http" +) + +type notFoundNoWriter struct { + rw http.ResponseWriter + + code int + headers http.Header +} + +func (f *notFoundNoWriter) Header() http.Header { + if f.code == 404 { + return make(http.Header) + } + return f.rw.Header() +} + +func (f *notFoundNoWriter) Write(xs []byte) (int, error) { + // write code 200 if code not written yet + if f.code == 0 { + f.WriteHeader(200) + } + if f.code == 404 { + return 0, nil + } + // pass on the write + return f.rw.Write(xs) +} + +func (f *notFoundNoWriter) WriteHeader(statusCode int) { + if f.code != 0 { + return + } + if f.code != 404 { + f.rw.WriteHeader(statusCode) + } + // if it's a 404 and we are not at our last handler, set the target to an io.Discard + f.code = statusCode +} diff --git a/cl/beacon/validatorapi/endpoints.go b/cl/beacon/validatorapi/endpoints.go new file mode 100644 index 00000000000..ed06d471a2f --- /dev/null +++ b/cl/beacon/validatorapi/endpoints.go @@ -0,0 +1,249 @@ +package validatorapi + +import ( + "fmt" + "net/http" + "strconv" + "strings" + "unicode" + + "github.com/go-chi/chi/v5" + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/hexutility" + "github.com/ledgerwatch/erigon/cl/beacon/beaconhttp" + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cl/fork" + "github.com/ledgerwatch/erigon/cl/phase1/core/state" + "github.com/ledgerwatch/erigon/cl/utils" +) + +func (v *ValidatorApiHandler) GetEthV1NodeSyncing(r *http.Request) (any, error) { + _, slot, err := v.FC.GetHead() + if err != nil { + return nil, err + } + + realHead := utils.GetCurrentSlot(v.GenesisCfg.GenesisTime, v.BeaconChainCfg.SecondsPerSlot) + + isSyncing := realHead > slot + + syncDistance := 0 + if isSyncing { + syncDistance = int(realHead) - int(slot) + } + + elOffline := true + if v.FC.Engine() != nil { + val, err := v.FC.Engine().Ready() + if err == nil { + elOffline = !val + } + } + + return map[string]any{ + "head_slot": strconv.FormatUint(slot, 10), + "sync_distance": syncDistance, + "is_syncing": isSyncing, + "el_offline": elOffline, + // TODO: figure out how to populat this field + "is_optimistic": true, + }, nil +} + +func (v *ValidatorApiHandler) EventSourceGetV1Events(w http.ResponseWriter, r *http.Request) { +} + +func (v *ValidatorApiHandler) GetEthV1ConfigSpec(r *http.Request) (*clparams.BeaconChainConfig, error) { + if v.BeaconChainCfg == nil { + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, "beacon config not found") + } + return v.BeaconChainCfg, nil +} + +func (v *ValidatorApiHandler) GetEthV1BeaconGenesis(r *http.Request) (any, error) { + if v.GenesisCfg == nil { + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, "genesis config not found") + } + digest, err := fork.ComputeForkDigest(v.BeaconChainCfg, v.GenesisCfg) + if err != nil { + return nil, beaconhttp.NewEndpointError(http.StatusInternalServerError, err.Error()) + } + return map[string]any{ + "genesis_time": v.GenesisCfg.GenesisTime, + "genesis_validator_root": v.GenesisCfg.GenesisValidatorRoot, + "genesis_fork_version": hexutility.Bytes(digest[:]), + }, nil +} + +func (v *ValidatorApiHandler) GetEthV1BeaconStatesStateIdFork(r *http.Request) (any, error) { + stateId := chi.URLParam(r, "state_id") + state, err := v.privateGetStateFromStateId(stateId) + if err != nil { + return nil, err + } + isFinalized := state.Slot() <= v.FC.FinalizedSlot() + forkData := state.BeaconState.Fork() + return map[string]any{ + // TODO: this "True if the response references an unverified execution payload. " + // figure out the condition where this happens + "execution_optimistic": false, + "finalized": isFinalized, + "data": map[string]any{ + "previous_version": hexutility.Bytes(forkData.PreviousVersion[:]), + "current_version": hexutility.Bytes(forkData.CurrentVersion[:]), + "epoch": strconv.Itoa(int(forkData.Epoch)), + }, + }, nil +} +func (v *ValidatorApiHandler) GetEthV1BeaconStatesStateIdValidatorsValidatorId(r *http.Request) (any, error) { + stateId := chi.URLParam(r, "state_id") + // grab the correct state for the given state id + beaconState, err := v.privateGetStateFromStateId(stateId) + if err != nil { + return nil, err + } + + var validatorIndex uint64 + validatorId := chi.URLParam(r, "validator_id") + switch { + case strings.HasPrefix(validatorId, "0x"): + // assume is hex has, so try to parse + hsh := common.Bytes48{} + err := hsh.UnmarshalText([]byte(stateId)) + if err != nil { + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, fmt.Sprintf("Invalid validator ID: %s", validatorId)) + } + val, ok := beaconState.ValidatorIndexByPubkey(hsh) + if !ok { + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("validator not found: %s", validatorId)) + } + validatorIndex = val + case isInt(validatorId): + val, err := strconv.ParseUint(validatorId, 10, 64) + if err != nil { + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, fmt.Sprintf("Invalid validator ID: %s", validatorId)) + } + validatorIndex = val + default: + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, fmt.Sprintf("Invalid validator ID: %s", validatorId)) + } + // at this point validatorIndex is neccesarily assigned, so we can trust the zero value + validator, err := beaconState.ValidatorForValidatorIndex(int(validatorIndex)) + if err != nil { + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("validator not found at %s: %s ", stateId, validatorId)) + } + validatorBalance, err := beaconState.ValidatorBalance(int(validatorIndex)) + if err != nil { + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("balance not found at %s: %s ", stateId, validatorId)) + } + + //pending_initialized - When the first deposit is processed, but not enough funds are available (or not yet the end of the first epoch) to get validator into the activation queue. + //pending_queued - When validator is waiting to get activated, and have enough funds etc. while in the queue, validator activation epoch keeps changing until it gets to the front and make it through (finalization is a requirement here too). + //active_ongoing - When validator must be attesting, and have not initiated any exit. + //active_exiting - When validator is still active, but filed a voluntary request to exit. + //active_slashed - When validator is still active, but have a slashed status and is scheduled to exit. + //exited_unslashed - When validator has reached regular exit epoch, not being slashed, and doesn't have to attest any more, but cannot withdraw yet. + //exited_slashed - When validator has reached regular exit epoch, but was slashed, have to wait for a longer withdrawal period. + //withdrawal_possible - After validator has exited, a while later is permitted to move funds, and is truly out of the system. + //withdrawal_done - (not possible in phase0, except slashing full balance) - actually having moved funds away + + epoch := state.GetEpochAtSlot(v.BeaconChainCfg, beaconState.Slot()) + // TODO: figure out what is wrong and missing here + validator_status := func() string { + // see if validator has exited + if validator.ExitEpoch() >= epoch { + if validator.WithdrawableEpoch() >= epoch { + // TODO: is this right? not sure if correct way to check for withdrawal_done + if validatorBalance == 0 { + return "withdrawal_done" + } + return "withdrawal_possible" + } + if validator.Slashed() { + return "exited_slashed" + } + return "exited_unslashed" + } + // at this point we know they have not exited, so they are either active or pending + if validator.Active(epoch) { + // if active, figure out if they are slashed + if validator.Slashed() { + return "active_slashed" + } + if validator.ExitEpoch() != v.BeaconChainCfg.FarFutureEpoch { + return "active_exiting" + } + return "active_ongoing" + } + // check if enough funds (TODO: or end of first epoch??) + if validatorBalance >= v.BeaconChainCfg.MinDepositAmount { + return "pending_initialized" + } + return "pending_queued" + }() + + isFinalized := beaconState.Slot() <= v.FC.FinalizedSlot() + return map[string]any{ + // TODO: this "True if the response references an unverified execution payload. " + // figure out the condition where this happens + "execution_optimistic": false, + "finalized": isFinalized, + "data": map[string]any{ + "index": strconv.FormatUint(validatorIndex, 10), + "balance": strconv.FormatUint(validatorBalance, 10), + "status": validator_status, + "data": map[string]any{ + "pubkey": hexutility.Bytes(validator.PublicKeyBytes()), + "withdraw_credentials": hexutility.Bytes(validator.WithdrawalCredentials().Bytes()), + "effective_balance": strconv.FormatUint(validator.EffectiveBalance(), 10), + "slashed": validator.Slashed(), + "activation_eligibility_epoch": strconv.FormatUint(validator.ActivationEligibilityEpoch(), 10), + "activation_epoch": strconv.FormatUint(validator.ActivationEpoch(), 10), + "exit_epoch": strconv.FormatUint(validator.ActivationEpoch(), 10), + "withdrawable_epoch": strconv.FormatUint(validator.WithdrawableEpoch(), 10), + }, + }, + }, nil +} + +func (v *ValidatorApiHandler) privateGetStateFromStateId(stateId string) (*state.CachingBeaconState, error) { + switch { + case stateId == "head": + // Now check the head + headRoot, _, err := v.FC.GetHead() + if err != nil { + return nil, err + } + return v.FC.GetStateAtBlockRoot(headRoot, true) + case stateId == "genesis": + // not supported + return nil, beaconhttp.NewEndpointError(http.StatusNotFound, "genesis block not found") + case stateId == "finalized": + return v.FC.GetStateAtBlockRoot(v.FC.FinalizedCheckpoint().BlockRoot(), true) + case stateId == "justified": + return v.FC.GetStateAtBlockRoot(v.FC.JustifiedCheckpoint().BlockRoot(), true) + case strings.HasPrefix(stateId, "0x"): + // assume is hex has, so try to parse + hsh := common.Hash{} + err := hsh.UnmarshalText([]byte(stateId)) + if err != nil { + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, fmt.Sprintf("Invalid state ID: %s", stateId)) + } + return v.FC.GetStateAtStateRoot(hsh, true) + case isInt(stateId): + // ignore the error bc isInt check succeeded. yes this doesn't protect for overflow, they will request slot 0 and it will fail. good + val, _ := strconv.ParseUint(stateId, 10, 64) + return v.FC.GetStateAtSlot(val, true) + default: + return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, fmt.Sprintf("Invalid state ID: %s", stateId)) + } +} + +func isInt(s string) bool { + for _, c := range s { + if !unicode.IsDigit(c) { + return false + } + } + return true +} diff --git a/cl/beacon/validatorapi/handler.go b/cl/beacon/validatorapi/handler.go new file mode 100644 index 00000000000..838ef398240 --- /dev/null +++ b/cl/beacon/validatorapi/handler.go @@ -0,0 +1,89 @@ +package validatorapi + +import ( + "net/http" + "sync" + + "github.com/go-chi/chi/v5" + "github.com/ledgerwatch/erigon/cl/beacon/beaconhttp" + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" +) + +type ValidatorApiHandler struct { + FC forkchoice.ForkChoiceStorage + + BeaconChainCfg *clparams.BeaconChainConfig + GenesisCfg *clparams.GenesisConfig + + o sync.Once + mux chi.Router +} + +func (v *ValidatorApiHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + v.o.Do(func() { + v.mux = chi.NewRouter() + v.init(v.mux) + }) + v.mux.ServeHTTP(w, r) +} + +func (v *ValidatorApiHandler) init(r chi.Router) { + r.Route("/eth", func(r chi.Router) { + r.Route("/v1", func(r chi.Router) { + r.Route("/beacon", func(r chi.Router) { + r.Get("/genesis", beaconhttp.HandleEndpointFunc(v.GetEthV1BeaconGenesis)) + r.Route("/states", func(r chi.Router) { + r.Route("/{state_id}", func(r chi.Router) { + r.Get("/fork", beaconhttp.HandleEndpointFunc(v.GetEthV1BeaconStatesStateIdFork)) + r.Get("/validators/{validator_id}", beaconhttp.HandleEndpointFunc(v.GetEthV1BeaconStatesStateIdValidatorsValidatorId)) + }) + }) + r.Post("/binded_blocks", http.NotFound) + r.Post("/blocks", http.NotFound) + r.Route("/pool", func(r chi.Router) { + r.Post("/attestations", http.NotFound) + r.Post("/sync_committees", http.NotFound) + }) + r.Get("/node/syncing", beaconhttp.HandleEndpointFunc(v.GetEthV1NodeSyncing)) + }) + r.Get("/config/spec", beaconhttp.HandleEndpointFunc(v.GetEthV1ConfigSpec)) + r.Get("/events", http.NotFound) + r.Route("/validator", func(r chi.Router) { + r.Route("/duties", func(r chi.Router) { + r.Post("/attester/{epoch}", http.NotFound) + r.Get("/proposer/{epoch}", http.NotFound) + r.Post("/sync/{epoch}", http.NotFound) + }) + // r.Get("/blinded_blocks/{slot}", http.NotFound) - deprecated + r.Get("/attestation_data", http.NotFound) + r.Get("/aggregate_attestation", http.NotFound) + r.Post("/aggregate_and_proofs", http.NotFound) + r.Post("/beacon_committee_subscriptions", http.NotFound) + r.Post("/sync_committee_subscriptions", http.NotFound) + r.Get("/sync_committee_contribution", http.NotFound) + r.Post("/contribution_and_proofs", http.NotFound) + r.Post("/prepare_beacon_proposer", http.NotFound) + }) + }) + r.Route("/v2", func(r chi.Router) { + r.Route("/debug", func(r chi.Router) { + r.Route("/beacon", func(r chi.Router) { + r.Get("/states/{state_id}", http.NotFound) + }) + }) + r.Route("/beacon", func(r chi.Router) { + r.Post("/blocks/{block_id}", http.NotFound) + }) + r.Route("/validator", func(r chi.Router) { + r.Post("/blocks/{slot}", http.NotFound) + }) + }) + r.Route("/v3", func(r chi.Router) { + r.Route("/beacon", func(r chi.Router) { + r.Get("/blocks/{block_id}", http.NotFound) + }) + }) + }) + +} diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go index 3bd7dbe1df5..d22d99905f3 100644 --- a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go @@ -2,13 +2,9 @@ package fork_graph import ( "bytes" - "encoding/binary" "errors" - "fmt" - "os" "sync" - "github.com/golang/snappy" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/cltypes" @@ -18,6 +14,7 @@ import ( "github.com/ledgerwatch/log/v3" "github.com/pierrec/lz4" "github.com/spf13/afero" + "golang.org/x/exp/slices" ) var lz4PoolWriterPool = sync.Pool{ @@ -45,6 +42,10 @@ const ( PreValidated ChainSegmentInsertionResult = 5 ) +type savedStateRecord struct { + slot uint64 +} + // ForkGraph is our graph for ETH 2.0 consensus forkchoice. Each node is a (block root, changes) pair and // each edge is the path described as (prevBlockRoot, currBlockRoot). if we want to go forward we use blocks. type forkGraphDisk struct { @@ -53,9 +54,18 @@ type forkGraphDisk struct { blocks map[libcommon.Hash]*cltypes.SignedBeaconBlock // set of blocks headers map[libcommon.Hash]*cltypes.BeaconBlockHeader // set of headers badBlocks map[libcommon.Hash]struct{} // blocks that are invalid and that leads to automatic fail of extension. + + // TODO: this leaks, but it isn't a big deal since it's only ~24 bytes per block. + // the dirty solution is to just make it an LRU with max size of like 128 epochs or something probably? + stateRoots map[libcommon.Hash]libcommon.Hash // set of stateHash -> blockHash + // current state data currentState *state.CachingBeaconState currentStateBlockRoot libcommon.Hash + + // saveStates are indexed by block index + saveStates map[libcommon.Hash]savedStateRecord + // for each block root we also keep track of te equivalent current justified and finalized checkpoints for faster head retrieval. currentJustifiedCheckpoints map[libcommon.Hash]solid.Checkpoint finalizedCheckpoints map[libcommon.Hash]solid.Checkpoint @@ -71,146 +81,6 @@ type forkGraphDisk struct { sszSnappyBuffer bytes.Buffer } -func getBeaconStateFilename(blockRoot libcommon.Hash) string { - return fmt.Sprintf("%x.snappy_ssz", blockRoot) -} - -func getBeaconStateCacheFilename(blockRoot libcommon.Hash) string { - return fmt.Sprintf("%x.cache", blockRoot) -} - -func (f *forkGraphDisk) readBeaconStateFromDisk(blockRoot libcommon.Hash) (bs *state.CachingBeaconState, err error) { - var file afero.File - file, err = f.fs.Open(getBeaconStateFilename(blockRoot)) - - if err != nil { - return - } - defer file.Close() - // Read the version - v := []byte{0} - if _, err := file.Read(v); err != nil { - return nil, err - } - // Read the length - lengthBytes := make([]byte, 8) - _, err = file.Read(lengthBytes) - if err != nil { - return - } - // Grow the snappy buffer - f.sszSnappyBuffer.Grow(int(binary.BigEndian.Uint64(lengthBytes))) - // Read the snappy buffer - sszSnappyBuffer := f.sszSnappyBuffer.Bytes() - sszSnappyBuffer = sszSnappyBuffer[:cap(sszSnappyBuffer)] - var n int - n, err = file.Read(sszSnappyBuffer) - if err != nil { - return - } - - decLen, err := snappy.DecodedLen(sszSnappyBuffer[:n]) - if err != nil { - return - } - // Grow the plain ssz buffer - f.sszBuffer.Grow(decLen) - sszBuffer := f.sszBuffer.Bytes() - sszBuffer, err = snappy.Decode(sszBuffer, sszSnappyBuffer[:n]) - if err != nil { - return - } - bs = state.New(f.beaconCfg) - err = bs.DecodeSSZ(sszBuffer, int(v[0])) - // decode the cache file - cacheFile, err := f.fs.Open(getBeaconStateCacheFilename(blockRoot)) - if err != nil { - return - } - defer cacheFile.Close() - - lz4Reader := lz4PoolReaderPool.Get().(*lz4.Reader) - defer lz4PoolReaderPool.Put(lz4Reader) - - lz4Reader.Reset(cacheFile) - - if err := bs.DecodeCaches(lz4Reader); err != nil { - return nil, err - } - - return -} - -// dumpBeaconStateOnDisk dumps a beacon state on disk in ssz snappy format -func (f *forkGraphDisk) dumpBeaconStateOnDisk(bs *state.CachingBeaconState, blockRoot libcommon.Hash) (err error) { - // Truncate and then grow the buffer to the size of the state. - encodingSizeSSZ := bs.EncodingSizeSSZ() - f.sszBuffer.Grow(encodingSizeSSZ) - f.sszBuffer.Reset() - - sszBuffer := f.sszBuffer.Bytes() - sszBuffer, err = bs.EncodeSSZ(sszBuffer) - if err != nil { - return - } - // Grow the snappy buffer - f.sszSnappyBuffer.Grow(snappy.MaxEncodedLen(len(sszBuffer))) - // Compress the ssz buffer - sszSnappyBuffer := f.sszSnappyBuffer.Bytes() - sszSnappyBuffer = sszSnappyBuffer[:cap(sszSnappyBuffer)] - sszSnappyBuffer = snappy.Encode(sszSnappyBuffer, sszBuffer) - var dumpedFile afero.File - dumpedFile, err = f.fs.OpenFile(getBeaconStateFilename(blockRoot), os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0o755) - if err != nil { - return - } - defer dumpedFile.Close() - // First write the hard fork version - _, err = dumpedFile.Write([]byte{byte(bs.Version())}) - if err != nil { - return - } - // Second write the length - length := make([]byte, 8) - binary.BigEndian.PutUint64(length, uint64(len(sszSnappyBuffer))) - _, err = dumpedFile.Write(length) - if err != nil { - return - } - // Lastly dump the state - _, err = dumpedFile.Write(sszSnappyBuffer) - if err != nil { - return - } - - err = dumpedFile.Sync() - if err != nil { - return - } - - cacheFile, err := f.fs.OpenFile(getBeaconStateCacheFilename(blockRoot), os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0o755) - if err != nil { - return - } - defer cacheFile.Close() - - lz4Writer := lz4PoolWriterPool.Get().(*lz4.Writer) - defer lz4PoolWriterPool.Put(lz4Writer) - - lz4Writer.CompressionLevel = 5 - lz4Writer.Reset(cacheFile) - - if err := bs.EncodeCaches(lz4Writer); err != nil { - return err - } - if err = lz4Writer.Flush(); err != nil { - return - } - err = cacheFile.Sync() - - return -} - // Initialize fork graph with a new state func NewForkGraphDisk(anchorState *state.CachingBeaconState, aferoFs afero.Fs) ForkGraph { farthestExtendingPath := make(map[libcommon.Hash]bool) @@ -230,12 +100,14 @@ func NewForkGraphDisk(anchorState *state.CachingBeaconState, aferoFs afero.Fs) F f := &forkGraphDisk{ fs: aferoFs, // storage - blocks: make(map[libcommon.Hash]*cltypes.SignedBeaconBlock), - headers: headers, - badBlocks: make(map[libcommon.Hash]struct{}), + blocks: make(map[libcommon.Hash]*cltypes.SignedBeaconBlock), + headers: headers, + badBlocks: make(map[libcommon.Hash]struct{}), + stateRoots: make(map[libcommon.Hash]libcommon.Hash), // current state data currentState: anchorState, currentStateBlockRoot: anchorRoot, + saveStates: make(map[libcommon.Hash]savedStateRecord), // checkpoints trackers currentJustifiedCheckpoints: make(map[libcommon.Hash]solid.Checkpoint), finalizedCheckpoints: make(map[libcommon.Hash]solid.Checkpoint), @@ -314,10 +186,18 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, BodyRoot: bodyRoot, } + // add the state root + stateRoot, err := newState.HashSSZ() + if err != nil { + return nil, LogisticError, err + } + f.stateRoots[stateRoot] = blockRoot + if newState.Slot()%f.beaconCfg.SlotsPerEpoch == 0 { if err := f.dumpBeaconStateOnDisk(newState, blockRoot); err != nil { return nil, LogisticError, err } + f.saveStates[blockRoot] = savedStateRecord{slot: newState.Slot()} } // Lastly add checkpoints to caches as well. @@ -341,6 +221,88 @@ func (f *forkGraphDisk) getBlock(blockRoot libcommon.Hash) (*cltypes.SignedBeaco return obj, has } +// GetStateAtSlot is for getting a state based off the slot number +// NOTE: all this does is call GetStateAtSlot using the stateRoots index and existing blocks. +func (f *forkGraphDisk) GetStateAtStateRoot(root libcommon.Hash, alwaysCopy bool) (*state.CachingBeaconState, error) { + blockRoot, ok := f.stateRoots[root] + if !ok { + return nil, ErrStateNotFound + } + blockSlot, ok := f.blocks[blockRoot] + if !ok { + return nil, ErrStateNotFound + } + return f.GetStateAtSlot(blockSlot.Block.Slot, alwaysCopy) + +} + +// GetStateAtSlot is for getting a state based off the slot number +// TODO: this is rather inefficient. we could create indices that make it faster +func (f *forkGraphDisk) GetStateAtSlot(slot uint64, alwaysCopy bool) (*state.CachingBeaconState, error) { + // fast path for if the slot is the current slot + if f.currentState.Slot() == slot { + // always copy. + if alwaysCopy { + ret, err := f.currentState.Copy() + return ret, err + } + return f.currentState, nil + } + // if the slot requested is larger than the current slot, we know it is not found, so another fast path + if slot > f.currentState.Slot() { + return nil, ErrStateNotFound + } + if len(f.saveStates) == 0 { + return nil, ErrStateNotFound + } + bestSlot := uint64(0) + startHash := libcommon.Hash{} + // iterate over all savestates. there should be less than 10 of these, so this should be safe. + for blockHash, v := range f.saveStates { + // make sure the slot is smaller than the target slot + // (equality case caught by short circuit) + // and that the slot is larger than the current best found starting slot + if v.slot < slot && v.slot > bestSlot { + bestSlot = v.slot + startHash = blockHash + } + } + // no snapshot old enough to honor this request :( + if bestSlot == 0 { + return nil, ErrStateNotFound + } + copyReferencedState, err := f.readBeaconStateFromDisk(startHash) + if err != nil { + return nil, err + } + // cache lied? return state not found + if copyReferencedState == nil { + return nil, ErrStateNotFound + } + + // what we need to do is grab every block in our block store that is between the target slot and the current slot + // this is linear time from the distance to our last snapshot. + blocksInTheWay := []*cltypes.SignedBeaconBlock{} + for _, v := range f.blocks { + if v.Block.Slot <= f.currentState.Slot() && v.Block.Slot >= slot { + blocksInTheWay = append(blocksInTheWay, v) + } + } + + // sort the slots from low to high + slices.SortStableFunc(blocksInTheWay, func(a, b *cltypes.SignedBeaconBlock) int { + return int(a.Block.Slot) - int(b.Block.Slot) + }) + + // Traverse the blocks from top to bottom. + for _, block := range blocksInTheWay { + if err := transition.TransitionState(copyReferencedState, block, false); err != nil { + return nil, err + } + } + return copyReferencedState, nil +} + func (f *forkGraphDisk) GetState(blockRoot libcommon.Hash, alwaysCopy bool) (*state.CachingBeaconState, error) { if f.currentStateBlockRoot == blockRoot { if alwaysCopy { @@ -364,7 +326,6 @@ func (f *forkGraphDisk) GetState(blockRoot libcommon.Hash, alwaysCopy bool) (*st if ok && bHeader.Slot%f.beaconCfg.SlotsPerEpoch == 0 { break } - log.Debug("Could not retrieve state: Missing header", "missing", currentIteratorRoot) return nil, nil } @@ -420,6 +381,7 @@ func (f *forkGraphDisk) Prune(pruneSlot uint64) (err error) { delete(f.currentJustifiedCheckpoints, root) delete(f.finalizedCheckpoints, root) delete(f.headers, root) + delete(f.saveStates, root) f.fs.Remove(getBeaconStateFilename(root)) f.fs.Remove(getBeaconStateCacheFilename(root)) } diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go b/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go new file mode 100644 index 00000000000..e0ebf2a80f2 --- /dev/null +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go @@ -0,0 +1,153 @@ +package fork_graph + +import ( + "encoding/binary" + "fmt" + "os" + + "github.com/golang/snappy" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/cl/phase1/core/state" + "github.com/pierrec/lz4" + "github.com/spf13/afero" +) + +func getBeaconStateFilename(blockRoot libcommon.Hash) string { + return fmt.Sprintf("%x.snappy_ssz", blockRoot) +} + +func getBeaconStateCacheFilename(blockRoot libcommon.Hash) string { + return fmt.Sprintf("%x.cache", blockRoot) +} + +func (f *forkGraphDisk) readBeaconStateFromDisk(blockRoot libcommon.Hash) (bs *state.CachingBeaconState, err error) { + var file afero.File + file, err = f.fs.Open(getBeaconStateFilename(blockRoot)) + + if err != nil { + return + } + defer file.Close() + // Read the version + v := []byte{0} + if _, err := file.Read(v); err != nil { + return nil, err + } + // Read the length + lengthBytes := make([]byte, 8) + _, err = file.Read(lengthBytes) + if err != nil { + return + } + // Grow the snappy buffer + f.sszSnappyBuffer.Grow(int(binary.BigEndian.Uint64(lengthBytes))) + // Read the snappy buffer + sszSnappyBuffer := f.sszSnappyBuffer.Bytes() + sszSnappyBuffer = sszSnappyBuffer[:cap(sszSnappyBuffer)] + var n int + n, err = file.Read(sszSnappyBuffer) + if err != nil { + return + } + + decLen, err := snappy.DecodedLen(sszSnappyBuffer[:n]) + if err != nil { + return + } + // Grow the plain ssz buffer + f.sszBuffer.Grow(decLen) + sszBuffer := f.sszBuffer.Bytes() + sszBuffer, err = snappy.Decode(sszBuffer, sszSnappyBuffer[:n]) + if err != nil { + return + } + bs = state.New(f.beaconCfg) + err = bs.DecodeSSZ(sszBuffer, int(v[0])) + // decode the cache file + cacheFile, err := f.fs.Open(getBeaconStateCacheFilename(blockRoot)) + if err != nil { + return + } + defer cacheFile.Close() + + lz4Reader := lz4PoolReaderPool.Get().(*lz4.Reader) + defer lz4PoolReaderPool.Put(lz4Reader) + + lz4Reader.Reset(cacheFile) + + if err := bs.DecodeCaches(lz4Reader); err != nil { + return nil, err + } + + return +} + +// dumpBeaconStateOnDisk dumps a beacon state on disk in ssz snappy format +func (f *forkGraphDisk) dumpBeaconStateOnDisk(bs *state.CachingBeaconState, blockRoot libcommon.Hash) (err error) { + // Truncate and then grow the buffer to the size of the state. + encodingSizeSSZ := bs.EncodingSizeSSZ() + f.sszBuffer.Grow(encodingSizeSSZ) + f.sszBuffer.Reset() + + sszBuffer := f.sszBuffer.Bytes() + sszBuffer, err = bs.EncodeSSZ(sszBuffer) + if err != nil { + return + } + // Grow the snappy buffer + f.sszSnappyBuffer.Grow(snappy.MaxEncodedLen(len(sszBuffer))) + // Compress the ssz buffer + sszSnappyBuffer := f.sszSnappyBuffer.Bytes() + sszSnappyBuffer = sszSnappyBuffer[:cap(sszSnappyBuffer)] + sszSnappyBuffer = snappy.Encode(sszSnappyBuffer, sszBuffer) + var dumpedFile afero.File + dumpedFile, err = f.fs.OpenFile(getBeaconStateFilename(blockRoot), os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0o755) + if err != nil { + return + } + defer dumpedFile.Close() + // First write the hard fork version + _, err = dumpedFile.Write([]byte{byte(bs.Version())}) + if err != nil { + return + } + // Second write the length + length := make([]byte, 8) + binary.BigEndian.PutUint64(length, uint64(len(sszSnappyBuffer))) + _, err = dumpedFile.Write(length) + if err != nil { + return + } + // Lastly dump the state + _, err = dumpedFile.Write(sszSnappyBuffer) + if err != nil { + return + } + + err = dumpedFile.Sync() + if err != nil { + return + } + + cacheFile, err := f.fs.OpenFile(getBeaconStateCacheFilename(blockRoot), os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0o755) + if err != nil { + return + } + defer cacheFile.Close() + + lz4Writer := lz4PoolWriterPool.Get().(*lz4.Writer) + defer lz4PoolWriterPool.Put(lz4Writer) + + lz4Writer.CompressionLevel = 5 + lz4Writer.Reset(cacheFile) + + if err := bs.EncodeCaches(lz4Writer); err != nil { + return err + } + if err = lz4Writer.Flush(); err != nil { + return + } + err = cacheFile.Sync() + + return +} diff --git a/cl/phase1/forkchoice/fork_graph/interface.go b/cl/phase1/forkchoice/fork_graph/interface.go index 1195e74d1c5..66a2edd0e83 100644 --- a/cl/phase1/forkchoice/fork_graph/interface.go +++ b/cl/phase1/forkchoice/fork_graph/interface.go @@ -26,4 +26,8 @@ type ForkGraph interface { MarkHeaderAsInvalid(blockRoot libcommon.Hash) AnchorSlot() uint64 Prune(uint64) error + + // extra methods for validator api + GetStateAtSlot(slot uint64, alwaysCopy bool) (*state.CachingBeaconState, error) + GetStateAtStateRoot(root libcommon.Hash, alwaysCopy bool) (*state.CachingBeaconState, error) } diff --git a/cl/phase1/forkchoice/forkchoice.go b/cl/phase1/forkchoice/forkchoice.go index 319a921a861..f846faa099e 100644 --- a/cl/phase1/forkchoice/forkchoice.go +++ b/cl/phase1/forkchoice/forkchoice.go @@ -7,6 +7,7 @@ import ( "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/cltypes/solid" "github.com/ledgerwatch/erigon/cl/freezer" + "github.com/ledgerwatch/erigon/cl/phase1/core/state" state2 "github.com/ledgerwatch/erigon/cl/phase1/core/state" "github.com/ledgerwatch/erigon/cl/phase1/execution_client" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice/fork_graph" @@ -180,6 +181,13 @@ func (f *ForkChoiceStore) JustifiedCheckpoint() solid.Checkpoint { return f.justifiedCheckpoint } +// FinalizedCheckpoint returns justified checkpoint +func (f *ForkChoiceStore) JustifiedSlot() uint64 { + f.mu.Lock() + defer f.mu.Unlock() + return f.computeStartSlotAtEpoch(f.justifiedCheckpoint.Epoch()) +} + // FinalizedCheckpoint returns justified checkpoint func (f *ForkChoiceStore) FinalizedCheckpoint() solid.Checkpoint { f.mu.Lock() @@ -216,11 +224,21 @@ func (f *ForkChoiceStore) AnchorSlot() uint64 { return f.forkGraph.AnchorSlot() } -func (f *ForkChoiceStore) GetFullState(blockRoot libcommon.Hash, alwaysCopy bool) (*state2.CachingBeaconState, error) { +func (f *ForkChoiceStore) GetStateAtBlockRoot(blockRoot libcommon.Hash, alwaysCopy bool) (*state2.CachingBeaconState, error) { f.mu.Lock() defer f.mu.Unlock() return f.forkGraph.GetState(blockRoot, alwaysCopy) } +func (f *ForkChoiceStore) GetStateAtStateRoot(stateRoot libcommon.Hash, alwaysCopy bool) (*state2.CachingBeaconState, error) { + f.mu.Lock() + defer f.mu.Unlock() + return f.forkGraph.GetState(stateRoot, alwaysCopy) +} +func (f *ForkChoiceStore) GetStateAtSlot(slot uint64, alwaysCopy bool) (*state.CachingBeaconState, error) { + f.mu.Lock() + defer f.mu.Unlock() + return f.forkGraph.GetStateAtSlot(slot, alwaysCopy) +} // Highest seen returns highest seen slot func (f *ForkChoiceStore) PreverifiedValidator(blockRoot libcommon.Hash) uint64 { diff --git a/cl/phase1/forkchoice/forkchoice_slot.go b/cl/phase1/forkchoice/forkchoice_slot.go new file mode 100644 index 00000000000..ef71778dcad --- /dev/null +++ b/cl/phase1/forkchoice/forkchoice_slot.go @@ -0,0 +1 @@ +package forkchoice diff --git a/cl/phase1/forkchoice/interface.go b/cl/phase1/forkchoice/interface.go index 37d72c8c961..96d34abd561 100644 --- a/cl/phase1/forkchoice/interface.go +++ b/cl/phase1/forkchoice/interface.go @@ -5,7 +5,7 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/cl/cltypes" "github.com/ledgerwatch/erigon/cl/cltypes/solid" - state2 "github.com/ledgerwatch/erigon/cl/phase1/core/state" + "github.com/ledgerwatch/erigon/cl/phase1/core/state" "github.com/ledgerwatch/erigon/cl/phase1/execution_client" ) @@ -24,10 +24,14 @@ type ForkChoiceStorageReader interface { GetHead() (common.Hash, uint64, error) HighestSeen() uint64 JustifiedCheckpoint() solid.Checkpoint + JustifiedSlot() uint64 ProposerBoostRoot() common.Hash - GetFullState(libcommon.Hash, bool) (*state2.CachingBeaconState, error) + GetStateAtBlockRoot(blockRoot libcommon.Hash, alwaysCopy bool) (*state.CachingBeaconState, error) Slot() uint64 Time() uint64 + + GetStateAtSlot(slot uint64, alwaysCopy bool) (*state.CachingBeaconState, error) + GetStateAtStateRoot(root libcommon.Hash, alwaysCopy bool) (*state.CachingBeaconState, error) } type ForkChoiceStorageWriter interface { diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go index d25043754ee..07dc4a46315 100644 --- a/cl/phase1/stages/clstages.go +++ b/cl/phase1/stages/clstages.go @@ -492,7 +492,7 @@ func ConsensusClStages(ctx context.Context, } // Increment validator set - headState, err := cfg.forkChoice.GetFullState(headRoot, false) + headState, err := cfg.forkChoice.GetStateAtBlockRoot(headRoot, false) if err != nil { return err } diff --git a/cmd/caplin/caplin1/run.go b/cmd/caplin/caplin1/run.go index 1e264b9bce0..0255e0cca87 100644 --- a/cmd/caplin/caplin1/run.go +++ b/cmd/caplin/caplin1/run.go @@ -12,6 +12,7 @@ import ( "github.com/ledgerwatch/erigon/cl/beacon/beacon_router_configuration" "github.com/ledgerwatch/erigon/cl/beacon/handler" "github.com/ledgerwatch/erigon/cl/beacon/synced_data" + "github.com/ledgerwatch/erigon/cl/beacon/validatorapi" "github.com/ledgerwatch/erigon/cl/cltypes/solid" "github.com/ledgerwatch/erigon/cl/freezer" freezer2 "github.com/ledgerwatch/erigon/cl/freezer" @@ -158,7 +159,15 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, engi syncedDataManager := synced_data.NewSyncedDataManager(cfg.Active, beaconConfig) if cfg.Active { apiHandler := handler.NewApiHandler(genesisConfig, beaconConfig, rawDB, db, forkChoice, pool, rcsn, syncedDataManager) - go beacon.ListenAndServe(apiHandler, cfg) + headApiHandler := &validatorapi.ValidatorApiHandler{ + FC: forkChoice, + BeaconChainCfg: beaconConfig, + GenesisCfg: genesisConfig, + } + go beacon.ListenAndServe(&beacon.LayeredBeaconHandler{ + ValidatorApi: headApiHandler, + ArchiveApi: apiHandler, + }, cfg) log.Info("Beacon API started", "addr", cfg.Address) } diff --git a/turbo/debug/flags.go b/turbo/debug/flags.go index 721345a72a2..1be6efa51b3 100644 --- a/turbo/debug/flags.go +++ b/turbo/debug/flags.go @@ -330,7 +330,7 @@ func readConfigAsMap(filePath string) (map[string]interface{}, error) { fileConfig := make(map[string]interface{}) - if fileExtension == ".yaml" { + if fileExtension == ".yaml" || fileExtension == ".yml" { yamlFile, err := os.ReadFile(filePath) if err != nil { return fileConfig, err @@ -349,7 +349,7 @@ func readConfigAsMap(filePath string) (map[string]interface{}, error) { return fileConfig, err } } else { - return fileConfig, errors.New("config files only accepted are .yaml and .toml") + return fileConfig, errors.New("config files only accepted are .yaml, .yml, and .toml") } return fileConfig, nil