Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/api request metrics wrapper #4516

Merged
merged 3 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion cmd/lotus-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ import (
"os"

"github.com/filecoin-project/go-jsonrpc"
"go.opencensus.io/tag"

"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/metrics"

logging "github.com/ipfs/go-log"
"go.opencensus.io/stats/view"

"github.com/gorilla/mux"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -64,6 +69,13 @@ var runCmd = &cli.Command{
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Register all metric views
if err := view.Register(
metrics.DefaultViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}

api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
Expand All @@ -76,7 +88,7 @@ var runCmd = &cli.Command{
log.Info("Setting up API endpoint at " + address)

rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", NewGatewayAPI(api))
rpcServer.Register("Filecoin", metrics.MetricedGatewayAPI(NewGatewayAPI(api)))

mux.Handle("/rpc/v0", rpcServer)
mux.PathPrefix("/").Handler(http.DefaultServeMux)
Expand All @@ -89,6 +101,7 @@ var runCmd = &cli.Command{
srv := &http.Server{
Handler: mux,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-gateway"))
return ctx
},
}
Expand Down
13 changes: 12 additions & 1 deletion cmd/lotus-seal-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
logging "github.com/ipfs/go-log/v2"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-jsonrpc"
Expand All @@ -34,6 +36,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/repo"
)

Expand Down Expand Up @@ -190,6 +193,13 @@ var runCmd = &cli.Command{
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Register all metric views
if err := view.Register(
metrics.DefaultViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}

v, err := nodeApi.Version(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -363,7 +373,7 @@ var runCmd = &cli.Command{

readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(readerServerOpt)
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi))
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(metrics.MetricedWorkerAPI(workerApi)))

mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
Expand All @@ -378,6 +388,7 @@ var runCmd = &cli.Command{
srv := &http.Server{
Handler: ah,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker"))
return ctx
},
}
Expand Down
21 changes: 19 additions & 2 deletions cmd/lotus-storage-miner/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"net"
"net/http"
_ "net/http/pprof"
"os"
Expand All @@ -12,6 +13,8 @@ import (
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-jsonrpc"
Expand All @@ -22,6 +25,7 @@ import (
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/ulimit"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/modules/dtypes"
Expand Down Expand Up @@ -66,6 +70,13 @@ var runCmd = &cli.Command{
defer ncloser()
ctx := lcli.DaemonContext(cctx)

// Register all metric views
if err := view.Register(
metrics.DefaultViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}

v, err := nodeApi.Version(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -147,7 +158,7 @@ var runCmd = &cli.Command{
mux := mux.NewRouter()

rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", apistruct.PermissionedStorMinerAPI(minerapi))
rpcServer.Register("Filecoin", apistruct.PermissionedStorMinerAPI(metrics.MetricedStorMinerAPI(minerapi)))

mux.Handle("/rpc/v0", rpcServer)
mux.PathPrefix("/remote").HandlerFunc(minerapi.(*impl.StorageMinerAPI).ServeRemote)
Expand All @@ -158,7 +169,13 @@ var runCmd = &cli.Command{
Next: mux.ServeHTTP,
}

srv := &http.Server{Handler: ah}
srv := &http.Server{
Handler: ah,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-miner"))
return ctx
},
}

sigChan := make(chan os.Signal, 2)
go func() {
Expand Down
13 changes: 12 additions & 1 deletion cmd/lotus-wallet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"

"github.com/filecoin-project/go-jsonrpc"

Expand All @@ -18,6 +20,7 @@ import (
ledgerwallet "github.com/filecoin-project/lotus/chain/wallet/ledger"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/repo"
)

Expand Down Expand Up @@ -75,6 +78,13 @@ var runCmd = &cli.Command{
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Register all metric views
if err := view.Register(
metrics.DefaultViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}

repoPath := cctx.String(FlagWalletRepo)
r, err := repo.NewFS(repoPath)
if err != nil {
Expand Down Expand Up @@ -125,7 +135,7 @@ var runCmd = &cli.Command{
log.Info("Setting up API endpoint at " + address)

rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", &LoggedWallet{under: w})
rpcServer.Register("Filecoin", &LoggedWallet{under: metrics.MetricedWalletAPI(w)})

mux.Handle("/rpc/v0", rpcServer)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
Expand All @@ -138,6 +148,7 @@ var runCmd = &cli.Command{
srv := &http.Server{
Handler: mux,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-wallet"))
return ctx
},
}
Expand Down
13 changes: 11 additions & 2 deletions cmd/lotus/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/json"
"net"
"net/http"
_ "net/http/pprof"
"os"
Expand All @@ -13,6 +14,7 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"go.opencensus.io/tag"
"golang.org/x/xerrors"

"contrib.go.opencensus.io/exporter/prometheus"
Expand All @@ -22,6 +24,7 @@ import (

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/impl"
)
Expand All @@ -30,7 +33,7 @@ var log = logging.Logger("main")

func serveRPC(a api.FullNode, stop node.StopFunc, addr multiaddr.Multiaddr, shutdownCh <-chan struct{}) error {
rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(a))
rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(metrics.MetricedFullAPI(a)))

ah := &auth.Handler{
Verify: a.AuthVerify,
Expand Down Expand Up @@ -60,7 +63,13 @@ func serveRPC(a api.FullNode, stop node.StopFunc, addr multiaddr.Multiaddr, shut
return xerrors.Errorf("could not listen: %w", err)
}

srv := &http.Server{Handler: http.DefaultServeMux}
srv := &http.Server{
Handler: http.DefaultServeMux,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-daemon"))
return ctx
},
}

sigCh := make(chan os.Signal, 2)
shutdownDone := make(chan struct{})
Expand Down
19 changes: 19 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"context"
"time"

"go.opencensus.io/stats"
Expand All @@ -24,6 +25,8 @@ var (
MessageTo, _ = tag.NewKey("message_to")
MessageNonce, _ = tag.NewKey("message_nonce")
ReceivedFrom, _ = tag.NewKey("received_from")
Endpoint, _ = tag.NewKey("endpoint")
APIInterface, _ = tag.NewKey("api") // to distinguish between gateway api and full node api endpoint calls
)

// Measures
Expand All @@ -49,6 +52,7 @@ var (
PubsubRecvRPC = stats.Int64("pubsub/recv_rpc", "Counter for total received RPCs", stats.UnitDimensionless)
PubsubSendRPC = stats.Int64("pubsub/send_rpc", "Counter for total sent RPCs", stats.UnitDimensionless)
PubsubDropRPC = stats.Int64("pubsub/drop_rpc", "Counter for total dropped RPCs", stats.UnitDimensionless)
APIRequestDuration = stats.Float64("api/request_duration_ms", "Duration of API requests", stats.UnitMilliseconds)
)

var (
Expand Down Expand Up @@ -137,6 +141,11 @@ var (
Measure: PubsubDropRPC,
Aggregation: view.Count(),
}
APIRequestDurationView = &view.View{
Measure: APIRequestDuration,
Aggregation: defaultMillisecondsDistribution,
TagKeys: []tag.Key{APIInterface, Endpoint},
}
)

// DefaultViews is an array of OpenCensus views for metric gathering purposes
Expand All @@ -161,10 +170,20 @@ var DefaultViews = append([]*view.View{
PubsubRecvRPCView,
PubsubSendRPCView,
PubsubDropRPCView,
APIRequestDurationView,
},
rpcmetrics.DefaultViews...)

// SinceInMilliseconds returns the duration of time since the provide time as a float64.
func SinceInMilliseconds(startTime time.Time) float64 {
return float64(time.Since(startTime).Nanoseconds()) / 1e6
}

// Timer is a function stopwatch, calling it starts the timer,
// calling the returned function will record the duration.
func Timer(ctx context.Context, m *stats.Float64Measure) func() {
start := time.Now()
return func() {
stats.Record(ctx, m.M(SinceInMilliseconds(start)))
}
}
65 changes: 65 additions & 0 deletions metrics/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package metrics

import (
"context"
"reflect"

"go.opencensus.io/tag"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
)

func MetricedStorMinerAPI(a api.StorageMiner) api.StorageMiner {
var out apistruct.StorageMinerStruct
proxy(a, &out.Internal)
proxy(a, &out.CommonStruct.Internal)
return &out
}

func MetricedFullAPI(a api.FullNode) api.FullNode {
var out apistruct.FullNodeStruct
proxy(a, &out.Internal)
proxy(a, &out.CommonStruct.Internal)
return &out
}

func MetricedWorkerAPI(a api.WorkerAPI) api.WorkerAPI {
var out apistruct.WorkerStruct
proxy(a, &out.Internal)
return &out
}

func MetricedWalletAPI(a api.WalletAPI) api.WalletAPI {
var out apistruct.WalletStruct
proxy(a, &out.Internal)
return &out
}

func MetricedGatewayAPI(a api.GatewayAPI) api.GatewayAPI {
var out apistruct.GatewayStruct
proxy(a, &out.Internal)
return &out
}

func proxy(in interface{}, out interface{}) {
rint := reflect.ValueOf(out).Elem()
ra := reflect.ValueOf(in)

for f := 0; f < rint.NumField(); f++ {
field := rint.Type().Field(f)
fn := ra.MethodByName(field.Name)

rint.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
ctx := args[0].Interface().(context.Context)
// upsert function name into context
ctx, _ = tag.New(ctx, tag.Upsert(Endpoint, field.Name))
stop := Timer(ctx, APIRequestDuration)
defer stop()
// pass tagged ctx back into function call
args[0] = reflect.ValueOf(ctx)
return fn.Call(args)
}))

}
}