Skip to content

Commit

Permalink
Log summary about the node status (#3870)
Browse files Browse the repository at this point in the history
The node stats information will be printed to stdout every 5 minutes.

During indexer sync, stats report are not available

---------

Co-authored-by: dustinxie <dahuaxie@gmail.com>
  • Loading branch information
millken and dustinxie authored Jun 9, 2023
1 parent 6d5d929 commit a048de8
Show file tree
Hide file tree
Showing 23 changed files with 528 additions and 37 deletions.
6 changes: 3 additions & 3 deletions api/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/iotexproject/iotex-core/pkg/log"
)

type streamHandler func(interface{}) error
type streamHandler func(interface{}) (int, error)

type gRPCBlockListener struct {
streamHandle streamHandler
Expand Down Expand Up @@ -44,7 +44,7 @@ func (bl *gRPCBlockListener) Respond(_ string, blk *block.Block) error {
Height: blk.Height(),
}
// send blockInfo thru streaming API
if err := bl.streamHandle(&iotexapi.StreamBlocksResponse{
if _, err := bl.streamHandle(&iotexapi.StreamBlocksResponse{
Block: blockInfo,
BlockIdentifier: blockID,
}); err != nil {
Expand Down Expand Up @@ -84,7 +84,7 @@ func (bl *web3BlockListener) Respond(id string, blk *block.Block) error {
},
}
// send blockInfo thru streaming API
if err := bl.streamHandle(res); err != nil {
if _, err := bl.streamHandle(res); err != nil {
log.L().Info(
"Error when streaming the block",
zap.Uint64("height", blk.Height()),
Expand Down
4 changes: 2 additions & 2 deletions api/blocklistener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func TestBlockListener(t *testing.T) {

server := mock_apiserver.NewMockStreamBlocksServer(ctrl)
responder := NewGRPCBlockListener(
func(in interface{}) error {
return server.Send(in.(*iotexapi.StreamBlocksResponse))
func(in interface{}) (int, error) {
return 0, server.Send(in.(*iotexapi.StreamBlocksResponse))
},
errChan)

Expand Down
25 changes: 25 additions & 0 deletions api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
batch "github.com/iotexproject/iotex-core/pkg/messagebatcher"
"github.com/iotexproject/iotex-core/pkg/tracer"
"github.com/iotexproject/iotex-core/pkg/version"
"github.com/iotexproject/iotex-core/server/itx/nodestats"
"github.com/iotexproject/iotex-core/state"
"github.com/iotexproject/iotex-core/state/factory"
)
Expand Down Expand Up @@ -156,6 +157,9 @@ type (
gasLimit uint64,
data []byte,
config *logger.Config) ([]byte, *action.Receipt, *logger.StructLogger, error)

// Track tracks the api call
Track(ctx context.Context, start time.Time, method string, size int64, success bool)
}

// coreService implements the CoreService interface
Expand All @@ -175,6 +179,7 @@ type (
electionCommittee committee.Committee
readCache *ReadCache
messageBatcher *batch.Manager
apiStats *nodestats.APILocalStats
}

// jobDesc provides a struct to get and store logs in core.LogsInRange
Expand Down Expand Up @@ -204,6 +209,13 @@ func WithNativeElection(committee committee.Committee) Option {
}
}

// WithAPIStats is the option to return RPC stats through API.
func WithAPIStats(stats *nodestats.APILocalStats) Option {
return func(svr *coreService) {
svr.apiStats = stats
}
}

type intrinsicGasCalculator interface {
IntrinsicGas() (uint64, error)
}
Expand Down Expand Up @@ -1706,3 +1718,16 @@ func (core *coreService) TraceCall(ctx context.Context,
retval, receipt, err := core.sf.SimulateExecution(ctx, callerAddr, exec, getblockHash)
return retval, receipt, traces, err
}

// Track tracks the api call
func (core *coreService) Track(ctx context.Context, start time.Time, method string, size int64, success bool) {
if core.apiStats == nil {
return
}
elapsed := time.Since(start)
core.apiStats.ReportCall(nodestats.APIReport{
Method: method,
HandlingTime: elapsed,
Success: success,
}, size)
}
8 changes: 4 additions & 4 deletions api/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ func (svr *gRPCHandler) StreamBlocks(_ *iotexapi.StreamBlocksRequest, stream iot
defer close(errChan)
chainListener := svr.coreService.ChainListener()
if _, err := chainListener.AddResponder(NewGRPCBlockListener(
func(resp interface{}) error {
return stream.Send(resp.(*iotexapi.StreamBlocksResponse))
func(resp interface{}) (int, error) {
return 0, stream.Send(resp.(*iotexapi.StreamBlocksResponse))
},
errChan,
)); err != nil {
Expand All @@ -570,8 +570,8 @@ func (svr *gRPCHandler) StreamLogs(in *iotexapi.StreamLogsRequest, stream iotexa
chainListener := svr.coreService.ChainListener()
if _, err := chainListener.AddResponder(NewGRPCLogListener(
logfilter.NewLogFilter(in.GetFilter()),
func(in interface{}) error {
return stream.Send(in.(*iotexapi.StreamLogsResponse))
func(in interface{}) (int, error) {
return 0, stream.Send(in.(*iotexapi.StreamLogsResponse))
},
errChan,
)); err != nil {
Expand Down
8 changes: 6 additions & 2 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,14 @@ func (handler *hTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)

if err := handler.msgHandler.HandlePOSTReq(ctx, req.Body,
apitypes.NewResponseWriter(
func(resp interface{}) error {
func(resp interface{}) (int, error) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
return json.NewEncoder(w).Encode(resp)
raw, err := json.Marshal(resp)
if err != nil {
return 0, err
}
return w.Write(raw)
}),
); err != nil {
log.Logger("api").Warn("fail to respond request.", zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions api/loglistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (ll *gRPCLogListener) Respond(_ string, blk *block.Block) error {
for _, e := range logs {
logPb := e.ConvertToLogPb()
logPb.BlkHash = blkHash[:]
if err := ll.streamHandle(&iotexapi.StreamLogsResponse{Log: logPb}); err != nil {
if _, err := ll.streamHandle(&iotexapi.StreamLogsResponse{Log: logPb}); err != nil {
ll.errChan <- err
log.L().Info("error streaming the log",
zap.Uint64("height", e.BlockHeight),
Expand Down Expand Up @@ -81,7 +81,7 @@ func (ll *web3LogListener) Respond(id string, blk *block.Block) error {
log: e,
},
}
if err := ll.streamHandle(res); err != nil {
if _, err := ll.streamHandle(res); err != nil {
log.L().Info(
"Error when streaming the block",
zap.Uint64("height", blk.Height()),
Expand Down
19 changes: 10 additions & 9 deletions api/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var MaxResponseSize = 1024 * 1024 * 100 // 100MB
type (
// Web3ResponseWriter is writer for web3 request
Web3ResponseWriter interface {
Write(interface{}) error
Write(interface{}) (int, error)
}

// Responder responds to new block
Expand All @@ -41,15 +41,15 @@ type (

// responseWriter for server
type responseWriter struct {
writeHandler func(interface{}) error
writeHandler func(interface{}) (int, error)
}

// NewResponseWriter returns a new responseWriter
func NewResponseWriter(handler func(interface{}) error) Web3ResponseWriter {
func NewResponseWriter(handler func(interface{}) (int, error)) Web3ResponseWriter {
return &responseWriter{handler}
}

func (w *responseWriter) Write(in interface{}) error {
func (w *responseWriter) Write(in interface{}) (int, error) {
return w.writeHandler(in)
}

Expand All @@ -69,20 +69,21 @@ func NewBatchWriter(singleWriter Web3ResponseWriter) *BatchWriter {
}

// Write adds data into batch buffer
func (w *BatchWriter) Write(in interface{}) error {
func (w *BatchWriter) Write(in interface{}) (int, error) {
raw, err := json.Marshal(in)
if err != nil {
return err
return 0, err
}
w.totalSize += len(raw)
if w.totalSize > MaxResponseSize {
return errors.New("response size exceeds limit")
return w.totalSize, errors.New("response size exceeds limit")
}
w.buf = append(w.buf, raw)
return nil
return w.totalSize, nil
}

// Flush writes data in batch buffer
func (w *BatchWriter) Flush() error {
return w.writer.Write(w.buf)
_, err := w.writer.Write(w.buf)
return err
}
17 changes: 11 additions & 6 deletions api/web3server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func (svr *web3Handler) HandlePOSTReq(ctx context.Context, reader io.Reader, wri
if err != nil {
err := errors.Wrap(err, "failed to parse web3 requests.")
span.RecordError(err)
return writer.Write(&web3Response{err: err})
_, err = writer.Write(&web3Response{err: err})
return err
}
if !web3Reqs.IsArray() {
return svr.handleWeb3Req(ctx, &web3Reqs, writer)
Expand All @@ -116,7 +117,8 @@ func (svr *web3Handler) HandlePOSTReq(ctx context.Context, reader io.Reader, wri
svr.batchRequestLimit,
)
span.RecordError(err)
return writer.Write(&web3Response{err: err})
_, err = writer.Write(&web3Response{err: err})
return err
}
batchWriter := apitypes.NewBatchWriter(writer)
for i := range web3ReqArr {
Expand All @@ -129,10 +131,12 @@ func (svr *web3Handler) HandlePOSTReq(ctx context.Context, reader io.Reader, wri

func (svr *web3Handler) handleWeb3Req(ctx context.Context, web3Req *gjson.Result, writer apitypes.Web3ResponseWriter) error {
var (
res interface{}
err error
method = web3Req.Get("method").Value()
res interface{}
err, err1 error
method = web3Req.Get("method").Value()
size int
)
defer func(start time.Time) { svr.coreService.Track(ctx, start, method.(string), int64(size), err == nil) }(time.Now())
span := tracer.SpanFromContext(ctx)
defer span.End()
span.AddEvent("handleWeb3Req")
Expand Down Expand Up @@ -239,11 +243,12 @@ func (svr *web3Handler) handleWeb3Req(ctx context.Context, web3Req *gjson.Result
} else {
log.Logger("api").Debug("web3Debug", zap.String("response", fmt.Sprintf("%+v", res)))
}
return writer.Write(&web3Response{
size, err1 = writer.Write(&web3Response{
id: int(web3Req.Get("id").Int()),
result: res,
err: err,
})
return err1
}

func parseWeb3Reqs(reader io.Reader) (gjson.Result, error) {
Expand Down
1 change: 1 addition & 0 deletions api/web3server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestHandlePost(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
core := mock_apicoreservice.NewMockCoreService(ctrl)
core.EXPECT().Track(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return().AnyTimes()
svr := newHTTPHandler(NewWeb3Handler(core, "", _defaultBatchRequestLimit))
getServerResp := func(svr *hTTPHandler, req *http.Request) *httptest.ResponseRecorder {
req.Header.Set("Content-Type", "application/json")
Expand Down
4 changes: 2 additions & 2 deletions api/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ func (wsSvr *WebsocketHandler) handleConnection(ctx context.Context, ws *websock

err = wsSvr.msgHandler.HandlePOSTReq(ctx, reader,
apitypes.NewResponseWriter(
func(resp interface{}) error {
func(resp interface{}) (int, error) {
if err = ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Logger("api").Warn("failed to set write deadline timeout.", zap.Error(err))
}
return ws.WriteJSON(resp)
return 0, ws.WriteJSON(resp)
}),
)
if err != nil {
Expand Down
19 changes: 18 additions & 1 deletion blocksync/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/pkg/routine"
"github.com/iotexproject/iotex-core/server/itx/nodestats"
)

type (
Expand All @@ -42,7 +43,7 @@ type (
// BlockSync defines the interface of blocksyncer
BlockSync interface {
lifecycle.StartStopper

nodestats.StatsReporter
// TargetHeight returns the target height to sync to
TargetHeight() uint64
// ProcessSyncRequest processes a block sync request
Expand Down Expand Up @@ -122,6 +123,10 @@ func (*dummyBlockSync) SyncStatus() (uint64, uint64, uint64, string) {
return 0, 0, 0, ""
}

func (*dummyBlockSync) BuildReport() string {
return ""
}

// NewBlockSyncer returns a new block syncer instance
func NewBlockSyncer(
cfg Config,
Expand Down Expand Up @@ -335,3 +340,15 @@ func (bs *blockSyncer) SyncStatus() (uint64, uint64, uint64, string) {
}
return bs.startingHeight, bs.tipHeightHandler(), bs.targetHeight, syncSpeedDesc
}

// BuildReport builds a report of block syncer
func (bs *blockSyncer) BuildReport() string {
startingHeight, tipHeight, targetHeight, syncSpeedDesc := bs.SyncStatus()
return fmt.Sprintf(
"BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s",
startingHeight,
tipHeight,
targetHeight,
syncSpeedDesc,
)
}
8 changes: 8 additions & 0 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/iotexproject/iotex-core/nodeinfo"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/server/itx/nodestats"
"github.com/iotexproject/iotex-core/state/factory"
)

Expand Down Expand Up @@ -92,6 +93,13 @@ func (builder *Builder) SetP2PAgent(agent p2p.Agent) *Builder {
return builder
}

// SetRPCStats sets the RPCStats instance
func (builder *Builder) SetRPCStats(stats *nodestats.APILocalStats) *Builder {
builder.createInstance()
builder.cs.apiStats = stats
return builder
}

// SetElectionCommittee sets the election committee instance
func (builder *Builder) SetElectionCommittee(c committee.Committee) *Builder {
builder.createInstance()
Expand Down
3 changes: 3 additions & 0 deletions chainservice/chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/server/itx/nodestats"
"github.com/iotexproject/iotex-core/state/factory"
)

Expand Down Expand Up @@ -89,6 +90,7 @@ type ChainService struct {
contractStakingIndexer contractstaking.ContractIndexer
registry *protocol.Registry
nodeInfoManager *nodeinfo.InfoManager
apiStats *nodestats.APILocalStats
}

// Start starts the server
Expand Down Expand Up @@ -231,6 +233,7 @@ func (cs *ChainService) NewAPIServer(cfg api.Config, plugins map[int]interface{}
return p2pAgent.BroadcastOutbound(ctx, msg)
}),
api.WithNativeElection(cs.electionCommittee),
api.WithAPIStats(cs.apiStats),
}

svr, err := api.NewServerV2(
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/cespare/xxhash/v2 v2.1.2
github.com/holiman/uint256 v1.2.0
github.com/mackerelio/go-osstat v0.2.4
github.com/prometheus/client_model v0.2.0
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible
github.com/shirou/gopsutil/v3 v3.22.2
Expand Down Expand Up @@ -193,7 +194,7 @@ require (
go.opentelemetry.io/otel/metric v0.31.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.5.0 // indirect
golang.org/x/time v0.1.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
Expand Down
Loading

0 comments on commit a048de8

Please sign in to comment.