diff --git a/api/blocklistener.go b/api/blocklistener.go index 4200e7225b..df0ae003a2 100644 --- a/api/blocklistener.go +++ b/api/blocklistener.go @@ -13,7 +13,7 @@ import ( "github.com/iotexproject/iotex-core/pkg/log" ) -type streamHandler func(interface{}) error +type streamHandler func(interface{}) (int, error) type gRPCBlockListener struct { streamHandle streamHandler @@ -44,7 +44,7 @@ func (bl *gRPCBlockListener) Respond(_ string, blk *block.Block) error { Height: blk.Height(), } // send blockInfo thru streaming API - if err := bl.streamHandle(&iotexapi.StreamBlocksResponse{ + if _, err := bl.streamHandle(&iotexapi.StreamBlocksResponse{ Block: blockInfo, BlockIdentifier: blockID, }); err != nil { @@ -84,7 +84,7 @@ func (bl *web3BlockListener) Respond(id string, blk *block.Block) error { }, } // send blockInfo thru streaming API - if err := bl.streamHandle(res); err != nil { + if _, err := bl.streamHandle(res); err != nil { log.L().Info( "Error when streaming the block", zap.Uint64("height", blk.Height()), diff --git a/api/blocklistener_test.go b/api/blocklistener_test.go index 7a34862e3a..94c27eccd9 100644 --- a/api/blocklistener_test.go +++ b/api/blocklistener_test.go @@ -31,8 +31,8 @@ func TestBlockListener(t *testing.T) { server := mock_apiserver.NewMockStreamBlocksServer(ctrl) responder := NewGRPCBlockListener( - func(in interface{}) error { - return server.Send(in.(*iotexapi.StreamBlocksResponse)) + func(in interface{}) (int, error) { + return 0, server.Send(in.(*iotexapi.StreamBlocksResponse)) }, errChan) diff --git a/api/coreservice.go b/api/coreservice.go index 60a8bb076d..32951b9d42 100644 --- a/api/coreservice.go +++ b/api/coreservice.go @@ -56,6 +56,7 @@ import ( batch "github.com/iotexproject/iotex-core/pkg/messagebatcher" "github.com/iotexproject/iotex-core/pkg/tracer" "github.com/iotexproject/iotex-core/pkg/version" + "github.com/iotexproject/iotex-core/server/itx/nodestats" "github.com/iotexproject/iotex-core/state" "github.com/iotexproject/iotex-core/state/factory" ) @@ -156,6 +157,9 @@ type ( gasLimit uint64, data []byte, config *logger.Config) ([]byte, *action.Receipt, *logger.StructLogger, error) + + // Track tracks the api call + Track(ctx context.Context, start time.Time, method string, size int64, success bool) } // coreService implements the CoreService interface @@ -175,6 +179,7 @@ type ( electionCommittee committee.Committee readCache *ReadCache messageBatcher *batch.Manager + apiStats *nodestats.APILocalStats } // jobDesc provides a struct to get and store logs in core.LogsInRange @@ -204,6 +209,13 @@ func WithNativeElection(committee committee.Committee) Option { } } +// WithAPIStats is the option to return RPC stats through API. +func WithAPIStats(stats *nodestats.APILocalStats) Option { + return func(svr *coreService) { + svr.apiStats = stats + } +} + type intrinsicGasCalculator interface { IntrinsicGas() (uint64, error) } @@ -1706,3 +1718,16 @@ func (core *coreService) TraceCall(ctx context.Context, retval, receipt, err := core.sf.SimulateExecution(ctx, callerAddr, exec, getblockHash) return retval, receipt, traces, err } + +// Track tracks the api call +func (core *coreService) Track(ctx context.Context, start time.Time, method string, size int64, success bool) { + if core.apiStats == nil { + return + } + elapsed := time.Since(start) + core.apiStats.ReportCall(nodestats.APIReport{ + Method: method, + HandlingTime: elapsed, + Success: success, + }, size) +} diff --git a/api/grpcserver.go b/api/grpcserver.go index c23a700fc2..51df6128d3 100644 --- a/api/grpcserver.go +++ b/api/grpcserver.go @@ -546,8 +546,8 @@ func (svr *gRPCHandler) StreamBlocks(_ *iotexapi.StreamBlocksRequest, stream iot defer close(errChan) chainListener := svr.coreService.ChainListener() if _, err := chainListener.AddResponder(NewGRPCBlockListener( - func(resp interface{}) error { - return stream.Send(resp.(*iotexapi.StreamBlocksResponse)) + func(resp interface{}) (int, error) { + return 0, stream.Send(resp.(*iotexapi.StreamBlocksResponse)) }, errChan, )); err != nil { @@ -570,8 +570,8 @@ func (svr *gRPCHandler) StreamLogs(in *iotexapi.StreamLogsRequest, stream iotexa chainListener := svr.coreService.ChainListener() if _, err := chainListener.AddResponder(NewGRPCLogListener( logfilter.NewLogFilter(in.GetFilter()), - func(in interface{}) error { - return stream.Send(in.(*iotexapi.StreamLogsResponse)) + func(in interface{}) (int, error) { + return 0, stream.Send(in.(*iotexapi.StreamLogsResponse)) }, errChan, )); err != nil { diff --git a/api/http.go b/api/http.go index 05c8b9e277..51276eafb8 100644 --- a/api/http.go +++ b/api/http.go @@ -73,10 +73,14 @@ func (handler *hTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) if err := handler.msgHandler.HandlePOSTReq(ctx, req.Body, apitypes.NewResponseWriter( - func(resp interface{}) error { + func(resp interface{}) (int, error) { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Content-Type", "application/json; charset=UTF-8") - return json.NewEncoder(w).Encode(resp) + raw, err := json.Marshal(resp) + if err != nil { + return 0, err + } + return w.Write(raw) }), ); err != nil { log.Logger("api").Warn("fail to respond request.", zap.Error(err)) diff --git a/api/loglistener.go b/api/loglistener.go index 05905e119f..59a21812e4 100644 --- a/api/loglistener.go +++ b/api/loglistener.go @@ -36,7 +36,7 @@ func (ll *gRPCLogListener) Respond(_ string, blk *block.Block) error { for _, e := range logs { logPb := e.ConvertToLogPb() logPb.BlkHash = blkHash[:] - if err := ll.streamHandle(&iotexapi.StreamLogsResponse{Log: logPb}); err != nil { + if _, err := ll.streamHandle(&iotexapi.StreamLogsResponse{Log: logPb}); err != nil { ll.errChan <- err log.L().Info("error streaming the log", zap.Uint64("height", e.BlockHeight), @@ -81,7 +81,7 @@ func (ll *web3LogListener) Respond(id string, blk *block.Block) error { log: e, }, } - if err := ll.streamHandle(res); err != nil { + if _, err := ll.streamHandle(res); err != nil { log.L().Info( "Error when streaming the block", zap.Uint64("height", blk.Height()), diff --git a/api/types/types.go b/api/types/types.go index 9f10402b17..d8cf40c7d4 100644 --- a/api/types/types.go +++ b/api/types/types.go @@ -14,7 +14,7 @@ var MaxResponseSize = 1024 * 1024 * 100 // 100MB type ( // Web3ResponseWriter is writer for web3 request Web3ResponseWriter interface { - Write(interface{}) error + Write(interface{}) (int, error) } // Responder responds to new block @@ -41,15 +41,15 @@ type ( // responseWriter for server type responseWriter struct { - writeHandler func(interface{}) error + writeHandler func(interface{}) (int, error) } // NewResponseWriter returns a new responseWriter -func NewResponseWriter(handler func(interface{}) error) Web3ResponseWriter { +func NewResponseWriter(handler func(interface{}) (int, error)) Web3ResponseWriter { return &responseWriter{handler} } -func (w *responseWriter) Write(in interface{}) error { +func (w *responseWriter) Write(in interface{}) (int, error) { return w.writeHandler(in) } @@ -69,20 +69,21 @@ func NewBatchWriter(singleWriter Web3ResponseWriter) *BatchWriter { } // Write adds data into batch buffer -func (w *BatchWriter) Write(in interface{}) error { +func (w *BatchWriter) Write(in interface{}) (int, error) { raw, err := json.Marshal(in) if err != nil { - return err + return 0, err } w.totalSize += len(raw) if w.totalSize > MaxResponseSize { - return errors.New("response size exceeds limit") + return w.totalSize, errors.New("response size exceeds limit") } w.buf = append(w.buf, raw) - return nil + return w.totalSize, nil } // Flush writes data in batch buffer func (w *BatchWriter) Flush() error { - return w.writer.Write(w.buf) + _, err := w.writer.Write(w.buf) + return err } diff --git a/api/web3server.go b/api/web3server.go index 65ca4643fd..385cd951b7 100644 --- a/api/web3server.go +++ b/api/web3server.go @@ -102,7 +102,8 @@ func (svr *web3Handler) HandlePOSTReq(ctx context.Context, reader io.Reader, wri if err != nil { err := errors.Wrap(err, "failed to parse web3 requests.") span.RecordError(err) - return writer.Write(&web3Response{err: err}) + _, err = writer.Write(&web3Response{err: err}) + return err } if !web3Reqs.IsArray() { return svr.handleWeb3Req(ctx, &web3Reqs, writer) @@ -116,7 +117,8 @@ func (svr *web3Handler) HandlePOSTReq(ctx context.Context, reader io.Reader, wri svr.batchRequestLimit, ) span.RecordError(err) - return writer.Write(&web3Response{err: err}) + _, err = writer.Write(&web3Response{err: err}) + return err } batchWriter := apitypes.NewBatchWriter(writer) for i := range web3ReqArr { @@ -129,10 +131,12 @@ func (svr *web3Handler) HandlePOSTReq(ctx context.Context, reader io.Reader, wri func (svr *web3Handler) handleWeb3Req(ctx context.Context, web3Req *gjson.Result, writer apitypes.Web3ResponseWriter) error { var ( - res interface{} - err error - method = web3Req.Get("method").Value() + res interface{} + err, err1 error + method = web3Req.Get("method").Value() + size int ) + defer func(start time.Time) { svr.coreService.Track(ctx, start, method.(string), int64(size), err == nil) }(time.Now()) span := tracer.SpanFromContext(ctx) defer span.End() span.AddEvent("handleWeb3Req") @@ -239,11 +243,12 @@ func (svr *web3Handler) handleWeb3Req(ctx context.Context, web3Req *gjson.Result } else { log.Logger("api").Debug("web3Debug", zap.String("response", fmt.Sprintf("%+v", res))) } - return writer.Write(&web3Response{ + size, err1 = writer.Write(&web3Response{ id: int(web3Req.Get("id").Int()), result: res, err: err, }) + return err1 } func parseWeb3Reqs(reader io.Reader) (gjson.Result, error) { diff --git a/api/web3server_test.go b/api/web3server_test.go index 4bfdccdee2..e2dc66efbd 100644 --- a/api/web3server_test.go +++ b/api/web3server_test.go @@ -89,6 +89,7 @@ func TestHandlePost(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() core := mock_apicoreservice.NewMockCoreService(ctrl) + core.EXPECT().Track(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return().AnyTimes() svr := newHTTPHandler(NewWeb3Handler(core, "", _defaultBatchRequestLimit)) getServerResp := func(svr *hTTPHandler, req *http.Request) *httptest.ResponseRecorder { req.Header.Set("Content-Type", "application/json") diff --git a/api/websocket.go b/api/websocket.go index 14422f08ed..9d5ee08d15 100644 --- a/api/websocket.go +++ b/api/websocket.go @@ -86,11 +86,11 @@ func (wsSvr *WebsocketHandler) handleConnection(ctx context.Context, ws *websock err = wsSvr.msgHandler.HandlePOSTReq(ctx, reader, apitypes.NewResponseWriter( - func(resp interface{}) error { + func(resp interface{}) (int, error) { if err = ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { log.Logger("api").Warn("failed to set write deadline timeout.", zap.Error(err)) } - return ws.WriteJSON(resp) + return 0, ws.WriteJSON(resp) }), ) if err != nil { diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go index d969dcfc55..f4e3f1f845 100644 --- a/blocksync/blocksync.go +++ b/blocksync/blocksync.go @@ -23,6 +23,7 @@ import ( "github.com/iotexproject/iotex-core/pkg/lifecycle" "github.com/iotexproject/iotex-core/pkg/log" "github.com/iotexproject/iotex-core/pkg/routine" + "github.com/iotexproject/iotex-core/server/itx/nodestats" ) type ( @@ -42,7 +43,7 @@ type ( // BlockSync defines the interface of blocksyncer BlockSync interface { lifecycle.StartStopper - + nodestats.StatsReporter // TargetHeight returns the target height to sync to TargetHeight() uint64 // ProcessSyncRequest processes a block sync request @@ -122,6 +123,10 @@ func (*dummyBlockSync) SyncStatus() (uint64, uint64, uint64, string) { return 0, 0, 0, "" } +func (*dummyBlockSync) BuildReport() string { + return "" +} + // NewBlockSyncer returns a new block syncer instance func NewBlockSyncer( cfg Config, @@ -335,3 +340,15 @@ func (bs *blockSyncer) SyncStatus() (uint64, uint64, uint64, string) { } return bs.startingHeight, bs.tipHeightHandler(), bs.targetHeight, syncSpeedDesc } + +// BuildReport builds a report of block syncer +func (bs *blockSyncer) BuildReport() string { + startingHeight, tipHeight, targetHeight, syncSpeedDesc := bs.SyncStatus() + return fmt.Sprintf( + "BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s", + startingHeight, + tipHeight, + targetHeight, + syncSpeedDesc, + ) +} diff --git a/chainservice/builder.go b/chainservice/builder.go index cd9dd6b535..c397f5d4b9 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -40,6 +40,7 @@ import ( "github.com/iotexproject/iotex-core/nodeinfo" "github.com/iotexproject/iotex-core/p2p" "github.com/iotexproject/iotex-core/pkg/log" + "github.com/iotexproject/iotex-core/server/itx/nodestats" "github.com/iotexproject/iotex-core/state/factory" ) @@ -92,6 +93,13 @@ func (builder *Builder) SetP2PAgent(agent p2p.Agent) *Builder { return builder } +// SetRPCStats sets the RPCStats instance +func (builder *Builder) SetRPCStats(stats *nodestats.APILocalStats) *Builder { + builder.createInstance() + builder.cs.apiStats = stats + return builder +} + // SetElectionCommittee sets the election committee instance func (builder *Builder) SetElectionCommittee(c committee.Committee) *Builder { builder.createInstance() diff --git a/chainservice/chainservice.go b/chainservice/chainservice.go index 63a68ffd52..2231943a8f 100644 --- a/chainservice/chainservice.go +++ b/chainservice/chainservice.go @@ -36,6 +36,7 @@ import ( "github.com/iotexproject/iotex-core/p2p" "github.com/iotexproject/iotex-core/pkg/lifecycle" "github.com/iotexproject/iotex-core/pkg/log" + "github.com/iotexproject/iotex-core/server/itx/nodestats" "github.com/iotexproject/iotex-core/state/factory" ) @@ -89,6 +90,7 @@ type ChainService struct { contractStakingIndexer contractstaking.ContractIndexer registry *protocol.Registry nodeInfoManager *nodeinfo.InfoManager + apiStats *nodestats.APILocalStats } // Start starts the server @@ -231,6 +233,7 @@ func (cs *ChainService) NewAPIServer(cfg api.Config, plugins map[int]interface{} return p2pAgent.BroadcastOutbound(ctx, msg) }), api.WithNativeElection(cs.electionCommittee), + api.WithAPIStats(cs.apiStats), } svr, err := api.NewServerV2( diff --git a/go.mod b/go.mod index bcd068d4ca..3ee35fb4b5 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce github.com/cespare/xxhash/v2 v2.1.2 github.com/holiman/uint256 v1.2.0 + github.com/mackerelio/go-osstat v0.2.4 github.com/prometheus/client_model v0.2.0 github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible github.com/shirou/gopsutil/v3 v3.22.2 @@ -193,7 +194,7 @@ require ( go.opentelemetry.io/otel/metric v0.31.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect - golang.org/x/sys v0.5.0 // indirect + golang.org/x/sys v0.6.0 // indirect golang.org/x/term v0.5.0 // indirect golang.org/x/time v0.1.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect diff --git a/go.sum b/go.sum index 43754af9c4..8e3f33ac8d 100644 --- a/go.sum +++ b/go.sum @@ -845,6 +845,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= +github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= +github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ= github.com/magefile/mage v1.9.0 h1:t3AU2wNwehMCW97vuqQLtw6puppWXHO+O2MHo5a50XE= github.com/magefile/mage v1.9.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -1550,8 +1552,8 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= diff --git a/p2p/agent.go b/p2p/agent.go index 2b60d48f2e..8604abc4ff 100644 --- a/p2p/agent.go +++ b/p2p/agent.go @@ -30,6 +30,7 @@ import ( "github.com/iotexproject/iotex-core/pkg/log" "github.com/iotexproject/iotex-core/pkg/routine" "github.com/iotexproject/iotex-core/pkg/tracer" + "github.com/iotexproject/iotex-core/server/itx/nodestats" ) const ( @@ -98,6 +99,7 @@ type ( // Agent is the agent to help the blockchain node connect into the P2P networks and send/receive messages Agent interface { lifecycle.StartStopper + nodestats.StatsReporter // BroadcastOutbound sends a broadcast message to the whole network BroadcastOutbound(ctx context.Context, msg proto.Message) (err error) // UnicastOutbound sends a unicast message to the given address @@ -180,6 +182,10 @@ func (*dummyAgent) BlockPeer(string) { return } +func (*dummyAgent) BuildReport() string { + return "" +} + // NewAgent instantiates a local P2P agent instance func NewAgent(cfg Config, chainID uint32, genesisHash hash.Hash256, broadcastHandler HandleBroadcastInbound, unicastHandler HandleUnicastInboundAsync) Agent { log.L().Info("p2p agent", log.Hex("topicSuffix", genesisHash[22:])) @@ -490,6 +496,15 @@ func (p *agent) BlockPeer(pidStr string) { p.host.BlockPeer(pid) } +// BuildReport builds a report of p2p agent +func (p *agent) BuildReport() string { + neighbors, err := p.ConnectedPeers() + if err == nil { + return fmt.Sprintf("P2P ConnectedPeers: %d", len(neighbors)) + } + return "" +} + func (p *agent) connectBootNode(ctx context.Context) error { if len(p.cfg.BootstrapNodes) == 0 { return nil diff --git a/server/itx/nodestats/apilocalstats.go b/server/itx/nodestats/apilocalstats.go new file mode 100644 index 0000000000..58a58392ff --- /dev/null +++ b/server/itx/nodestats/apilocalstats.go @@ -0,0 +1,194 @@ +package nodestats + +import ( + "fmt" + "strings" + "sync" + "time" +) + +// APIReport is the report of an API call +type APIReport struct { + Method string + HandlingTime time.Duration + Success bool +} + +type apiMethodStats struct { + Successes int + Errors int + AvgTimeOfErrors int64 + AvgTimeOfSuccesses int64 + MaxTimeOfError int64 + MaxTimeOfSuccess int64 + TotalSize int64 +} + +// AvgSize returns the average size of the api call +func (m *apiMethodStats) AvgSize() int64 { + if m.Successes+m.Errors == 0 { + return 0 + } + return m.TotalSize / int64(m.Successes+m.Errors) +} + +var _ StatsReporter = (*APILocalStats)(nil) + +// APILocalStats is the struct for getting API stats +type APILocalStats struct { + allTimeStats sync.Map + currentStats sync.Map +} + +// NewAPILocalStats creates a new APILocalStats +func NewAPILocalStats() *APILocalStats { + return &APILocalStats{ + allTimeStats: sync.Map{}, + currentStats: sync.Map{}, + } +} + +// ReportCall reports a call to the API +func (s *APILocalStats) ReportCall(report APIReport, size int64) { + if report.Method == "" { + return + } + v, _ := s.currentStats.LoadOrStore(report.Method, &apiMethodStats{}) + methodStats := v.(*apiMethodStats) + v, _ = s.allTimeStats.LoadOrStore(report.Method, &apiMethodStats{}) + allTimeMethodStats := v.(*apiMethodStats) + reportHandlingTimeMicroseconds := report.HandlingTime.Microseconds() + if report.Success { + methodStats.Successes++ + methodStats.AvgTimeOfSuccesses = (methodStats.AvgTimeOfSuccesses*int64(methodStats.Successes-1) + reportHandlingTimeMicroseconds) / int64(methodStats.Successes) + if reportHandlingTimeMicroseconds > methodStats.MaxTimeOfSuccess { + methodStats.MaxTimeOfSuccess = reportHandlingTimeMicroseconds + } + + allTimeMethodStats.Successes++ + allTimeMethodStats.AvgTimeOfSuccesses = (allTimeMethodStats.AvgTimeOfSuccesses*int64(allTimeMethodStats.Successes-1) + reportHandlingTimeMicroseconds) / int64(allTimeMethodStats.Successes) + if reportHandlingTimeMicroseconds > allTimeMethodStats.MaxTimeOfSuccess { + allTimeMethodStats.MaxTimeOfSuccess = reportHandlingTimeMicroseconds + } + } else { + methodStats.Errors++ + methodStats.AvgTimeOfErrors = (methodStats.AvgTimeOfErrors*int64(methodStats.Errors-1) + reportHandlingTimeMicroseconds) / int64(methodStats.Errors) + if reportHandlingTimeMicroseconds > methodStats.MaxTimeOfError { + methodStats.MaxTimeOfError = reportHandlingTimeMicroseconds + } + + allTimeMethodStats.Errors++ + allTimeMethodStats.AvgTimeOfErrors = (allTimeMethodStats.AvgTimeOfErrors*int64(allTimeMethodStats.Errors-1) + reportHandlingTimeMicroseconds) / int64(allTimeMethodStats.Errors) + if reportHandlingTimeMicroseconds > allTimeMethodStats.MaxTimeOfError { + allTimeMethodStats.MaxTimeOfError = reportHandlingTimeMicroseconds + } + } + methodStats.TotalSize += size + allTimeMethodStats.TotalSize += size + + s.currentStats.Store(report.Method, methodStats) + s.allTimeStats.Store(report.Method, allTimeMethodStats) +} + +// BuildReport builds a report of the API stats +func (s *APILocalStats) BuildReport() string { + var snapshot sync.Map + snapshotLen := 0 + s.currentStats.Range(func(key, value interface{}) bool { + snapshot.Store(key, value) + snapshotLen++ + s.currentStats.Delete(key) + return true + }) + stringBuilder := strings.Builder{} + + if snapshotLen == 0 { + return stringBuilder.String() + } + const reportHeader = "method | " + + "successes | " + + " avg time (µs) | " + + " max time (µs) | " + + " errors | " + + " avg time (µs) | " + + " max time (µs) |" + + " avg size |" + + " total size |" + stringBuilder.WriteString("***** API CALL report *****\n") + divider := strings.Repeat("-", len(reportHeader)-4) + stringBuilder.WriteString(divider + "\n") + stringBuilder.WriteString(reportHeader + "\n") + stringBuilder.WriteString(divider + "\n") + total := &apiMethodStats{} + snapshot.Range(func(key, val interface{}) bool { + value := val.(*apiMethodStats) + if total.Successes+value.Successes > 0 { + total.AvgTimeOfSuccesses = (total.AvgTimeOfSuccesses*int64(total.Successes) + int64(value.Successes)*value.AvgTimeOfSuccesses) / int64(total.Successes+value.Successes) + } else { + total.AvgTimeOfSuccesses = 0 + } + if total.Errors+value.Errors > 0 { + total.AvgTimeOfErrors = (total.AvgTimeOfErrors*int64(total.Errors) + int64(value.Errors)*value.AvgTimeOfErrors) / int64(total.Errors+value.Errors) + } else { + total.AvgTimeOfErrors = 0 + } + total.Successes += value.Successes + total.Errors += value.Errors + if value.MaxTimeOfError > total.MaxTimeOfError { + total.MaxTimeOfError = value.MaxTimeOfError + } + if value.MaxTimeOfSuccess > total.MaxTimeOfSuccess { + total.MaxTimeOfSuccess = value.MaxTimeOfSuccess + } + total.TotalSize += value.TotalSize + stringBuilder.WriteString(s.prepareReportLine(key.(string), value) + "\n") + return true + }) + stringBuilder.WriteString(divider + "\n") + stringBuilder.WriteString(s.prepareReportLine("TOTAL", total) + "\n") + stringBuilder.WriteString(divider + "\n") + return stringBuilder.String() + +} + +func (s *APILocalStats) prepareReportLine(method string, stats *apiMethodStats) string { + return fmt.Sprintf("%-40s| %9d | %14d | %14d | %9d | %14d | %14d | %8s | %10s |", + method, + stats.Successes, + stats.AvgTimeOfSuccesses, + stats.MaxTimeOfSuccess, + stats.Errors, + stats.AvgTimeOfErrors, + stats.MaxTimeOfError, + byteCountSI(stats.AvgSize()), + byteCountSI(stats.TotalSize), + ) +} + +func byteCountSI(b int64) string { + const unit = 1000 + if b < unit { + return fmt.Sprintf("%d B", b) + } + div, exp := int64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.1f %cB", + float64(b)/float64(div), "kMGTPE"[exp]) +} + +func byteCountIEC(b uint64) string { + const unit = 1024 + if b < unit { + return fmt.Sprintf("%d B", b) + } + div, exp := int64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.1f %ciB", + float64(b)/float64(div), "KMGTPE"[exp]) +} diff --git a/server/itx/nodestats/apilocalstats_test.go b/server/itx/nodestats/apilocalstats_test.go new file mode 100644 index 0000000000..fbfb01bce5 --- /dev/null +++ b/server/itx/nodestats/apilocalstats_test.go @@ -0,0 +1,31 @@ +package nodestats + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestRPCLocalStats(t *testing.T) { + require := require.New(t) + stats := NewAPILocalStats() + stats.ReportCall(APIReport{Method: "test", Success: true, HandlingTime: time.Duration(100 * time.Microsecond)}, 1) + stats.ReportCall(APIReport{Method: "test", Success: true, HandlingTime: time.Duration(200 * time.Microsecond)}, 2) + stats.ReportCall(APIReport{Method: "test", Success: true, HandlingTime: time.Duration(300 * time.Microsecond)}, 3) + stats.ReportCall(APIReport{Method: "test", Success: false, HandlingTime: time.Duration(400 * time.Microsecond)}, 4) + v, ok := stats.allTimeStats.Load("test") + require.True(ok) + val := v.(*apiMethodStats) + require.Equal(3, val.Successes) + require.Equal(int64(200), val.AvgTimeOfSuccesses) + require.Equal(int64(300), val.MaxTimeOfSuccess) + require.Equal(int64(400), val.MaxTimeOfError) + require.Equal(int64(400), val.AvgTimeOfErrors) + require.Equal(1, val.Errors) + require.Equal(int64(10), val.TotalSize) + require.Equal(int64(2), val.AvgSize()) + stats.ReportCall(APIReport{Method: "test2", Success: false, HandlingTime: time.Duration(400 * time.Microsecond)}, 400000) + report := stats.BuildReport() + t.Log(report) +} diff --git a/server/itx/nodestats/nodestats.go b/server/itx/nodestats/nodestats.go new file mode 100644 index 0000000000..77dee15c99 --- /dev/null +++ b/server/itx/nodestats/nodestats.go @@ -0,0 +1,53 @@ +package nodestats + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/iotexproject/iotex-core/pkg/routine" +) + +const ( + // PeriodicReportInterval is the interval for generating periodic reports + PeriodicReportInterval = 5 * time.Minute +) + +// StatsReporter is the interface for stats reporter +type StatsReporter interface { + BuildReport() string +} + +// NodeStats is the struct for getting node stats +type NodeStats struct { + list []StatsReporter + task *routine.RecurringTask +} + +// NewNodeStats creates a new NodeStats +func NewNodeStats(stats ...StatsReporter) *NodeStats { + return &NodeStats{ + list: append(stats, newSystemStats()), + } +} + +// Start starts the node stats +func (s *NodeStats) Start(ctx context.Context) error { + s.task = routine.NewRecurringTask(s.generateReport, PeriodicReportInterval) + return s.task.Start(ctx) +} + +// Stop stops the node stats +func (s *NodeStats) Stop(ctx context.Context) error { + return s.task.Stop(ctx) +} + +func (s *NodeStats) generateReport() { + stringBuilder := strings.Builder{} + for _, stat := range s.list { + stringBuilder.WriteString(stat.BuildReport()) + stringBuilder.WriteString("\n") + } + fmt.Println(stringBuilder.String()) +} diff --git a/server/itx/nodestats/systemstats.go b/server/itx/nodestats/systemstats.go new file mode 100644 index 0000000000..30d66cc5a9 --- /dev/null +++ b/server/itx/nodestats/systemstats.go @@ -0,0 +1,105 @@ +package nodestats + +import ( + "runtime" + "strconv" + "strings" + "syscall" + + "github.com/mackerelio/go-osstat/cpu" + "github.com/mackerelio/go-osstat/memory" +) + +// DiskStatus is the status of the disk +type DiskStatus struct { + All uint64 `json:"All"` + Used uint64 `json:"Used"` + Free uint64 `json:"Free"` +} + +// diskusage of path/disk +func diskUsage(path string) (disk DiskStatus) { + fs := syscall.Statfs_t{} + err := syscall.Statfs(path, &fs) + if err != nil { + return + } + disk.All = fs.Blocks * uint64(fs.Bsize) + disk.Free = fs.Bfree * uint64(fs.Bsize) + disk.Used = disk.All - disk.Free + return +} + +var _ StatsReporter = (*systemStats)(nil) + +type systemStats struct { + cpuValue *cpu.Stats +} + +func newSystemStats() *systemStats { + cpuValue, _ := cpu.Get() + return &systemStats{ + cpuValue: cpuValue, + } +} + +// GetMemory returns the memory stats +func (s *systemStats) GetMemory() (*memory.Stats, error) { return memory.Get() } + +// GetCPU returns the cpu stats +func (s *systemStats) GetCPU() (*cpu.Stats, error) { return cpu.Get() } + +// GetDisk returns the disk stats +func (s *systemStats) GetDisk() DiskStatus { return diskUsage("/") } + +// BuildReport builds the report +func (s *systemStats) BuildReport() string { + stringBuilder := strings.Builder{} + cpuValue, _ := s.GetCPU() + total := float64(cpuValue.Total - s.cpuValue.Total) + user := float64(cpuValue.User - s.cpuValue.User) + system := float64(cpuValue.System - s.cpuValue.System) + idle := float64(cpuValue.Idle - s.cpuValue.Idle) + + s.cpuValue = cpuValue + stringBuilder.WriteString("CPU: ") + stringBuilder.WriteString(strconv.Itoa(runtime.NumCPU())) + stringBuilder.WriteString("CPUs ") + stringBuilder.WriteString("User: ") + stringBuilder.WriteString(strconv.FormatFloat(user/total*100, 'f', 2, 64)) + stringBuilder.WriteString("% ") + stringBuilder.WriteString("System: ") + stringBuilder.WriteString(strconv.FormatFloat(system/total*100, 'f', 2, 64)) + stringBuilder.WriteString("% ") + stringBuilder.WriteString("Idle: ") + stringBuilder.WriteString(strconv.FormatFloat(idle/total*100, 'f', 2, 64)) + stringBuilder.WriteString("% ") + + stringBuilder.WriteString("\n") + memValue, _ := s.GetMemory() + stringBuilder.WriteString("Memory: ") + stringBuilder.WriteString("Total: ") + stringBuilder.WriteString(byteCountIEC(memValue.Total)) + stringBuilder.WriteString(" ") + stringBuilder.WriteString("Free: ") + stringBuilder.WriteString(byteCountIEC(memValue.Free)) + stringBuilder.WriteString(" ") + stringBuilder.WriteString("Used: ") + stringBuilder.WriteString(byteCountIEC(memValue.Used)) + stringBuilder.WriteString(" ") + stringBuilder.WriteString("Cached: ") + stringBuilder.WriteString(byteCountIEC(memValue.Cached)) + + stringBuilder.WriteString("\n") + diskValue := s.GetDisk() + stringBuilder.WriteString("Disk: ") + stringBuilder.WriteString("Total: ") + stringBuilder.WriteString(byteCountIEC(diskValue.All)) + stringBuilder.WriteString(" ") + stringBuilder.WriteString("Free: ") + stringBuilder.WriteString(byteCountIEC(diskValue.Free)) + stringBuilder.WriteString(" ") + stringBuilder.WriteString("Used: ") + stringBuilder.WriteString(byteCountIEC(diskValue.Used)) + return stringBuilder.String() +} diff --git a/server/itx/server.go b/server/itx/server.go index 811d784e21..d3fa5c7bb2 100644 --- a/server/itx/server.go +++ b/server/itx/server.go @@ -26,6 +26,7 @@ import ( "github.com/iotexproject/iotex-core/pkg/probe" "github.com/iotexproject/iotex-core/pkg/routine" "github.com/iotexproject/iotex-core/pkg/util/httputil" + "github.com/iotexproject/iotex-core/server/itx/nodestats" ) // Server is the iotex server instance containing all components. @@ -36,6 +37,7 @@ type Server struct { apiServers map[uint32]*api.ServerV2 p2pAgent p2p.Agent dispatcher dispatcher.Dispatcher + nodeStats *nodestats.NodeStats initializedSubChains map[uint32]bool mutex sync.RWMutex subModuleCancel context.CancelFunc @@ -70,6 +72,8 @@ func newServer(cfg config.Config, testing bool) (*Server, error) { var cs *chainservice.ChainService builder := chainservice.NewBuilder(cfg) builder.SetP2PAgent(p2pAgent) + rpcStats := nodestats.NewAPILocalStats() + builder.SetRPCStats(rpcStats) if testing { cs, err = builder.BuildForTest() } else { @@ -78,6 +82,7 @@ func newServer(cfg config.Config, testing bool) (*Server, error) { if err != nil { return nil, errors.Wrap(err, "fail to create chain service") } + nodeStats := nodestats.NewNodeStats(rpcStats, cs.BlockSync(), p2pAgent) apiServer, err := cs.NewAPIServer(cfg.API, cfg.Plugins) if err != nil { return nil, errors.Wrap(err, "failed to create api server") @@ -98,6 +103,7 @@ func newServer(cfg config.Config, testing bool) (*Server, error) { rootChainService: cs, chainservices: chains, apiServers: apiServers, + nodeStats: nodeStats, initializedSubChains: map[uint32]bool{}, } // Setup sub-chain starter @@ -125,12 +131,18 @@ func (s *Server) Start(ctx context.Context) error { if err := s.dispatcher.Start(cctx); err != nil { return errors.Wrap(err, "error when starting dispatcher") } + if err := s.nodeStats.Start(cctx); err != nil { + return errors.Wrap(err, "error when starting node stats") + } return nil } // Stop stops the server func (s *Server) Stop(ctx context.Context) error { defer s.subModuleCancel() + if err := s.nodeStats.Stop(ctx); err != nil { + return errors.Wrap(err, "error when stopping node stats") + } if err := s.p2pAgent.Stop(ctx); err != nil { // notest return errors.Wrap(err, "error when stopping P2P agent") diff --git a/test/mock/mock_apicoreservice/mock_apicoreservice.go b/test/mock/mock_apicoreservice/mock_apicoreservice.go index d2f177d9ea..bcec385938 100644 --- a/test/mock/mock_apicoreservice/mock_apicoreservice.go +++ b/test/mock/mock_apicoreservice/mock_apicoreservice.go @@ -8,6 +8,7 @@ import ( context "context" big "math/big" reflect "reflect" + time "time" logger "github.com/ethereum/go-ethereum/eth/tracers/logger" gomock "github.com/golang/mock/gomock" @@ -625,6 +626,18 @@ func (mr *MockCoreServiceMockRecorder) TraceTransaction(ctx, actHash, config int return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TraceTransaction", reflect.TypeOf((*MockCoreService)(nil).TraceTransaction), ctx, actHash, config) } +// Track mocks base method. +func (m *MockCoreService) Track(ctx context.Context, start time.Time, method string, size int64, success bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Track", ctx, start, method, size, success) +} + +// Track indicates an expected call of Track. +func (mr *MockCoreServiceMockRecorder) Track(ctx, start, method, size, success interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Track", reflect.TypeOf((*MockCoreService)(nil).Track), ctx, start, method, size, success) +} + // TransactionLogByActionHash mocks base method. func (m *MockCoreService) TransactionLogByActionHash(actHash string) (*iotextypes.TransactionLog, error) { m.ctrl.T.Helper() diff --git a/test/mock/mock_apiresponder/mock_apitypes.go b/test/mock/mock_apiresponder/mock_apitypes.go index 98659de139..19000b2c6e 100644 --- a/test/mock/mock_apiresponder/mock_apitypes.go +++ b/test/mock/mock_apiresponder/mock_apitypes.go @@ -36,11 +36,12 @@ func (m *MockWeb3ResponseWriter) EXPECT() *MockWeb3ResponseWriterMockRecorder { } // Write mocks base method. -func (m *MockWeb3ResponseWriter) Write(arg0 interface{}) error { +func (m *MockWeb3ResponseWriter) Write(arg0 interface{}) (int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Write", arg0) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 } // Write indicates an expected call of Write.