Skip to content

Commit

Permalink
feat(gateway): add TAR, IPNS Record, DAG-* histograms and spans (#155)
Browse files Browse the repository at this point in the history
Co-authored-by: Marcin Rataj <lidel@lidel.org>
  • Loading branch information
hacdias and lidel authored Feb 8, 2023
1 parent 1a932f7 commit 378aaf7
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 65 deletions.
18 changes: 17 additions & 1 deletion examples/gateway/car/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/ipfs/go-libipfs/examples/gateway/common"
"github.com/ipfs/go-libipfs/gateway"
carblockstore "github.com/ipld/go-car/v2/blockstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
Expand Down Expand Up @@ -52,9 +54,23 @@ func main() {
UseSubdomains: true,
},
}
handler = gateway.WithHostname(handler, gwAPI, publicGateways, noDNSLink)

// Creates a mux to serve the prometheus metrics alongside the gateway. This
// step is optional and only required if you need or want to access the metrics.
// You may also decide to expose the metrics on a different path, or port.
mux := http.NewServeMux()
mux.Handle("/debug/metrics/prometheus", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}))
mux.Handle("/", handler)

// Then wrap the mux with the hostname handler. Please note that the metrics
// will not be available under the previously defined publicGateways.
// You will be able to access the metrics via 127.0.0.1 but not localhost
// or example.net. If you want to expose the metrics on such gateways,
// you will have to add the path "/debug" to the variable Paths.
handler = gateway.WithHostname(mux, gwAPI, publicGateways, noDNSLink)

log.Printf("Listening on http://localhost:%d", *port)
log.Printf("Metrics available at http://127.0.0.1:%d/debug/metrics/prometheus", *port)
for _, cid := range roots {
log.Printf("Hosting CAR root at http://localhost:%d/ipfs/%s", *port, cid.String())
}
Expand Down
20 changes: 19 additions & 1 deletion examples/gateway/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-libipfs/examples/gateway/common"
"github.com/ipfs/go-libipfs/gateway"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
Expand Down Expand Up @@ -50,9 +52,25 @@ func main() {
UseSubdomains: true,
},
}
handler = gateway.WithHostname(handler, gwAPI, publicGateways, noDNSLink)

// Creates a mux to serve the prometheus metrics alongside the gateway. This
// step is optional and only required if you need or want to access the metrics.
// You may also decide to expose the metrics on a different path, or port.
mux := http.NewServeMux()
mux.Handle("/debug/metrics/prometheus", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}))
mux.Handle("/", handler)

// Then wrap the mux with the hostname handler. Please note that the metrics
// will not be available under the previously defined publicGateways.
// You will be able to access the metrics via 127.0.0.1 but not localhost
// or example.net. If you want to expose the metrics on such gateways,
// you will have to add the path "/debug" to the variable Paths.
handler = gateway.WithHostname(mux, gwAPI, publicGateways, noDNSLink)

log.Printf("Listening on http://localhost:%d", *port)
log.Printf("Try loading an image: http://localhost:%d/ipfs/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", *port)
log.Printf("Try browsing Wikipedia snapshot: http://localhost:%d/ipfs/bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze", *port)
log.Printf("Metrics available at http://127.0.0.1:%d/debug/metrics/prometheus", *port)
if err := http.ListenAndServe(":"+strconv.Itoa(*port), handler); err != nil {
log.Fatal(err)
}
Expand Down
1 change: 0 additions & 1 deletion examples/gateway/proxy/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func (ps *proxyRouting) fetch(ctx context.Context, id peer.ID) ([]byte, error) {
},
})
if err != nil {
fmt.Println(err)
return nil, err
}
defer resp.Body.Close()
Expand Down
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/ipld/go-codec-dagpb v1.5.0
github.com/ipld/go-ipld-prime v0.19.0
github.com/libp2p/go-libp2p v0.24.2
github.com/prometheus/client_golang v1.14.0
github.com/stretchr/testify v1.8.1
)

