Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add short-lived cache handlers #106

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 49 additions & 20 deletions proxyd/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ import (
)

// Cache is a pluggable key/value store for cached RPC responses.
type Cache interface {
	// Get fetches a value from the cache.
	Get(ctx context.Context, key string) (string, error)

	// Put stores a value in the cache. Entries flagged shortLived are
	// meant to expire quickly; backends without per-entry TTL support
	// may drop them instead of storing them.
	Put(ctx context.Context, key string, value string, shortLived bool) error
}

const (
Expand All @@ -39,7 +42,11 @@ func (c *cache) Get(ctx context.Context, key string) (string, error) {
return "", nil
}

func (c *cache) Put(ctx context.Context, key string, value string) error {
func (c *cache) Put(ctx context.Context, key string, value string, shortLived bool) error {
// ignore value with short lived flag
if shortLived {
return nil
}
c.lru.Add(key, value)
return nil
}
Expand All @@ -61,10 +68,10 @@ func (c *fallbackCache) Get(ctx context.Context, key string) (string, error) {
return val, nil
}

func (c *fallbackCache) Put(ctx context.Context, key string, value string) error {
err := c.primaryCache.Put(ctx, key, value)
func (c *fallbackCache) Put(ctx context.Context, key string, value string, shortLived bool) error {
err := c.primaryCache.Put(ctx, key, value, shortLived)
if err != nil {
return c.secondaryCache.Put(ctx, key, value)
return c.secondaryCache.Put(ctx, key, value, shortLived)
}
return nil
}
Expand All @@ -73,11 +80,12 @@ type redisCache struct {
redisClient redis.UniversalClient
redisReadClient redis.UniversalClient
prefix string
ttl time.Duration
defaultTTL time.Duration
shortLivedTTL time.Duration
}

func newRedisCache(redisClient redis.UniversalClient, redisReadClient redis.UniversalClient, prefix string, ttl time.Duration) *redisCache {
return &redisCache{redisClient, redisReadClient, prefix, ttl}
func newRedisCache(redisClient redis.UniversalClient, redisReadClient redis.UniversalClient, prefix string, defaultTTL time.Duration, shortLivedTTL time.Duration) *redisCache {
return &redisCache{redisClient, redisReadClient, prefix, defaultTTL, shortLivedTTL}
}

func (c *redisCache) namespaced(key string) string {
Expand All @@ -101,9 +109,19 @@ func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
return val, nil
}

func (c *redisCache) Put(ctx context.Context, key string, value string) error {
func (c *redisCache) Put(ctx context.Context, key string, value string, shortLived bool) error {
ttl := c.defaultTTL

// disable PUT on short lived key if shortLivedTTL is not set
if shortLived {
if c.shortLivedTTL == 0 {
return nil
}
ttl = c.shortLivedTTL
}

start := time.Now()
err := c.redisClient.SetEx(ctx, c.namespaced(key), value, c.ttl).Err()
err := c.redisClient.SetEx(ctx, c.namespaced(key), value, ttl).Err()
redisCacheDurationSumm.WithLabelValues("SETEX").Observe(float64(time.Since(start).Milliseconds()))

if err != nil {
Expand Down Expand Up @@ -135,9 +153,9 @@ func (c *cacheWithCompression) Get(ctx context.Context, key string) (string, err
return string(val), nil
}

func (c *cacheWithCompression) Put(ctx context.Context, key string, value string) error {
func (c *cacheWithCompression) Put(ctx context.Context, key string, value string, shortLived bool) error {
encodedVal := snappy.Encode(nil, []byte(value))
return c.cache.Put(ctx, key, string(encodedVal))
return c.cache.Put(ctx, key, string(encodedVal), shortLived)
}

type RPCCache interface {
Expand All @@ -152,6 +170,7 @@ type rpcCache struct {

func newRPCCache(cache Cache) RPCCache {
staticHandler := &StaticMethodHandler{cache: cache}
shortLivedHandler := &StaticMethodHandler{cache: cache, shortLived: true}
debugGetRawReceiptsHandler := &StaticMethodHandler{cache: cache,
filterGet: func(req *RPCReq) bool {
// cache only if the request is for a block hash
Expand All @@ -176,14 +195,24 @@ func newRPCCache(cache Cache) RPCCache {
},
}
handlers := map[string]RPCMethodHandler{
"eth_chainId": staticHandler,
"net_version": staticHandler,
"eth_getBlockTransactionCountByHash": staticHandler,
"eth_getUncleCountByBlockHash": staticHandler,
"eth_getBlockByHash": staticHandler,
"eth_getTransactionByBlockHashAndIndex": staticHandler,
"eth_getUncleByBlockHashAndIndex": staticHandler,
"debug_getRawReceipts": debugGetRawReceiptsHandler,
"eth_chainId": staticHandler,
"net_version": staticHandler,
"eth_getBlockTransactionCountByHash": staticHandler,
"eth_getUncleCountByBlockHash": staticHandler,
"eth_getBlockByHash": staticHandler,
"eth_getTransactionByBlockHashAndIndex": staticHandler,
"eth_getUncleByBlockHashAndIndex": staticHandler,
"debug_getRawReceipts": debugGetRawReceiptsHandler,
"eth_getBlockByNumber": shortLivedHandler,
"eth_blockNumber": shortLivedHandler,
"eth_getBalance": shortLivedHandler,
"eth_getStorageAt": shortLivedHandler,
"eth_getTransactionCount": shortLivedHandler,
"eth_getBlockTransactionCountByNumber": shortLivedHandler,
"eth_getUncleCountByBlockNumber": shortLivedHandler,
"eth_getCode": shortLivedHandler,
"eth_getTransactionByBlockNumberAndIndex": shortLivedHandler,
"eth_getUncleByBlockNumberAndIndex": shortLivedHandler,
Comment on lines +206 to +215
Copy link
Contributor

@jelias2 jelias2 Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should all of these have a low TTL? For example, eth_getCode, eth_getBlockByNumber, or eth_getBlockTransactionCountByNumber seem like they would return consistent results?

Just curious how these RPCs were chosen — is there any related data? I get that tip-relevant RPCs and rapidly changing data fit the low-TTL behavior.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A follow-up could be making this configurable, but I think that may be out of scope for the initial PR.

}
return &rpcCache{
cache: cache,
Expand Down
6 changes: 3 additions & 3 deletions proxyd/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (c *errorCache) Get(ctx context.Context, key string) (string, error) {
return "", errors.New("test error")
}

func (c *errorCache) Put(ctx context.Context, key string, value string) error {
func (c *errorCache) Put(ctx context.Context, key string, value string, shortLived bool) error {
return errors.New("test error")
}

Expand All @@ -241,7 +241,7 @@ func TestFallbackCache(t *testing.T) {

for i, cache := range success {
t.Run("success", func(t *testing.T) {
err := cache.Put(ctx, "foo", fmt.Sprintf("bar%d", i))
err := cache.Put(ctx, "foo", fmt.Sprintf("bar%d", i), false)
require.NoError(t, err)

val, err := cache.Get(ctx, "foo")
Expand All @@ -256,7 +256,7 @@ func TestFallbackCache(t *testing.T) {
require.Error(t, err)
require.Empty(t, val)

err = cache.Put(ctx, "foo", "baz")
err = cache.Put(ctx, "foo", "baz", false)
require.Error(t, err)
})
}
Expand Down
5 changes: 3 additions & 2 deletions proxyd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ type ServerConfig struct {
}

// CacheConfig configures proxyd's RPC response cache.
type CacheConfig struct {
	Enabled bool `toml:"enabled"`
	// TTL is applied to normal cache entries.
	TTL TOMLDuration `toml:"ttl"`
	// ShortLivedTTL is applied to entries flagged short-lived; in the
	// Redis backend a zero value disables caching of those entries.
	ShortLivedTTL TOMLDuration `toml:"short_lived_ttl"`
}

type RedisConfig struct {
Expand Down
49 changes: 20 additions & 29 deletions proxyd/integration_tests/caching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ func TestCaching(t *testing.T) {
hdlr.SetRoute("eth_getTransactionReceipt", "999", "eth_getTransactionReceipt")
hdlr.SetRoute("debug_getRawReceipts", "999", "debug_getRawReceipts")
/* not cacheable */
hdlr.SetRoute("eth_getBlockByNumber", "999", "eth_getBlockByNumber")
hdlr.SetRoute("eth_blockNumber", "999", "eth_blockNumber")
hdlr.SetRoute("eth_call", "999", "eth_call")

backend := NewMockBackend(hdlr)
Expand Down Expand Up @@ -91,27 +89,6 @@ func TestCaching(t *testing.T) {
1,
},
/* not cacheable */
{
"eth_getBlockByNumber",
[]interface{}{
"0x1",
true,
},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByNumber\", \"id\": 999}",
2,
},
{
"eth_getTransactionReceipt",
[]interface{}{"0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c5"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionReceipt\", \"id\": 999}",
2,
},
{
"eth_getTransactionByHash",
[]interface{}{"0x88df016429689c079f3b2f6ad39fa052532c56795b733da78a91ebe6a713944b"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionByHash\", \"id\": 999}",
2,
},
{
"eth_call",
[]interface{}{
Expand All @@ -125,12 +102,6 @@ func TestCaching(t *testing.T) {
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_call\", \"id\": 999}",
2,
},
{
"eth_blockNumber",
nil,
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_blockNumber\", \"id\": 999}",
2,
},
{
"eth_call",
[]interface{}{
Expand Down Expand Up @@ -171,6 +142,26 @@ func TestCaching(t *testing.T) {
})
}

t.Run("eth_getBlockByNumber should only be cached for 3 seconds", func(t *testing.T) {
backend.Reset()
hdlr.SetRoute("eth_getBlockByNumber", "999", "eth_getBlockByNumber")

resRaw, _, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x123", "false"})
require.NoError(t, err)
resCache, _, err := client.SendRPC("eth_getBlockByNumber", []interface{}{"0x123", "false"})
require.NoError(t, err)
RequireEqualJSON(t, []byte("{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"eth_getBlockByNumber\"}"), resRaw)
RequireEqualJSON(t, resRaw, resCache)
require.Equal(t, 1, countRequests(backend, "eth_getBlockByNumber"))

// fast forward 4 seconds and make the same request
redis.FastForward(4 * time.Second)
resCache, _, err = client.SendRPC("eth_getBlockByNumber", []interface{}{"0x123", "false"})
require.NoError(t, err)
require.Equal(t, 2, countRequests(backend, "eth_getBlockByNumber"))
RequireEqualJSON(t, []byte("{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":\"eth_getBlockByNumber\"}"), resCache)
})

t.Run("nil responses should not be cached", func(t *testing.T) {
hdlr.SetRoute("eth_getBlockByHash", "999", nil)
resRaw, _, err := client.SendRPC("eth_getBlockByHash", []interface{}{"0x123"})
Expand Down
1 change: 1 addition & 0 deletions proxyd/integration_tests/testdata/caching.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace = "proxyd"

[cache]
enabled = true
short_lived_ttl = "1s"

[backends]
[backends.good]
Expand Down
11 changes: 6 additions & 5 deletions proxyd/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ type RPCMethodHandler interface {
}

// StaticMethodHandler caches RPC responses for methods whose results are
// static, or — when shortLived is set — results that change near the chain
// tip and are therefore cached with a short TTL.
type StaticMethodHandler struct {
	cache      Cache
	m          sync.RWMutex
	shortLived bool
	filterGet  func(*RPCReq) bool          // optional gate on cache reads
	filterPut  func(*RPCReq, *RPCRes) bool // optional gate on cache writes
}

func (e *StaticMethodHandler) key(req *RPCReq) string {
Expand Down Expand Up @@ -83,7 +84,7 @@ func (e *StaticMethodHandler) PutRPCMethod(ctx context.Context, req *RPCReq, res
key := e.key(req)
value := mustMarshalJSON(res.Result)

err := e.cache.Put(ctx, key, string(value))
err := e.cache.Put(ctx, key, string(value), e.shortLived)
if err != nil {
log.Error("error putting into cache", "key", key, "method", req.Method, "err", err)
return err
Expand Down
3 changes: 2 additions & 1 deletion proxyd/proxyd.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,8 @@ func Start(config *Config) (*Server, func(), error) {
if config.Cache.TTL != 0 {
ttl = time.Duration(config.Cache.TTL)
}
cache = newRedisCache(redisClient, redisReadClient, config.Redis.Namespace, ttl)

cache = newRedisCache(redisClient, redisReadClient, config.Redis.Namespace, ttl, time.Duration(config.Cache.ShortLivedTTL))

if config.Redis.FallbackToMemory {
cache = newFallbackCache(cache, newMemoryCache())
Expand Down
1 change: 1 addition & 0 deletions proxyd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
defaultWSReadTimeout = 2 * time.Minute
defaultWSWriteTimeout = 10 * time.Second
defaultCacheTtl = 1 * time.Hour
defaultBlockTtl = 2 * time.Second
maxRequestBodyLogLen = 2000
defaultMaxUpstreamBatchSize = 10
defaultRateLimitHeader = "X-Forwarded-For"
Expand Down