Skip to content

Commit

Permalink
Merge pull request #146 from attestantio/post-validators
Browse files Browse the repository at this point in the history
Use POST to obtain validator data.
  • Loading branch information
mcdee committed Jul 10, 2024
2 parents da19cf1 + 1a1e483 commit 704ceba
Show file tree
Hide file tree
Showing 3 changed files with 1,066 additions and 219 deletions.
2 changes: 0 additions & 2 deletions http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ func (s *Service) post(ctx context.Context, endpoint string, body io.Reader) (io
}

// post2 sends an HTTP post request and returns the body.
//
//nolint:unparam
func (s *Service) post2(ctx context.Context,
endpoint string,
query string,
Expand Down
235 changes: 22 additions & 213 deletions http/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ package http
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"strings"

client "github.com/attestantio/go-eth2-client"
"github.com/attestantio/go-eth2-client/api"
Expand All @@ -28,70 +28,9 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// indexChunkSizes defines the per-beacon-node size of an index chunk.
// Sizes are derived empirically.
var indexChunkSizes = map[string]int{
"default": 1024,
"lighthouse": 8192,
"lodestar": 1024,
"nimbus": 8192,
"prysm": 8192,
"teku": 8192,
}

// pubKeyChunkSizes defines the per-beacon-node size of a public key chunk.
// Sizes are derived empirically.
var pubKeyChunkSizes = map[string]int{
"default": 128,
"lighthouse": 512,
"lodestar": 128,
"nimbus": 1024,
"prysm": 1024,
"teku": 512,
}

// indexChunkSize is the maximum number of validator indices to send in each request.
func (s *Service) indexChunkSize(ctx context.Context) int {
if s.userIndexChunkSize > 0 {
return s.userIndexChunkSize
}

var nodeClient string
response, err := s.NodeClient(ctx)
if err == nil {
nodeClient = response.Data
} else {
// Use default.
nodeClient = "default"
}

if _, exists := indexChunkSizes[nodeClient]; exists {
return indexChunkSizes[nodeClient]
}

return indexChunkSizes["default"]
}

// pubKeyChunkSize is the maximum number of validator public keys to send in each request.
func (s *Service) pubKeyChunkSize(ctx context.Context) int {
if s.userPubKeyChunkSize > 0 {
return s.userPubKeyChunkSize
}

var nodeClient string
response, err := s.NodeClient(ctx)
if err == nil {
nodeClient = response.Data
} else {
// Use default.
nodeClient = "default"
}

if _, exists := pubKeyChunkSizes[nodeClient]; exists {
return pubKeyChunkSizes[nodeClient]
}

return pubKeyChunkSizes["default"]
type validatorsBody struct {
IDs []string `json:"ids,omitempty"`
Statuses []string `json:"statuses,omitempty"`
}

// Validators provides the validators, with their balance and status, for the given options.
Expand All @@ -112,58 +51,34 @@ func (s *Service) Validators(ctx context.Context,
}
span.SetAttributes(attribute.Int("validators", len(opts.Indices)+len(opts.PubKeys)))

if opts.State == "" {
return nil, errors.Join(errors.New("no state specified"), client.ErrInvalidOptions)
}
if len(opts.Indices) > 0 && len(opts.PubKeys) > 0 {
return nil, errors.Join(errors.New("cannot specify both indices and public keys"), client.ErrInvalidOptions)
}

if len(opts.Indices) == 0 && len(opts.PubKeys) == 0 {
// Request is for all validators; fetch from state.
return s.validatorsFromState(ctx, opts)
}

if !s.reducedMemoryUsage {
if len(opts.Indices) > s.indexChunkSize(ctx)*16 || len(opts.PubKeys) > s.pubKeyChunkSize(ctx)*16 {
// Request is for multiple pages of validators; fetch from state.
return s.validatorsFromState(ctx, opts)
}
}

if len(opts.Indices) > s.indexChunkSize(ctx) || len(opts.PubKeys) > s.pubKeyChunkSize(ctx) {
return s.chunkedValidators(ctx, opts)
}

endpoint := fmt.Sprintf("/eth/v1/beacon/states/%s/validators", opts.State)
query := ""
switch {
case len(opts.Indices) > 0:
ids := make([]string, len(opts.Indices))
for i := range opts.Indices {
ids[i] = fmt.Sprintf("%d", opts.Indices[i])
}
query = "id=" + strings.Join(ids, ",")
case len(opts.PubKeys) > 0:
ids := make([]string, len(opts.PubKeys))
for i := range opts.PubKeys {
ids[i] = opts.PubKeys[i].String()
}
query = "id=" + strings.Join(ids, ",")

body := &validatorsBody{
IDs: make([]string, 0),
Statuses: make([]string, 0),
}
if len(opts.ValidatorStates) > 0 {
states := make([]string, len(opts.ValidatorStates))
for i := range opts.ValidatorStates {
states[i] = opts.ValidatorStates[i].String()
}
if query == "" {
query = "states=" + strings.Join(states, ",")
} else {
query = fmt.Sprintf("%s&states=%s", query, strings.Join(states, ","))
}
for i := range opts.Indices {
body.IDs = append(body.IDs, fmt.Sprintf("%d", opts.Indices[i]))
}
for i := range opts.PubKeys {
body.IDs = append(body.IDs, opts.PubKeys[i].String())
}
for i := range opts.ValidatorStates {
body.Statuses = append(body.Statuses, opts.ValidatorStates[i].String())
}

httpResponse, err := s.get(ctx, endpoint, query, &opts.Common, false)
reqData, err := json.Marshal(body)
if err != nil {
return nil, errors.Join(errors.New("failed to marshal request data"), err)
}

httpResponse, err := s.post2(ctx, endpoint, query, &opts.Common, bytes.NewReader(reqData), ContentTypeJSON, map[string]string{})
if err != nil {
return nil, errors.Join(errors.New("failed to request validators"), err)
}
Expand Down Expand Up @@ -230,36 +145,14 @@ func (s *Service) validatorsFromState(ctx context.Context,
return nil, err
}

// Provide map of required pubkeys or indices.
indices := make(map[phase0.ValidatorIndex]struct{})
for _, index := range opts.Indices {
indices[index] = struct{}{}
}
pubkeys := make(map[phase0.BLSPubKey]struct{})
for _, pubkey := range opts.PubKeys {
pubkeys[pubkey] = struct{}{}
}
validatorStates := make(map[apiv1.ValidatorState]struct{})
for _, validatorState := range opts.ValidatorStates {
validatorStates[validatorState] = struct{}{}
}

res := make(map[phase0.ValidatorIndex]*apiv1.Validator, len(validators))
for i, validator := range validators {
if len(pubkeys) > 0 {
if _, exists := pubkeys[validator.PublicKey]; !exists {
// We want specific public keys, and this isn't one of them. Ignore.
continue
}
}

index := phase0.ValidatorIndex(i)
if len(indices) > 0 {
if _, exists := indices[index]; !exists {
// We want specific indices, and this isn't one of them. Ignore.
continue
}
}

state := apiv1.ValidatorToState(validator, &balances[i], epoch, farFutureEpoch)
if len(validatorStates) > 0 {
Expand All @@ -282,87 +175,3 @@ func (s *Service) validatorsFromState(ctx context.Context,
Metadata: stateResponse.Metadata,
}, nil
}

// chunkedValidators obtains the validators a chunk at a time.
func (s *Service) chunkedValidators(ctx context.Context,
opts *api.ValidatorsOpts,
) (
*api.Response[map[phase0.ValidatorIndex]*apiv1.Validator],
error,
) {
if len(opts.Indices) > 0 {
return s.chunkedValidatorsByIndex(ctx, opts)
}

return s.chunkedValidatorsByPubkey(ctx, opts)
}

// chunkedValidatorsByIndex obtains validators with index a chunk at a time.
func (s *Service) chunkedValidatorsByIndex(ctx context.Context,
opts *api.ValidatorsOpts,
) (
*api.Response[map[phase0.ValidatorIndex]*apiv1.Validator],
error,
) {
data := make(map[phase0.ValidatorIndex]*apiv1.Validator)
metadata := make(map[string]any)
indexChunkSize := s.indexChunkSize(ctx)
for i := 0; i < len(opts.Indices); i += indexChunkSize {
chunkStart := i
chunkEnd := i + indexChunkSize
if len(opts.Indices) < chunkEnd {
chunkEnd = len(opts.Indices)
}
chunk := opts.Indices[chunkStart:chunkEnd]
chunkRes, err := s.Validators(ctx, &api.ValidatorsOpts{State: opts.State, Indices: chunk, Common: opts.Common})
if err != nil {
return nil, errors.Join(errors.New("failed to obtain chunk"), err)
}
for k, v := range chunkRes.Data {
data[k] = v
}
for k, v := range chunkRes.Metadata {
metadata[k] = v
}
}

return &api.Response[map[phase0.ValidatorIndex]*apiv1.Validator]{
Data: data,
Metadata: metadata,
}, nil
}

// chunkedValidatorsByIndex obtains validators with public key a chunk at a time.
func (s *Service) chunkedValidatorsByPubkey(ctx context.Context,
opts *api.ValidatorsOpts,
) (
*api.Response[map[phase0.ValidatorIndex]*apiv1.Validator],
error,
) {
data := make(map[phase0.ValidatorIndex]*apiv1.Validator)
metadata := make(map[string]any)
pubkeyChunkSize := s.pubKeyChunkSize(ctx)
for i := 0; i < len(opts.PubKeys); i += pubkeyChunkSize {
chunkStart := i
chunkEnd := i + pubkeyChunkSize
if len(opts.PubKeys) < chunkEnd {
chunkEnd = len(opts.PubKeys)
}
chunk := opts.PubKeys[chunkStart:chunkEnd]
chunkRes, err := s.Validators(ctx, &api.ValidatorsOpts{State: opts.State, PubKeys: chunk, Common: opts.Common})
if err != nil {
return nil, errors.Join(errors.New("failed to obtain chunk"), err)
}
for k, v := range chunkRes.Data {
data[k] = v
}
for k, v := range chunkRes.Metadata {
metadata[k] = v
}
}

return &api.Response[map[phase0.ValidatorIndex]*apiv1.Validator]{
Data: data,
Metadata: metadata,
}, nil
}
1,048 changes: 1,044 additions & 4 deletions http/validators_test.go

Large diffs are not rendered by default.

0 comments on commit 704ceba

Please sign in to comment.