Expand Down Expand Up @@ -89,7 +90,6 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
Expand Down
59 changes: 42 additions & 17 deletions gateway/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,14 @@ type handler struct {
unixfsGetMetric *prometheus.SummaryVec // deprecated, use firstContentBlockGetMetric

// response type metrics
unixfsFileGetMetric *prometheus.HistogramVec
unixfsGenDirGetMetric *prometheus.HistogramVec
carStreamGetMetric *prometheus.HistogramVec
rawBlockGetMetric *prometheus.HistogramVec
getMetric *prometheus.HistogramVec
unixfsFileGetMetric *prometheus.HistogramVec
unixfsGenDirGetMetric *prometheus.HistogramVec
carStreamGetMetric *prometheus.HistogramVec
rawBlockGetMetric *prometheus.HistogramVec
tarStreamGetMetric *prometheus.HistogramVec
jsoncborDocumentGetMetric *prometheus.HistogramVec
ipnsRecordGetMetric *prometheus.HistogramVec
}

// StatusResponseWriter enables us to override HTTP Status Code passed to
Expand Down Expand Up @@ -232,6 +236,11 @@ func newHandler(c Config, api API) *handler {

// Response-type specific metrics
// ----------------------------
// Generic: time it takes to execute a successful gateway request (all request types)
getMetric: newHistogramMetric(
"gw_get_duration_seconds",
"The time to GET a successful response to a request (all content types).",
),
// UnixFS: time it takes to return a file
unixfsFileGetMetric: newHistogramMetric(
"gw_unixfs_file_get_duration_seconds",
Expand All @@ -252,13 +261,28 @@ func newHandler(c Config, api API) *handler {
"gw_raw_block_get_duration_seconds",
"The time to GET an entire raw Block from the gateway.",
),
// TAR: time it takes to return requested TAR stream
tarStreamGetMetric: newHistogramMetric(
"gw_tar_stream_get_duration_seconds",
"The time to GET an entire TAR stream from the gateway.",
),
// JSON/CBOR: time it takes to return requested DAG-JSON/-CBOR document
jsoncborDocumentGetMetric: newHistogramMetric(
"gw_jsoncbor_get_duration_seconds",
"The time to GET an entire DAG-JSON/CBOR block from the gateway.",
),
// IPNS Record: time it takes to return IPNS record
ipnsRecordGetMetric: newHistogramMetric(
"gw_ipns_record_get_duration_seconds",
"The time to GET an entire IPNS Record from the gateway.",
),

// Legacy Metrics
// ----------------------------
unixfsGetMetric: newSummaryMetric( // TODO: remove?
// (deprecated, use firstContentBlockGetMetric instead)
"unixfs_get_latency_seconds",
"The time to receive the first UnixFS node on a GET from the gateway.",
"DEPRECATED: does not do what you think, use gw_first_content_block_get_latency_seconds instead.",
),
}
return i
Expand Down Expand Up @@ -372,43 +396,44 @@ func (i *handler) getOrHeadHandler(w http.ResponseWriter, r *http.Request) {
return
}

var success bool

// Support custom response formats passed via ?format or Accept HTTP header
switch responseFormat {
case "", "application/json", "application/cbor":
switch mc.Code(resolvedPath.Cid().Prefix().Codec) {
case mc.Json, mc.DagJson, mc.Cbor, mc.DagCbor:
logger.Debugw("serving codec", "path", contentPath)
i.serveCodec(r.Context(), w, r, resolvedPath, contentPath, begin, responseFormat)
success = i.serveCodec(r.Context(), w, r, resolvedPath, contentPath, begin, responseFormat)
default:
logger.Debugw("serving unixfs", "path", contentPath)
i.serveUnixFS(r.Context(), w, r, resolvedPath, contentPath, begin, logger)
success = i.serveUnixFS(r.Context(), w, r, resolvedPath, contentPath, begin, logger)
}
return
case "application/vnd.ipld.raw":
logger.Debugw("serving raw block", "path", contentPath)
i.serveRawBlock(r.Context(), w, r, resolvedPath, contentPath, begin)
return
success = i.serveRawBlock(r.Context(), w, r, resolvedPath, contentPath, begin)
case "application/vnd.ipld.car":
logger.Debugw("serving car stream", "path", contentPath)
carVersion := formatParams["version"]
i.serveCAR(r.Context(), w, r, resolvedPath, contentPath, carVersion, begin)
return
success = i.serveCAR(r.Context(), w, r, resolvedPath, contentPath, carVersion, begin)
case "application/x-tar":
logger.Debugw("serving tar file", "path", contentPath)
i.serveTAR(r.Context(), w, r, resolvedPath, contentPath, begin, logger)
return
success = i.serveTAR(r.Context(), w, r, resolvedPath, contentPath, begin, logger)
case "application/vnd.ipld.dag-json", "application/vnd.ipld.dag-cbor":
logger.Debugw("serving codec", "path", contentPath)
i.serveCodec(r.Context(), w, r, resolvedPath, contentPath, begin, responseFormat)
success = i.serveCodec(r.Context(), w, r, resolvedPath, contentPath, begin, responseFormat)
case "application/vnd.ipfs.ipns-record":
logger.Debugw("serving ipns record", "path", contentPath)
i.serveIpnsRecord(r.Context(), w, r, resolvedPath, contentPath, begin, logger)
return
success = i.serveIpnsRecord(r.Context(), w, r, resolvedPath, contentPath, begin, logger)
default: // catch-all for unsuported application/vnd.*
err := fmt.Errorf("unsupported format %q", responseFormat)
webError(w, "failed to respond with requested content type", err, http.StatusBadRequest)
return
}

if success {
i.getMetric.WithLabelValues(contentPath.Namespace()).Observe(time.Since(begin).Seconds())
}
}

