Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log summary about the node status #3870

Merged
merged 20 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions api/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/iotexproject/iotex-core/pkg/log"
)

type streamHandler func(interface{}) error
type streamHandler func(interface{}) (int, error)

type gRPCBlockListener struct {
streamHandle streamHandler
Expand Down Expand Up @@ -44,7 +44,7 @@ func (bl *gRPCBlockListener) Respond(_ string, blk *block.Block) error {
Height: blk.Height(),
}
// send blockInfo thru streaming API
if err := bl.streamHandle(&iotexapi.StreamBlocksResponse{
if _, err := bl.streamHandle(&iotexapi.StreamBlocksResponse{
Block: blockInfo,
BlockIdentifier: blockID,
}); err != nil {
Expand Down Expand Up @@ -84,7 +84,7 @@ func (bl *web3BlockListener) Respond(id string, blk *block.Block) error {
},
}
// send blockInfo thru streaming API
if err := bl.streamHandle(res); err != nil {
if _, err := bl.streamHandle(res); err != nil {
log.L().Info(
"Error when streaming the block",
zap.Uint64("height", blk.Height()),
Expand Down
4 changes: 2 additions & 2 deletions api/blocklistener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func TestBlockListener(t *testing.T) {

server := mock_apiserver.NewMockStreamBlocksServer(ctrl)
responder := NewGRPCBlockListener(
func(in interface{}) error {
return server.Send(in.(*iotexapi.StreamBlocksResponse))
func(in interface{}) (int, error) {
return 0, server.Send(in.(*iotexapi.StreamBlocksResponse))
},
errChan)

Expand Down
25 changes: 25 additions & 0 deletions api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
batch "github.com/iotexproject/iotex-core/pkg/messagebatcher"
"github.com/iotexproject/iotex-core/pkg/tracer"
"github.com/iotexproject/iotex-core/pkg/version"
"github.com/iotexproject/iotex-core/server/itx/nodestats"
"github.com/iotexproject/iotex-core/state"
"github.com/iotexproject/iotex-core/state/factory"
)
Expand Down Expand Up @@ -156,6 +157,9 @@ type (
gasLimit uint64,
data []byte,
config *logger.Config) ([]byte, *action.Receipt, *logger.StructLogger, error)

// Track tracks the api call
Track(ctx context.Context, start time.Time, method string, size int64, success bool)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is an API added in the coreservice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is passed through the Option of newCoreService. There is already coreService in web3Handler, we can use it directly.

}

// coreService implements the CoreService interface
Expand All @@ -175,6 +179,7 @@ type (
electionCommittee committee.Committee
readCache *ReadCache
messageBatcher *batch.Manager
apiStats *nodestats.APILocalStats
}

// jobDesc provides a struct to get and store logs in core.LogsInRange
Expand Down Expand Up @@ -204,6 +209,13 @@ func WithNativeElection(committee committee.Committee) Option {
}
}

// WithAPIStats is the option to return RPC stats through API.
func WithAPIStats(stats *nodestats.APILocalStats) Option {
return func(svr *coreService) {
svr.apiStats = stats
}
}

type intrinsicGasCalculator interface {
IntrinsicGas() (uint64, error)
}
Expand Down Expand Up @@ -1706,3 +1718,16 @@ func (core *coreService) TraceCall(ctx context.Context,
retval, receipt, err := core.sf.SimulateExecution(ctx, callerAddr, exec, getblockHash)
return retval, receipt, traces, err
}

// Track tracks the api call
func (core *coreService) Track(ctx context.Context, start time.Time, method string, size int64, success bool) {
if core.apiStats == nil {
return
}
elapsed := time.Since(start)
core.apiStats.ReportCall(nodestats.APIReport{
Method: method,
HandlingTime: elapsed,
Success: success,
}, size)
}
8 changes: 4 additions & 4 deletions api/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ func (svr *gRPCHandler) StreamBlocks(_ *iotexapi.StreamBlocksRequest, stream iot
defer close(errChan)
chainListener := svr.coreService.ChainListener()
if _, err := chainListener.AddResponder(NewGRPCBlockListener(
func(resp interface{}) error {
return stream.Send(resp.(*iotexapi.StreamBlocksResponse))
func(resp interface{}) (int, error) {
return 0, stream.Send(resp.(*iotexapi.StreamBlocksResponse))
},
errChan,
)); err != nil {
Expand All @@ -570,8 +570,8 @@ func (svr *gRPCHandler) StreamLogs(in *iotexapi.StreamLogsRequest, stream iotexa
chainListener := svr.coreService.ChainListener()
if _, err := chainListener.AddResponder(NewGRPCLogListener(
logfilter.NewLogFilter(in.GetFilter()),
func(in interface{}) error {
return stream.Send(in.(*iotexapi.StreamLogsResponse))
func(in interface{}) (int, error) {
return 0, stream.Send(in.(*iotexapi.StreamLogsResponse))
},
errChan,
)); err != nil {
Expand Down
8 changes: 6 additions & 2 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,14 @@ func (handler *hTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)

if err := handler.msgHandler.HandlePOSTReq(ctx, req.Body,
apitypes.NewResponseWriter(
func(resp interface{}) error {
func(resp interface{}) (int, error) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
return json.NewEncoder(w).Encode(resp)
raw, err := json.Marshal(resp)
if err != nil {
return 0, err
}
return w.Write(raw)
}),
); err != nil {
log.Logger("api").Warn("fail to respond request.", zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions api/loglistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (ll *gRPCLogListener) Respond(_ string, blk *block.Block) error {
for _, e := range logs {
logPb := e.ConvertToLogPb()
logPb.BlkHash = blkHash[:]
if err := ll.streamHandle(&iotexapi.StreamLogsResponse{Log: logPb}); err != nil {
if _, err := ll.streamHandle(&iotexapi.StreamLogsResponse{Log: logPb}); err != nil {
ll.errChan <- err
log.L().Info("error streaming the log",
zap.Uint64("height", e.BlockHeight),
Expand Down Expand Up @@ -81,7 +81,7 @@ func (ll *web3LogListener) Respond(id string, blk *block.Block) error {
log: e,
},
}
if err := ll.streamHandle(res); err != nil {
if _, err := ll.streamHandle(res); err != nil {
log.L().Info(
"Error when streaming the block",
zap.Uint64("height", blk.Height()),
Expand Down
19 changes: 10 additions & 9 deletions api/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var MaxResponseSize = 1024 * 1024 * 100 // 100MB
type (
// Web3ResponseWriter is writer for web3 request
Web3ResponseWriter interface {
Write(interface{}) error
Write(interface{}) (int, error)
envestcc marked this conversation as resolved.
Show resolved Hide resolved
}

// Responder responds to new block
Expand All @@ -41,15 +41,15 @@ type (

// responseWriter for server
type responseWriter struct {
writeHandler func(interface{}) error
writeHandler func(interface{}) (int, error)
}

// NewResponseWriter returns a new responseWriter
func NewResponseWriter(handler func(interface{}) error) Web3ResponseWriter {
func NewResponseWriter(handler func(interface{}) (int, error)) Web3ResponseWriter {
return &responseWriter{handler}
}

func (w *responseWriter) Write(in interface{}) error {
func (w *responseWriter) Write(in interface{}) (int, error) {
return w.writeHandler(in)
}

Expand All @@ -69,20 +69,21 @@ func NewBatchWriter(singleWriter Web3ResponseWriter) *BatchWriter {
}

// Write adds data into batch buffer
func (w *BatchWriter) Write(in interface{}) error {
func (w *BatchWriter) Write(in interface{}) (int, error) {
raw, err := json.Marshal(in)
if err != nil {
return err
return 0, err
}
w.totalSize += len(raw)
if w.totalSize > MaxResponseSize {
return errors.New("response size exceeds limit")
return w.totalSize, errors.New("response size exceeds limit")
}
w.buf = append(w.buf, raw)
return nil
return w.totalSize, nil
}

// Flush writes data in batch buffer
func (w *BatchWriter) Flush() error {
return w.writer.Write(w.buf)
_, err := w.writer.Write(w.buf)
return err
}
17 changes: 11 additions & 6 deletions api/web3server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func (svr *web3Handler) HandlePOSTReq(ctx context.Context, reader io.Reader, wri
if err != nil {
err := errors.Wrap(err, "failed to parse web3 requests.")
span.RecordError(err)
return writer.Write(&web3Response{err: err})
_, err = writer.Write(&web3Response{err: err})
return err
}
if !web3Reqs.IsArray() {
return svr.handleWeb3Req(ctx, &web3Reqs, writer)
Expand All @@ -116,7 +117,8 @@ func (svr *web3Handler) HandlePOSTReq(ctx context.Context, reader io.Reader, wri
svr.batchRequestLimit,
)
span.RecordError(err)
return writer.Write(&web3Response{err: err})
_, err = writer.Write(&web3Response{err: err})
return err
}
batchWriter := apitypes.NewBatchWriter(writer)
for i := range web3ReqArr {
Expand All @@ -129,10 +131,12 @@ func (svr *web3Handler) HandlePOSTReq(ctx context.Context, reader io.Reader, wri

func (svr *web3Handler) handleWeb3Req(ctx context.Context, web3Req *gjson.Result, writer apitypes.Web3ResponseWriter) error {
var (
res interface{}
err error
method = web3Req.Get("method").Value()
res interface{}
err, err1 error
method = web3Req.Get("method").Value()
size int
)
defer func(start time.Time) { svr.coreService.Track(ctx, start, method.(string), int64(size), err == nil) }(time.Now())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do gRPC and HTTP servers not require invoking the Track method?

span := tracer.SpanFromContext(ctx)
defer span.End()
span.AddEvent("handleWeb3Req")
Expand Down Expand Up @@ -239,11 +243,12 @@ func (svr *web3Handler) handleWeb3Req(ctx context.Context, web3Req *gjson.Result
} else {
log.Logger("api").Debug("web3Debug", zap.String("response", fmt.Sprintf("%+v", res)))
}
return writer.Write(&web3Response{
size, err1 = writer.Write(&web3Response{
id: int(web3Req.Get("id").Int()),
result: res,
err: err,
})
return err1
}

func parseWeb3Reqs(reader io.Reader) (gjson.Result, error) {
Expand Down
1 change: 1 addition & 0 deletions api/web3server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestHandlePost(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
core := mock_apicoreservice.NewMockCoreService(ctrl)
core.EXPECT().Track(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return().AnyTimes()
svr := newHTTPHandler(NewWeb3Handler(core, "", _defaultBatchRequestLimit))
getServerResp := func(svr *hTTPHandler, req *http.Request) *httptest.ResponseRecorder {
req.Header.Set("Content-Type", "application/json")
Expand Down
4 changes: 2 additions & 2 deletions api/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ func (wsSvr *WebsocketHandler) handleConnection(ctx context.Context, ws *websock

err = wsSvr.msgHandler.HandlePOSTReq(ctx, reader,
apitypes.NewResponseWriter(
func(resp interface{}) error {
func(resp interface{}) (int, error) {
if err = ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Logger("api").Warn("failed to set write deadline timeout.", zap.Error(err))
}
return ws.WriteJSON(resp)
return 0, ws.WriteJSON(resp)
}),
)
if err != nil {
Expand Down
19 changes: 18 additions & 1 deletion blocksync/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/pkg/routine"
"github.com/iotexproject/iotex-core/server/itx/nodestats"
)

type (
Expand All @@ -42,7 +43,7 @@ type (
// BlockSync defines the interface of blocksyncer
BlockSync interface {
lifecycle.StartStopper

nodestats.StatsReporter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this, don't need to explicitly tell, so we can remove the import at L26 above

// TargetHeight returns the target height to sync to
TargetHeight() uint64
// ProcessSyncRequest processes a block sync request
Expand Down Expand Up @@ -122,6 +123,10 @@ func (*dummyBlockSync) SyncStatus() (uint64, uint64, uint64, string) {
return 0, 0, 0, ""
}

func (*dummyBlockSync) BuildReport() string {
return ""
}

// NewBlockSyncer returns a new block syncer instance
func NewBlockSyncer(
cfg Config,
Expand Down Expand Up @@ -335,3 +340,15 @@ func (bs *blockSyncer) SyncStatus() (uint64, uint64, uint64, string) {
}
return bs.startingHeight, bs.tipHeightHandler(), bs.targetHeight, syncSpeedDesc
}

// BuildReport builds a report of block syncer
func (bs *blockSyncer) BuildReport() string {
startingHeight, tipHeight, targetHeight, syncSpeedDesc := bs.SyncStatus()
return fmt.Sprintf(
"BlockSync startingHeight: %d, tipHeight: %d, targetHeight: %d, %s",
startingHeight,
tipHeight,
targetHeight,
syncSpeedDesc,
)
}
8 changes: 8 additions & 0 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/iotexproject/iotex-core/nodeinfo"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/server/itx/nodestats"
"github.com/iotexproject/iotex-core/state/factory"
)

Expand Down Expand Up @@ -92,6 +93,13 @@ func (builder *Builder) SetP2PAgent(agent p2p.Agent) *Builder {
return builder
}

// SetRPCStats sets the RPCStats instance
func (builder *Builder) SetRPCStats(stats *nodestats.APILocalStats) *Builder {
builder.createInstance()
builder.cs.apiStats = stats
return builder
}

// SetElectionCommittee sets the election committee instance
func (builder *Builder) SetElectionCommittee(c committee.Committee) *Builder {
builder.createInstance()
Expand Down
3 changes: 3 additions & 0 deletions chainservice/chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/server/itx/nodestats"
"github.com/iotexproject/iotex-core/state/factory"
)

Expand Down Expand Up @@ -89,6 +90,7 @@ type ChainService struct {
contractStakingIndexer contractstaking.ContractIndexer
registry *protocol.Registry
nodeInfoManager *nodeinfo.InfoManager
apiStats *nodestats.APILocalStats
}

// Start starts the server
Expand Down Expand Up @@ -231,6 +233,7 @@ func (cs *ChainService) NewAPIServer(cfg api.Config, plugins map[int]interface{}
return p2pAgent.BroadcastOutbound(ctx, msg)
}),
api.WithNativeElection(cs.electionCommittee),
api.WithAPIStats(cs.apiStats),
}

svr, err := api.NewServerV2(
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/cespare/xxhash/v2 v2.1.2
github.com/holiman/uint256 v1.2.0
github.com/mackerelio/go-osstat v0.2.4
github.com/prometheus/client_model v0.2.0
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible
github.com/shirou/gopsutil/v3 v3.22.2
Expand Down Expand Up @@ -193,7 +194,7 @@ require (
go.opentelemetry.io/otel/metric v0.31.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.5.0 // indirect
golang.org/x/time v0.1.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
Expand Down
Loading