Skip to content

Commit

Permalink
feat(payload-requests): store metadata
Browse files Browse the repository at this point in the history
Store metadata for payload requests. Used for debugging and to analyze
opportunity for improvement.
  • Loading branch information
alextes committed May 1, 2024
1 parent 1f38bb9 commit 5108978
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 23 deletions.
13 changes: 8 additions & 5 deletions cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func init() {
apiCmd.Flags().StringSliceVar(&beaconNodeURIs, "beacon-uris", defaultBeaconURIs, "beacon endpoints")
apiCmd.Flags().StringVar(&redisURI, "redis-uri", defaultRedisURI, "redis uri")
apiCmd.Flags().StringVar(&redisReadonlyURI, "redis-readonly-uri", defaultRedisReadonlyURI, "redis readonly uri")
apiCmd.Flags().StringVar(&redisArchiveURI, "redis-archive-uri", defaultRedisArchiveURI, "redis block submission archive uri")
apiCmd.Flags().StringVar(&postgresDSN, "db", defaultPostgresDSN, "PostgreSQL DSN")
apiCmd.Flags().StringSliceVar(&memcachedURIs, "memcached-uris", defaultMemcachedURIs,
"Enable memcached, typically used as secondary backup to Redis for redundancy")
Expand Down Expand Up @@ -107,12 +108,14 @@ var apiCmd = &cobra.Command{
beaconClient := beaconclient.NewMultiBeaconClient(log, beaconInstances)

// Connect to Redis
if redisReadonlyURI == "" {
log.Infof("Connecting to Redis at %s ...", redisURI)
} else {
log.Infof("Connecting to Redis at %s / readonly: %s ...", redisURI, redisReadonlyURI)
log.Infof("Connecting to Redis at %s ...", redisURI)
if redisReadonlyURI != "" {
log.Infof("Connecting to readonly Redis at %s ...", redisReadonlyURI)
}
if redisArchiveURI != "" {
log.Infof("Connecting to block submission archive Redis at %s ...", redisArchiveURI)
}
redis, err := datastore.NewRedisCache(networkInfo.Name, redisURI, redisReadonlyURI)
redis, err := datastore.NewRedisCache(networkInfo.Name, redisURI, redisReadonlyURI, redisArchiveURI)
if err != nil {
log.WithError(err).Fatalf("Failed to connect to Redis at %s", redisURI)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/housekeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var housekeeperCmd = &cobra.Command{
beaconClient := beaconclient.NewMultiBeaconClient(log, beaconInstances)

// Connect to Redis and setup the datastore
redis, err := datastore.NewRedisCache(networkInfo.Name, redisURI, "")
redis, err := datastore.NewRedisCache(networkInfo.Name, redisURI, "", "")
if err != nil {
log.WithError(err).Fatalf("Failed to connect to Redis at %s", redisURI)
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var (
defaultBeaconURIs = common.GetSliceEnv("BEACON_URIS", []string{"http://localhost:3500"})
defaultRedisURI = common.GetEnv("REDIS_URI", "localhost:6379")
defaultRedisReadonlyURI = common.GetEnv("REDIS_READONLY_URI", "")
defaultRedisArchiveURI = common.GetEnv("REDIS_ARCHIVE_URI", "")
defaultPostgresDSN = common.GetEnv("POSTGRES_DSN", "")
defaultMemcachedURIs = common.GetSliceEnv("MEMCACHED_URIS", nil)
defaultLogJSON = os.Getenv("LOG_JSON") != ""
Expand All @@ -19,6 +20,7 @@ var (
beaconNodeURIs []string
redisURI string
redisReadonlyURI string
redisArchiveURI string
postgresDSN string
memcachedURIs []string

Expand Down
12 changes: 7 additions & 5 deletions cmd/website.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,14 @@ var websiteCmd = &cobra.Command{
log.Debug(networkInfo.String())

// Connect to Redis
if redisReadonlyURI == "" {
log.Infof("Connecting to Redis at %s ...", redisURI)
} else {
log.Infof("Connecting to Redis at %s / readonly: %s ...", redisURI, redisReadonlyURI)
log.Infof("Connecting to Redis at %s ...", redisURI)
if redisReadonlyURI != "" {
log.Infof("Connecting to readonly Redis at %s ...", redisReadonlyURI)
}
if redisArchiveURI != "" {
log.Infof("Connecting to block submission archive Redis at %s ...", redisArchiveURI)
}
redis, err := datastore.NewRedisCache(networkInfo.Name, redisURI, redisReadonlyURI)
redis, err := datastore.NewRedisCache(networkInfo.Name, redisURI, redisReadonlyURI, redisArchiveURI)
if err != nil {
log.WithError(err).Fatalf("Failed to connect to Redis at %s", redisURI)
}
Expand Down
2 changes: 1 addition & 1 deletion datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func setupTestDatastore(t *testing.T, mockDB *database.MockDB) *Datastore {
redisTestServer, err := miniredis.Run()
require.NoError(t, err)

redisDs, err := NewRedisCache("", redisTestServer.Addr(), "")
redisDs, err := NewRedisCache("", redisTestServer.Addr(), "", "")
require.NoError(t, err)

ds, err := NewDatastore(redisDs, nil, mockDB)
Expand Down
27 changes: 26 additions & 1 deletion datastore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func connectRedis(redisURI string) (*redis.Client, error) {
}

type RedisCache struct {
archiveClient *redis.Client
client *redis.Client
readonlyClient *redis.Client

Expand All @@ -103,7 +104,7 @@ type RedisCache struct {
keyLastHashDelivered string
}

func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
func NewRedisCache(prefix, redisURI, readonlyURI, archiveURI string) (*RedisCache, error) {
client, err := connectRedis(redisURI)
if err != nil {
return nil, err
Expand All @@ -117,7 +118,19 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
}
}

// By default we use the same client for block submission archiving, unless
// a different URI is provided. If latency is a concern, running a separate
// client connected to a separate instance is advisable.
archiveClient := client
if archiveURI != "" {
archiveClient, err = connectRedis(archiveURI)
if err != nil {
return nil, err
}
}

return &RedisCache{
archiveClient: archiveClient,
client: client,
readonlyClient: roClient,

Expand Down Expand Up @@ -807,3 +820,15 @@ func (r *RedisCache) NewPipeline() redis.Pipeliner { //nolint:ireturn,nolintlint
func (r *RedisCache) NewTxPipeline() redis.Pipeliner { //nolint:ireturn
return r.client.TxPipeline()
}

func (r *RedisCache) ArchivePayloadRequest(payload []interface{}) error {
return r.archiveClient.XAdd(context.Background(), &redis.XAddArgs{
// We expect payload request logs to be at most 10 KiB in size. This
// means we can expect the stream to eventually take up
// 10_000 * 10 KiB = 100 MiB.
MaxLen: 10_000,
Approx: true,
Stream: "payload-request-archive",
Values: payload,
}).Err()
}
12 changes: 6 additions & 6 deletions datastore/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func setupTestRedis(t *testing.T) *RedisCache {

redisTestServer, err := miniredis.Run()
require.NoError(t, err)
redisService, err := NewRedisCache("", redisTestServer.Addr(), "")
redisService, err := NewRedisCache("", redisTestServer.Addr(), "", "")
// redisService, err := NewRedisCache("", "localhost:6379", "")
require.NoError(t, err)

Expand Down Expand Up @@ -262,25 +262,25 @@ func TestRedisURIs(t *testing.T) {
require.NoError(t, err)

// test connection with and without protocol
_, err = NewRedisCache("", redisTestServer.Addr(), "")
_, err = NewRedisCache("", redisTestServer.Addr(), "", "")
require.NoError(t, err)
_, err = NewRedisCache("", "redis://"+redisTestServer.Addr(), "")
_, err = NewRedisCache("", "redis://"+redisTestServer.Addr(), "", "")
require.NoError(t, err)

// test connection w/ credentials
username := "user"
password := "pass"
redisTestServer.RequireUserAuth(username, password)
fullURL := "redis://" + username + ":" + password + "@" + redisTestServer.Addr()
_, err = NewRedisCache("", fullURL, "")
_, err = NewRedisCache("", fullURL, "", "")
require.NoError(t, err)

// ensure malformed URL throws error
malformURL := "http://" + username + ":" + password + "@" + redisTestServer.Addr()
_, err = NewRedisCache("", malformURL, "")
_, err = NewRedisCache("", malformURL, "", "")
require.Error(t, err)
malformURL = "redis://" + username + ":" + "wrongpass" + "@" + redisTestServer.Addr()
_, err = NewRedisCache("", malformURL, "")
_, err = NewRedisCache("", malformURL, "", "")
require.Error(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion services/api/optimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func startTestBackend(t *testing.T) (*phase0.BLSPubKey, *bls.SecretKey, *testBac
}
redisTestServer, err := miniredis.Run()
require.NoError(t, err)
mockRedis, err := datastore.NewRedisCache("", redisTestServer.Addr(), "")
mockRedis, err := datastore.NewRedisCache("", redisTestServer.Addr(), "", "")
require.NoError(t, err)
mockDS, err := datastore.NewDatastore(mockRedis, nil, mockDB)
require.NoError(t, err)
Expand Down
83 changes: 81 additions & 2 deletions services/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,19 @@ func (api *RelayAPI) handleStatus(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
}

// Retrieve the client IP based on the headers if any.
func getClientIP(req *http.Request) string {
// Ordered list of headers that might contain the client IP.
ip := req.Header.Get("Cf-Connecting-Ip")

// If the IP is empty, try the next header.
if ip == "" {
ip = req.Header.Get("X-Real-Ip")
}

return ip
}

// ---------------
// PROPOSER APIS
// ---------------
Expand Down Expand Up @@ -1442,6 +1455,68 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
}
}()

// Export metadata at end of request
abortReason := ""
timeBeforePublish := int64(0)
timeAfterPublish := int64(0)

defer func() {
archivePayloadLog := []interface{}{
"content_length", strconv.FormatInt(req.ContentLength, 10),
"decoded_at", decodeTime.UnixMilli(),
"finished_at", fmt.Sprint(time.Now().UTC().UnixMilli()),
"head_slot", strconv.FormatUint(headSlot, 10),
"proposer_pubkey", proposerPubkey.String(),
"received_at", receivedAt.UnixMilli(),
}

if ua != "" {
archivePayloadLog = append(archivePayloadLog, "user_agent", ua)
}

ip := getClientIP(req)

if ip != "" {
archivePayloadLog = append(archivePayloadLog, "ip", ip)
}

jsonPayload, err := json.Marshal(payload)
if err != nil {
log.WithError(err).Error("could not marshal payload")
} else {
archivePayloadLog = append(archivePayloadLog, "payload", string(jsonPayload))
}

if abortReason != "" {
archivePayloadLog = append(archivePayloadLog, "abort_reason", abortReason)
}

reqIDParam := req.URL.Query().Get("id")
if reqIDParam != "" {
archivePayloadLog = append(archivePayloadLog, "req_id_param", reqIDParam)
}

if timeBeforePublish != 0 {
archivePayloadLog = append(archivePayloadLog, "time_before_publish", timeBeforePublish)
}

if timeAfterPublish != 0 {
archivePayloadLog = append(archivePayloadLog, "time_after_publish", timeAfterPublish)
}

ipAddress := getClientIP(req)
if ipAddress != "" {
archivePayloadLog = append(archivePayloadLog, "ip_address", ipAddress)
}

err = api.redis.ArchivePayloadRequest(archivePayloadLog)
if err != nil {
log.WithError(err).Error("failed to archive payload request")
}

log.Debug(fmt.Sprintf("successfully archived payload request, block_hash: %s", blockHash.String()))
}()

// Get the response - from Redis, Memcache or DB
// note that recent mev-boost versions only send getPayload to relays that provided the bid, older versions send getPayload to all relays.
// Additionally, proposers may feel it's safer to ask for a bid from all relays and fork.
Expand Down Expand Up @@ -1473,22 +1548,26 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
if !api.ffDisablePayloadDBStorage {
_, err := api.db.GetBlockSubmissionEntry(uint64(slot), proposerPubkey.String(), blockHash.String())
if errors.Is(err, sql.ErrNoRows) {
abortReason = "execution-payload-not-found"
log.Info("failed second attempt to get execution payload, discovered block was never submitted to this relay")
api.RespondError(w, http.StatusBadRequest, "no execution payload for this request, block was never seen by this relay")
return
}
if err != nil {
abortReason = "execution-payload-retrieval-error"
log.WithError(err).Error("failed second attempt to get execution payload, hit an error while checking if block was submitted to this relay")
api.RespondError(w, http.StatusInternalServerError, "no execution payload for this request, hit an error while checking if block was submitted to this relay")
return
}
}

// Case 2 or 3, we don't know which.
abortReason = "execution-payload-not-found"
log.Warn("failed second attempt to get execution payload, not found case, block was never submitted to this relay or bid was accepted but payload was lost")
api.RespondError(w, http.StatusBadRequest, "no execution payload for this request, block was never seen by this relay or bid was accepted but payload was lost, if you got this bid from us, please contact the relay")
return
} else {
abortReason = "execution-payload-retrieval-error"
log.WithError(err).Error("failed second attempt to get execution payload, error case")
api.RespondError(w, http.StatusInternalServerError, "no execution payload for this request")
return
Expand Down Expand Up @@ -1557,7 +1636,7 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
}

// Publish the signed beacon block via beacon-node
timeBeforePublish := time.Now().UTC().UnixMilli()
timeBeforePublish = time.Now().UTC().UnixMilli()
log = log.WithField("timestampBeforePublishing", timeBeforePublish)
signedBeaconBlock, err := common.SignedBlindedBeaconBlockToBeaconBlock(payload, getPayloadResp)
if err != nil {
Expand All @@ -1571,7 +1650,7 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
api.RespondError(w, http.StatusBadRequest, "failed to publish block")
return
}
timeAfterPublish := time.Now().UTC().UnixMilli()
timeAfterPublish = time.Now().UTC().UnixMilli()
msNeededForPublishing = uint64(timeAfterPublish - timeBeforePublish)
log = log.WithField("timestampAfterPublishing", timeAfterPublish)
log.WithField("msNeededForPublishing", msNeededForPublishing).Info("block published through beacon node")
Expand Down
2 changes: 1 addition & 1 deletion services/api/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newTestBackend(t require.TestingT, numBeaconNodes int) *testBackend {
redisClient, err := miniredis.Run()
require.NoError(t, err)

redisCache, err := datastore.NewRedisCache("", redisClient.Addr(), "")
redisCache, err := datastore.NewRedisCache("", redisClient.Addr(), "", "")
require.NoError(t, err)

db := database.MockDB{}
Expand Down

0 comments on commit 5108978

Please sign in to comment.