func (i *handler) addUserHeaders(w http.ResponseWriter) {
Expand Down
7 changes: 5 additions & 2 deletions gateway/handler_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ import (
)

// serveRawBlock returns bytes behind a raw block
func (i *handler) serveRawBlock(ctx context.Context, w http.ResponseWriter, r *http.Request, resolvedPath ipath.Resolved, contentPath ipath.Path, begin time.Time) {
func (i *handler) serveRawBlock(ctx context.Context, w http.ResponseWriter, r *http.Request, resolvedPath ipath.Resolved, contentPath ipath.Path, begin time.Time) bool {
ctx, span := spanTrace(ctx, "ServeRawBlock", trace.WithAttributes(attribute.String("path", resolvedPath.String())))
defer span.End()

blockCid := resolvedPath.Cid()
block, err := i.api.GetBlock(ctx, blockCid)
if err != nil {
webError(w, "ipfs block get "+blockCid.String(), err, http.StatusInternalServerError)
return
return false
}
content := bytes.NewReader(block.RawData())

Expand All @@ -45,4 +46,6 @@ func (i *handler) serveRawBlock(ctx context.Context, w http.ResponseWriter, r *h
// Update metrics
i.rawBlockGetMetric.WithLabelValues(contentPath.Namespace()).Observe(time.Since(begin).Seconds())
}

return dataSent
}
10 changes: 6 additions & 4 deletions gateway/handler_car.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
)

// serveCAR returns a CAR stream for specific DAG+selector
func (i *handler) serveCAR(ctx context.Context, w http.ResponseWriter, r *http.Request, resolvedPath ipath.Resolved, contentPath ipath.Path, carVersion string, begin time.Time) {
func (i *handler) serveCAR(ctx context.Context, w http.ResponseWriter, r *http.Request, resolvedPath ipath.Resolved, contentPath ipath.Path, carVersion string, begin time.Time) bool {
ctx, span := spanTrace(ctx, "ServeCAR", trace.WithAttributes(attribute.String("path", resolvedPath.String())))
defer span.End()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -28,7 +29,7 @@ func (i *handler) serveCAR(ctx context.Context, w http.ResponseWriter, r *http.R
default:
err := fmt.Errorf("only version=1 is supported")
webError(w, "unsupported CAR version", err, http.StatusBadRequest)
return
return false
}
rootCid := resolvedPath.Cid()

Expand All @@ -55,7 +56,7 @@ func (i *handler) serveCAR(ctx context.Context, w http.ResponseWriter, r *http.R
// Finish early if Etag match
if r.Header.Get("If-None-Match") == etag {
w.WriteHeader(http.StatusNotModified)
return
return false
}

// Make it clear we don't support range-requests over a car stream
Expand All @@ -79,11 +80,12 @@ func (i *handler) serveCAR(ctx context.Context, w http.ResponseWriter, r *http.R
// Due to this, we suggest client always verify that
// the received CAR stream response is matching requested DAG selector
w.Header().Set("X-Stream-Error", err.Error())
return
return false
}

// Update metrics
i.carStreamGetMetric.WithLabelValues(contentPath.Namespace()).Observe(time.Since(begin).Seconds())
return true
}

// FIXME(@Jorropo): https://github.com/ipld/go-car/issues/315
Expand Down
Loading

0 comments on commit 378aaf7

Please sign in to comment.