From 1cec5f5d85fe8278faf08b937b9397973ab807e5 Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Mon, 25 Mar 2024 14:35:27 +0100 Subject: [PATCH] gateway: import cached DNS, blockstore cache, and proxy blockstore --- examples/gateway/proxy/blockstore.go | 111 -------------------- examples/gateway/proxy/main.go | 14 ++- examples/gateway/proxy/main_test.go | 17 ++- examples/gateway/proxy/routing.go | 5 - examples/go.mod | 10 +- examples/go.sum | 18 ++-- gateway/blockstore_cache.go | 138 ++++++++++++++++++++++++ gateway/blockstore_proxy.go | 151 +++++++++++++++++++++++++++ gateway/dns.go | 59 +++++++++++ go.mod | 10 +- go.sum | 18 ++-- 11 files changed, 395 insertions(+), 156 deletions(-) delete mode 100644 examples/gateway/proxy/blockstore.go create mode 100644 gateway/blockstore_cache.go create mode 100644 gateway/blockstore_proxy.go diff --git a/examples/gateway/proxy/blockstore.go b/examples/gateway/proxy/blockstore.go deleted file mode 100644 index e8a498a08..000000000 --- a/examples/gateway/proxy/blockstore.go +++ /dev/null @@ -1,111 +0,0 @@ -package main - -import ( - "context" - "fmt" - "io" - "net/http" - - "github.com/ipfs/boxo/exchange" - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" -) - -type proxyExchange struct { - httpClient *http.Client - gatewayURL string -} - -func newProxyExchange(gatewayURL string, client *http.Client) exchange.Interface { - if client == nil { - client = &http.Client{ - Transport: otelhttp.NewTransport(http.DefaultTransport), - } - } - - return &proxyExchange{ - gatewayURL: gatewayURL, - httpClient: client, - } -} - -func (e *proxyExchange) fetch(ctx context.Context, c cid.Cid) (blocks.Block, error) { - urlStr := fmt.Sprintf("%s/ipfs/%s?format=raw", e.gatewayURL, c) - req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil) - if err != nil { - return nil, err - } - req.Header.Set("Accept", "application/vnd.ipld.raw") - resp, err := e.httpClient.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected status from remote gateway: %s", resp.Status) - } - - rb, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - // Validate incoming blocks - // This is important since we are proxying block requests to an untrusted gateway. - nc, err := c.Prefix().Sum(rb) - if err != nil { - return nil, blocks.ErrWrongHash - } - if !nc.Equals(c) { - fmt.Printf("got %s vs %s\n", nc, c) - return nil, blocks.ErrWrongHash - } - - return blocks.NewBlockWithCid(rb, c) -} - -func (e *proxyExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { - blk, err := e.fetch(ctx, c) - if err != nil { - return nil, err - } - return blk, nil -} - -func (e *proxyExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { - ch := make(chan blocks.Block) - - // Note: this implementation of GetBlocks does not make use of worker pools or parallelism - // However, production implementations generally will, and an advanced - // version of this can be found in https://github.com/ipfs/bifrost-gateway/ - go func() { - defer close(ch) - for _, c := range cids { - blk, err := e.fetch(ctx, c) - if err != nil { - return - } - select { - case ch <- blk: - case <-ctx.Done(): - return - } - } - }() - - return ch, nil -} - -func (e *proxyExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { - // Note: while not required this function could be used optimistically to prevent fetching - // of data that the client has retrieved already even though a Get call is in progress. - return nil -} - -func (e *proxyExchange) Close() error { - // Note: while nothing is strictly required to happen here it would be reasonable to close - // existing connections and prevent new operations from starting. - return nil -} diff --git a/examples/gateway/proxy/main.go b/examples/gateway/proxy/main.go index 7b92ac729..c1a559e96 100644 --- a/examples/gateway/proxy/main.go +++ b/examples/gateway/proxy/main.go @@ -8,11 +8,9 @@ import ( "strconv" "github.com/ipfs/boxo/blockservice" - "github.com/ipfs/boxo/blockstore" "github.com/ipfs/boxo/examples/gateway/common" + "github.com/ipfs/boxo/exchange/offline" "github.com/ipfs/boxo/gateway" - "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" ) func main() { @@ -34,11 +32,11 @@ func main() { // Sets up a blockstore to hold the blocks we request from the gateway // Note: in a production environment you would likely want to choose a more efficient datastore implementation // as well as one that has a way of pruning storage so as not to hold data in memory indefinitely. - blockStore := blockstore.NewBlockstore(dssync.MutexWrap(datastore.NewMapDatastore())) - - // Sets up the exchange, which will proxy the block requests to the given gateway. - e := newProxyExchange(*gatewayUrlPtr, nil) - blockService := blockservice.New(blockStore, e) + blockStore, err := gateway.NewProxyBlockstore([]string{*gatewayUrlPtr}, nil) + if err != nil { + log.Fatal(err) + } + blockService := blockservice.New(blockStore, offline.Exchange(blockStore)) // Sets up the routing system, which will proxy the IPNS routing requests to the given gateway. routing := newProxyRouting(*gatewayUrlPtr, nil) diff --git a/examples/gateway/proxy/main_test.go b/examples/gateway/proxy/main_test.go index 8dd79c7ad..b51c9f742 100644 --- a/examples/gateway/proxy/main_test.go +++ b/examples/gateway/proxy/main_test.go @@ -10,13 +10,12 @@ import ( "testing" "github.com/ipfs/boxo/blockservice" - "github.com/ipfs/boxo/blockstore" "github.com/ipfs/boxo/examples/gateway/common" + "github.com/ipfs/boxo/exchange/offline" "github.com/ipfs/boxo/gateway" blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -24,16 +23,12 @@ const ( ) func newProxyGateway(t *testing.T, rs *httptest.Server) *httptest.Server { - blockStore := blockstore.NewBlockstore(dssync.MutexWrap(datastore.NewMapDatastore())) - exch := newProxyExchange(rs.URL, nil) - blockService := blockservice.New(blockStore, exch) + blockStore, err := gateway.NewProxyBlockstore([]string{rs.URL}, nil) + require.NoError(t, err) + blockService := blockservice.New(blockStore, offline.Exchange(blockStore)) routing := newProxyRouting(rs.URL, nil) - backend, err := gateway.NewBlocksBackend(blockService, gateway.WithValueStore(routing)) - if err != nil { - t.Error(err) - } - + require.NoError(t, err) handler := common.NewHandler(backend) ts := httptest.NewServer(handler) t.Cleanup(ts.Close) diff --git a/examples/gateway/proxy/routing.go b/examples/gateway/proxy/routing.go index f29fbb4d1..eba50be2a 100644 --- a/examples/gateway/proxy/routing.go +++ b/examples/gateway/proxy/routing.go @@ -85,11 +85,6 @@ func (ps *proxyRouting) fetch(ctx context.Context, name ipns.Name) ([]byte, erro } defer resp.Body.Close() - if err != nil { - return nil, err - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("unexpected status from remote gateway: %s", resp.Status) } diff --git a/examples/go.mod b/examples/go.mod index 93f0d9202..19851a8b9 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -15,9 +15,9 @@ require ( github.com/multiformats/go-multicodec v0.9.0 github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.8.4 - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 go.opentelemetry.io/contrib/propagators/autoprop v0.46.1 - go.opentelemetry.io/otel v1.21.0 + go.opentelemetry.io/otel v1.24.0 go.opentelemetry.io/otel/sdk v1.21.0 ) @@ -125,6 +125,7 @@ require ( github.com/quic-go/quic-go v0.40.1 // indirect github.com/quic-go/webtransport-go v0.6.0 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect + github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 // indirect github.com/samber/lo v1.39.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb // indirect @@ -143,9 +144,10 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 // indirect go.opentelemetry.io/otel/exporters/zipkin v1.21.0 // indirect - go.opentelemetry.io/otel/metric v1.21.0 // indirect - go.opentelemetry.io/otel/trace v1.21.0 // indirect + go.opentelemetry.io/otel/metric v1.24.0 // indirect + go.opentelemetry.io/otel/trace v1.24.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect + go.uber.org/atomic v1.11.0 // indirect go.uber.org/dig v1.17.1 // indirect go.uber.org/fx v1.20.1 // indirect go.uber.org/mock v0.4.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index e1ba68120..ecd950afd 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -403,6 +403,8 @@ github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtD github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 h1:18kd+8ZUlt/ARXhljq+14TwAoKa61q6dX8jtwOf6DH8= +github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA= @@ -483,8 +485,8 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= go.opentelemetry.io/contrib/propagators/autoprop v0.46.1 h1:cXTYcMjY0dsYokAuo8LbNBQxpF8VgTHdiHJJ1zlIXl4= go.opentelemetry.io/contrib/propagators/autoprop v0.46.1/go.mod h1:WZxgny1/6+j67B1s72PLJ4bGjidoWFzSmLNfJKVt2bo= go.opentelemetry.io/contrib/propagators/aws v1.21.1 h1:uQIQIDWb0gzyvon2ICnghpLAf9w7ADOCUiIiwCQgR2o= @@ -495,8 +497,8 @@ go.opentelemetry.io/contrib/propagators/jaeger v1.21.1 h1:f4beMGDKiVzg9IcX7/VuWV go.opentelemetry.io/contrib/propagators/jaeger v1.21.1/go.mod h1:U9jhkEl8d1LL+QXY7q3kneJWJugiN3kZJV2OWz3hkBY= go.opentelemetry.io/contrib/propagators/ot v1.21.1 h1:3TN5vkXjKYWp0YdMcnUEC/A+pBPvqz9V3nCS2xmcurk= go.opentelemetry.io/contrib/propagators/ot v1.21.1/go.mod h1:oy0MYCbS/b3cqUDW37wBWtlwBIsutngS++Lklpgh+fc= -go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= -go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= +go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= +go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 h1:cl5P5/GIfFh4t6xyruOgJP5QiA1pw4fYYdv6nc6CBWw= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0/go.mod h1:zgBdWWAu7oEEMC06MMKc5NLbA/1YDXV1sMpSqEeLQLg= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqheXEFWAZ7O8A7m+J0aPTmpJN3YQ7qetUAdkkkKpk= @@ -507,12 +509,12 @@ go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 h1:VhlEQAPp9R1ktYf go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0/go.mod h1:kB3ufRbfU+CQ4MlUcqtW8Z7YEOBeK2DJ6CmR5rYYF3E= go.opentelemetry.io/otel/exporters/zipkin v1.21.0 h1:D+Gv6lSfrFBWmQYyxKjDd0Zuld9SRXpIrEsKZvE4DO4= go.opentelemetry.io/otel/exporters/zipkin v1.21.0/go.mod h1:83oMKR6DzmHisFOW3I+yIMGZUTjxiWaiBI8M8+TU5zE= -go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= -go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= +go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= +go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8= go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= -go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= -go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= +go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= +go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/gateway/blockstore_cache.go b/gateway/blockstore_cache.go new file mode 100644 index 000000000..581febefb --- /dev/null +++ b/gateway/blockstore_cache.go @@ -0,0 +1,138 @@ +package gateway + +import ( + "context" + "errors" + + "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" + + blockstore "github.com/ipfs/boxo/blockstore" + blocks "github.com/ipfs/go-block-format" + + lru "github.com/hashicorp/golang-lru/v2" + "github.com/prometheus/client_golang/prometheus" + uatomic "go.uber.org/atomic" + "go.uber.org/zap/zapcore" +) + +type cacheBlockStore struct { + cache *lru.TwoQueueCache[string, []byte] + rehash *uatomic.Bool + cacheHitsMetric prometheus.Counter + cacheRequestsMetric prometheus.Counter +} + +var _ blockstore.Blockstore = (*cacheBlockStore)(nil) + +// NewCacheBlockStore creates a new [blockstore.Blockstore] that caches blocks +// in memory using a two queue cache. It can be useful, for example, when paired +// with a proxy blockstore (see [NewProxyBlockstore]). +func NewCacheBlockStore(size int) (blockstore.Blockstore, error) { + c, err := lru.New2Q[string, []byte](size) + if err != nil { + return nil, err + } + + cacheHitsMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "ipfs", + Subsystem: "http", + Name: "blockstore_cache_hit", + Help: "The number of global block cache hits.", + }) + + cacheRequestsMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "ipfs", + Subsystem: "http", + Name: "blockstore_cache_requests", + Help: "The number of global block cache requests.", + }) + + err = prometheus.Register(cacheHitsMetric) + if err != nil { + return nil, err + } + + err = prometheus.Register(cacheRequestsMetric) + if err != nil { + return nil, err + } + + return &cacheBlockStore{ + cache: c, + rehash: uatomic.NewBool(false), + cacheHitsMetric: cacheHitsMetric, + cacheRequestsMetric: cacheRequestsMetric, + }, nil +} + +func (l *cacheBlockStore) DeleteBlock(ctx context.Context, c cid.Cid) error { + l.cache.Remove(string(c.Hash())) + return nil +} + +func (l *cacheBlockStore) Has(ctx context.Context, c cid.Cid) (bool, error) { + return l.cache.Contains(string(c.Hash())), nil +} + +func (l *cacheBlockStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + l.cacheRequestsMetric.Add(1) + + blkData, found := l.cache.Get(string(c.Hash())) + if !found { + if log.Level().Enabled(zapcore.DebugLevel) { + log.Debugw("block not found in cache", "cid", c.String()) + } + return nil, format.ErrNotFound{Cid: c} + } + + // It's a HIT! + l.cacheHitsMetric.Add(1) + if log.Level().Enabled(zapcore.DebugLevel) { + log.Debugw("block found in cache", "cid", c.String()) + } + + if l.rehash.Load() { + rbcid, err := c.Prefix().Sum(blkData) + if err != nil { + return nil, err + } + + if !rbcid.Equals(c) { + return nil, blockstore.ErrHashMismatch + } + } + + return blocks.NewBlockWithCid(blkData, c) +} + +func (l *cacheBlockStore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + blkData, found := l.cache.Get(string(c.Hash())) + if !found { + return -1, format.ErrNotFound{Cid: c} + } + + return len(blkData), nil +} + +func (l *cacheBlockStore) Put(ctx context.Context, blk blocks.Block) error { + l.cache.Add(string(blk.Cid().Hash()), blk.RawData()) + return nil +} + +func (l *cacheBlockStore) PutMany(ctx context.Context, blks []blocks.Block) error { + for _, b := range blks { + if err := l.Put(ctx, b); err != nil { + return err + } + } + return nil +} + +func (l *cacheBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return nil, errors.New("not implemented") +} + +func (l *cacheBlockStore) HashOnRead(enabled bool) { + l.rehash.Store(enabled) +} diff --git a/gateway/blockstore_proxy.go b/gateway/blockstore_proxy.go new file mode 100644 index 000000000..61c97547d --- /dev/null +++ b/gateway/blockstore_proxy.go @@ -0,0 +1,151 @@ +package gateway + +import ( + "context" + "errors" + "fmt" + "io" + "math/rand" + "net/http" + "time" + + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/util" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" +) + +const getBlockTimeout = time.Second * 60 + +type proxyBlockstore struct { + httpClient *http.Client + gatewayURL []string + validate bool + rand *rand.Rand +} + +var _ blockstore.Blockstore = (*proxyBlockstore)(nil) + +// NewProxyBlockstore creates a new [blockstore.Blockstore] that is backed by one +// or more gateways that follow the [Trustless Gateway] specification. +// +// [Trustless Gateway]: https://specs.ipfs.tech/http-gateways/trustless-gateway/ +func NewProxyBlockstore(gatewayURL []string, cdns *CachedDNS) (blockstore.Blockstore, error) { + if len(gatewayURL) == 0 { + return nil, errors.New("missing gateway URLs to which to proxy") + } + + s := rand.NewSource(time.Now().Unix()) + rand := rand.New(s) + + // Transport with increased defaults than [http.Transport] such that + // retrieving multiple blocks from a single gateway concurrently is fast. + transport := &http.Transport{ + MaxIdleConns: 1000, + MaxConnsPerHost: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + ForceAttemptHTTP2: true, + } + + if cdns != nil { + transport.DialContext = cdns.DialContext + } + + return &proxyBlockstore{ + gatewayURL: gatewayURL, + httpClient: &http.Client{ + Timeout: getBlockTimeout, + Transport: otelhttp.NewTransport(transport), + }, + // Enables block validation by default. Important since we are + // proxying block requests to untrusted gateways. + validate: true, + rand: rand, + }, nil +} + +func (ps *proxyBlockstore) fetch(ctx context.Context, c cid.Cid) (blocks.Block, error) { + urlStr := fmt.Sprintf("%s/ipfs/%s?format=raw", ps.getRandomGatewayURL(), c) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil) + if err != nil { + return nil, err + } + log.Debugw("raw fetch", "url", req.URL) + req.Header.Set("Accept", "application/vnd.ipld.raw") + resp, err := ps.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("http error from block gateway: %s", resp.Status) + } + + rb, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if ps.validate { + nc, err := c.Prefix().Sum(rb) + if err != nil { + return nil, blocks.ErrWrongHash + } + if !nc.Equals(c) { + return nil, blocks.ErrWrongHash + } + } + + return blocks.NewBlockWithCid(rb, c) +} + +func (ps *proxyBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { + blk, err := ps.fetch(ctx, c) + if err != nil { + return false, err + } + return blk != nil, nil +} + +func (ps *proxyBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + blk, err := ps.fetch(ctx, c) + if err != nil { + return nil, err + } + return blk, nil +} + +func (ps *proxyBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + blk, err := ps.fetch(ctx, c) + if err != nil { + return 0, err + } + return len(blk.RawData()), nil +} + +func (ps *proxyBlockstore) HashOnRead(enabled bool) { + ps.validate = enabled +} + +func (c *proxyBlockstore) Put(context.Context, blocks.Block) error { + return util.ErrNotImplemented +} + +func (c *proxyBlockstore) PutMany(context.Context, []blocks.Block) error { + return util.ErrNotImplemented +} + +func (c *proxyBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return nil, util.ErrNotImplemented +} + +func (c *proxyBlockstore) DeleteBlock(context.Context, cid.Cid) error { + return util.ErrNotImplemented +} + +func (ps *proxyBlockstore) getRandomGatewayURL() string { + return ps.gatewayURL[ps.rand.Intn(len(ps.gatewayURL))] +} diff --git a/gateway/dns.go b/gateway/dns.go index 376c42669..cd41a9e38 100644 --- a/gateway/dns.go +++ b/gateway/dns.go @@ -1,12 +1,16 @@ package gateway import ( + "context" "fmt" + "net" "strings" + "time" "github.com/libp2p/go-doh-resolver" dns "github.com/miekg/dns" madns "github.com/multiformats/go-multiaddr-dns" + "github.com/rs/dnscache" ) var defaultResolvers = map[string]string{ @@ -90,3 +94,58 @@ func NewDNSResolver(resolvers map[string]string, dohOpts ...doh.Option) (*madns. return madns.NewResolver(opts...) } + +// CachedDNS implements [http.Transport.DialContext], allowing to cache DNS +// requests for a specified amount of time. +type CachedDNS struct { + resolver *dnscache.Resolver + refresher *time.Ticker +} + +func NewCachedDNS(refreshInterval time.Duration) *CachedDNS { + cache := &CachedDNS{ + resolver: &dnscache.Resolver{}, + refresher: time.NewTicker(refreshInterval), + } + + // Configure DNS cache to not remove stale records to protect gateway from + // catastrophic failures like https://github.com/ipfs/bifrost-gateway/issues/34 + options := dnscache.ResolverRefreshOptions{} + options.ClearUnused = false + options.PersistOnFailure = true + + // Every refreshInterval we check for updates, but if there is + // none, or if domain disappears, we keep the last cached version + go func(cdns *CachedDNS) { + defer cdns.refresher.Stop() + for range cdns.refresher.C { + cdns.resolver.RefreshWithOptions(options) + } + }(cache) + + return cache +} +func (cdns *CachedDNS) DialContext(ctx context.Context, network string, addr string) (conn net.Conn, err error) { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + ips, err := cdns.resolver.LookupHost(ctx, host) + if err != nil { + return nil, err + } + // Try all IPs returned by DNS + for _, ip := range ips { + var dialer net.Dialer + conn, err = dialer.DialContext(ctx, network, net.JoinHostPort(ip, port)) + if err == nil { + break + } + } + return +} + +func (cdns *CachedDNS) Close() error { + cdns.refresher.Stop() + return nil +} diff --git a/go.mod b/go.mod index b3aa064e4..a0a36f685 100644 --- a/go.mod +++ b/go.mod @@ -54,19 +54,22 @@ require ( github.com/pkg/errors v0.9.1 github.com/polydawn/refmt v0.89.0 github.com/prometheus/client_golang v1.18.0 + github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 github.com/samber/lo v1.39.0 github.com/spaolacci/murmur3 v1.1.0 github.com/stretchr/testify v1.8.4 github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f go.opencensus.io v0.24.0 - go.opentelemetry.io/otel v1.21.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 + go.opentelemetry.io/otel v1.24.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0 go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 go.opentelemetry.io/otel/exporters/zipkin v1.21.0 go.opentelemetry.io/otel/sdk v1.21.0 - go.opentelemetry.io/otel/trace v1.21.0 + go.opentelemetry.io/otel/trace v1.24.0 + go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.26.0 golang.org/x/oauth2 v0.16.0 @@ -86,6 +89,7 @@ require ( github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/elastic/gosigar v0.14.2 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect github.com/flynn/noise v1.0.1 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/go-logr/logr v1.4.1 // indirect @@ -151,7 +155,7 @@ require ( github.com/whyrusleeping/cbor-gen v0.0.0-20240109153615-66e95c3e8a87 // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect - go.opentelemetry.io/otel/metric v1.21.0 // indirect + go.opentelemetry.io/otel/metric v1.24.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/dig v1.17.1 // indirect go.uber.org/fx v1.20.1 // indirect diff --git a/go.sum b/go.sum index 7f79ac4fb..0854e7779 100644 --- a/go.sum +++ b/go.sum @@ -65,6 +65,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/noise v1.0.1 h1:vPp/jdQLXC6ppsXSj/pM3W1BIJ5FEHE2TulSJBpb43Y= github.com/flynn/noise v1.0.1/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= @@ -413,6 +415,8 @@ github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtD github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 h1:18kd+8ZUlt/ARXhljq+14TwAoKa61q6dX8jtwOf6DH8= +github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA= @@ -494,8 +498,10 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= -go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= +go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= +go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 h1:cl5P5/GIfFh4t6xyruOgJP5QiA1pw4fYYdv6nc6CBWw= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0/go.mod h1:zgBdWWAu7oEEMC06MMKc5NLbA/1YDXV1sMpSqEeLQLg= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqheXEFWAZ7O8A7m+J0aPTmpJN3YQ7qetUAdkkkKpk= @@ -506,12 +512,12 @@ go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 h1:VhlEQAPp9R1ktYf go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0/go.mod h1:kB3ufRbfU+CQ4MlUcqtW8Z7YEOBeK2DJ6CmR5rYYF3E= go.opentelemetry.io/otel/exporters/zipkin v1.21.0 h1:D+Gv6lSfrFBWmQYyxKjDd0Zuld9SRXpIrEsKZvE4DO4= go.opentelemetry.io/otel/exporters/zipkin v1.21.0/go.mod h1:83oMKR6DzmHisFOW3I+yIMGZUTjxiWaiBI8M8+TU5zE= -go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= -go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= +go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= +go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8= go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= -go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= -go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= +go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= +go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=