Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log summary about the node status #3870

Merged
merged 20 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions api/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/iotexproject/iotex-core/pkg/log"
)

type streamHandler func(interface{}) error
type streamHandler func(interface{}) (int, error)

type gRPCBlockListener struct {
streamHandle streamHandler
Expand Down Expand Up @@ -44,7 +44,7 @@ func (bl *gRPCBlockListener) Respond(_ string, blk *block.Block) error {
Height: blk.Height(),
}
// send blockInfo thru streaming API
if err := bl.streamHandle(&iotexapi.StreamBlocksResponse{
if _, err := bl.streamHandle(&iotexapi.StreamBlocksResponse{
Block: blockInfo,
BlockIdentifier: blockID,
}); err != nil {
Expand Down Expand Up @@ -84,7 +84,7 @@ func (bl *web3BlockListener) Respond(id string, blk *block.Block) error {
},
}
// send blockInfo thru streaming API
if err := bl.streamHandle(res); err != nil {
if _, err := bl.streamHandle(res); err != nil {
log.L().Info(
"Error when streaming the block",
zap.Uint64("height", blk.Height()),
Expand Down
4 changes: 2 additions & 2 deletions api/blocklistener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func TestBlockListener(t *testing.T) {

server := mock_apiserver.NewMockStreamBlocksServer(ctrl)
responder := NewGRPCBlockListener(
func(in interface{}) error {
return server.Send(in.(*iotexapi.StreamBlocksResponse))
func(in interface{}) (int, error) {
return 0, server.Send(in.(*iotexapi.StreamBlocksResponse))
},
errChan)

Expand Down
25 changes: 25 additions & 0 deletions api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/iotexproject/iotex-core/gasstation"
"github.com/iotexproject/iotex-core/pkg/log"
batch "github.com/iotexproject/iotex-core/pkg/messagebatcher"
"github.com/iotexproject/iotex-core/pkg/nodestats"
"github.com/iotexproject/iotex-core/pkg/tracer"
"github.com/iotexproject/iotex-core/pkg/version"
"github.com/iotexproject/iotex-core/state"
Expand Down Expand Up @@ -156,6 +157,9 @@ type (
gasLimit uint64,
data []byte,
config *logger.Config) ([]byte, *action.Receipt, *logger.StructLogger, error)

// Track adds a track record for the given method
Track(ctx context.Context, start time.Time, method string, size int64, success bool)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is an API added in the coreservice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is passed through the Option of newCoreService. There is already coreService in web3Handler, we can use it directly.

}

// coreService implements the CoreService interface
Expand All @@ -175,6 +179,7 @@ type (
electionCommittee committee.Committee
readCache *ReadCache
messageBatcher *batch.Manager
rpcStats nodestats.RPCLocalStats
}

// jobDesc provides a struct to get and store logs in core.LogsInRange
Expand Down Expand Up @@ -204,6 +209,13 @@ func WithNativeElection(committee committee.Committee) Option {
}
}

// WithRPCStats is the option to return RPC stats through API.
func WithRPCStats(stats nodestats.RPCLocalStats) Option {
return func(svr *coreService) {
svr.rpcStats = stats
}
}

type intrinsicGasCalculator interface {
IntrinsicGas() (uint64, error)
}
Expand Down Expand Up @@ -1706,3 +1718,16 @@ func (core *coreService) TraceCall(ctx context.Context,
retval, receipt, err := core.sf.SimulateExecution(ctx, callerAddr, exec, getblockHash)
return retval, receipt, traces, err
}

// Track
func (core *coreService) Track(ctx context.Context, start time.Time, method string, size int64, success bool) {
elapsed := time.Since(start)
if core.rpcStats == nil {
return
}
core.rpcStats.ReportCall(nodestats.APIReport{
Method: method,
HandlingTime: elapsed,
Success: success,
}, size)
}
8 changes: 4 additions & 4 deletions api/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ func (svr *gRPCHandler) StreamBlocks(_ *iotexapi.StreamBlocksRequest, stream iot
defer close(errChan)
chainListener := svr.coreService.ChainListener()
if _, err := chainListener.AddResponder(NewGRPCBlockListener(
func(resp interface{}) error {
return stream.Send(resp.(*iotexapi.StreamBlocksResponse))
func(resp interface{}) (int, error) {
return 0, stream.Send(resp.(*iotexapi.StreamBlocksResponse))
},
errChan,
)); err != nil {
Expand All @@ -570,8 +570,8 @@ func (svr *gRPCHandler) StreamLogs(in *iotexapi.StreamLogsRequest, stream iotexa
chainListener := svr.coreService.ChainListener()
if _, err := chainListener.AddResponder(NewGRPCLogListener(
logfilter.NewLogFilter(in.GetFilter()),
func(in interface{}) error {
return stream.Send(in.(*iotexapi.StreamLogsResponse))
func(in interface{}) (int, error) {
return 0, stream.Send(in.(*iotexapi.StreamLogsResponse))
},
errChan,
)); err != nil {
Expand Down
8 changes: 6 additions & 2 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,14 @@ func (handler *hTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)

if err := handler.msgHandler.HandlePOSTReq(ctx, req.Body,
apitypes.NewResponseWriter(
func(resp interface{}) error {
func(resp interface{}) (int, error) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
return json.NewEncoder(w).Encode(resp)
raw, err := json.Marshal(resp)
if err != nil {
return 0, err
}
return w.Write(raw)
}),
); err != nil {
log.Logger("api").Warn("fail to respond request.", zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions api/loglistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (ll *gRPCLogListener) Respond(_ string, blk *block.Block) error {
for _, e := range logs {
logPb := e.ConvertToLogPb()
logPb.BlkHash = blkHash[:]
if err := ll.streamHandle(&iotexapi.StreamLogsResponse{Log: logPb}); err != nil {
if _, err := ll.streamHandle(&iotexapi.StreamLogsResponse{Log: logPb}); err != nil {
ll.errChan <- err
log.L().Info("error streaming the log",
zap.Uint64("height", e.BlockHeight),
Expand Down Expand Up @@ -81,7 +81,7 @@ func (ll *web3LogListener) Respond(id string, blk *block.Block) error {
log: e,
},
}
if err := ll.streamHandle(res); err != nil {
if _, err := ll.streamHandle(res); err != nil {
log.L().Info(
"Error when streaming the block",
zap.Uint64("height", blk.Height()),
Expand Down
19 changes: 10 additions & 9 deletions api/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var MaxResponseSize = 1024 * 1024 * 100 // 100MB
type (
// Web3ResponseWriter is writer for web3 request
Web3ResponseWriter interface {
Write(interface{}) error
Write(interface{}) (int, error)
envestcc marked this conversation as resolved.
Show resolved Hide resolved
}

// Responder responds to new block
Expand All @@ -41,15 +41,15 @@ type (

// responseWriter for server
type responseWriter struct {
writeHandler func(interface{}) error
writeHandler func(interface{}) (int, error)
}

// NewResponseWriter returns a new responseWriter
func NewResponseWriter(handler func(interface{}) error) Web3ResponseWriter {
func NewResponseWriter(handler func(interface{}) (int, error)) Web3ResponseWriter {
return &responseWriter{handler}
}

func (w *responseWriter) Write(in interface{}) error {
func (w *responseWriter) Write(in interface{}) (int, error) {
return w.writeHandler(in)
}

Expand All @@ -69,20 +69,21 @@ func NewBatchWriter(singleWriter Web3ResponseWriter) *BatchWriter {
}

// Write adds data into batch buffer
func (w *BatchWriter) Write(in interface{}) error {
func (w *BatchWriter) Write(in interface{}) (int, error) {
raw, err := json.Marshal(in)
if err != nil {
return err
return 0, err
}
w.totalSize += len(raw)
if w.totalSize > MaxResponseSize {
return errors.New("response size exceeds limit")
return w.totalSize, errors.New("response size exceeds limit")
}
w.buf = append(w.buf, raw)
return nil
return w.totalSize, nil
}

// Flush writes data in batch buffer
func (w *BatchWriter) Flush() error {
return w.writer.Write(w.buf)
_, err := w.writer.Write(w.buf)
return err
}
17 changes: 11 additions & 6 deletions api/web3server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func (svr *web3Handler) HandlePOSTReq(ctx context.Context, reader io.Reader, wri
if err != nil {
err := errors.Wrap(err, "failed to parse web3 requests.")
span.RecordError(err)
return writer.Write(&web3Response{err: err})
_, err = writer.Write(&web3Response{err: err})
return err
}
if !web3Reqs.IsArray() {
return svr.handleWeb3Req(ctx, &web3Reqs, writer)
Expand All @@ -116,7 +117,8 @@ func (svr *web3Handler) HandlePOSTReq(ctx context.Context, reader io.Reader, wri
svr.batchRequestLimit,
)
span.RecordError(err)
return writer.Write(&web3Response{err: err})
_, err = writer.Write(&web3Response{err: err})
return err
}
batchWriter := apitypes.NewBatchWriter(writer)
for i := range web3ReqArr {
Expand All @@ -129,10 +131,12 @@ func (svr *web3Handler) HandlePOSTReq(ctx context.Context, reader io.Reader, wri

func (svr *web3Handler) handleWeb3Req(ctx context.Context, web3Req *gjson.Result, writer apitypes.Web3ResponseWriter) error {
var (
res interface{}
err error
method = web3Req.Get("method").Value()
res interface{}
err, err1 error
method = web3Req.Get("method").Value()
size int
)
defer func(start time.Time) { svr.coreService.Track(ctx, start, method.(string), int64(size), err == nil) }(time.Now())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do gRPC and HTTP servers not require invoking the Track method?

span := tracer.SpanFromContext(ctx)
defer span.End()
span.AddEvent("handleWeb3Req")
Expand Down Expand Up @@ -239,11 +243,12 @@ func (svr *web3Handler) handleWeb3Req(ctx context.Context, web3Req *gjson.Result
} else {
log.Logger("api").Debug("web3Debug", zap.String("response", fmt.Sprintf("%+v", res)))
}
return writer.Write(&web3Response{
size, err1 = writer.Write(&web3Response{
id: int(web3Req.Get("id").Int()),
result: res,
err: err,
})
return err1
}

func parseWeb3Reqs(reader io.Reader) (gjson.Result, error) {
Expand Down
1 change: 1 addition & 0 deletions api/web3server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestHandlePost(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
core := mock_apicoreservice.NewMockCoreService(ctrl)
core.EXPECT().Track(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return().AnyTimes()
svr := newHTTPHandler(NewWeb3Handler(core, "", _defaultBatchRequestLimit))
getServerResp := func(svr *hTTPHandler, req *http.Request) *httptest.ResponseRecorder {
req.Header.Set("Content-Type", "application/json")
Expand Down
4 changes: 2 additions & 2 deletions api/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ func (wsSvr *WebsocketHandler) handleConnection(ctx context.Context, ws *websock

err = wsSvr.msgHandler.HandlePOSTReq(ctx, reader,
apitypes.NewResponseWriter(
func(resp interface{}) error {
func(resp interface{}) (int, error) {
if err = ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Logger("api").Warn("failed to set write deadline timeout.", zap.Error(err))
}
return ws.WriteJSON(resp)
return 0, ws.WriteJSON(resp)
}),
)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/iotexproject/iotex-core/nodeinfo"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/pkg/nodestats"
"github.com/iotexproject/iotex-core/state/factory"
)

Expand Down Expand Up @@ -92,6 +93,13 @@ func (builder *Builder) SetP2PAgent(agent p2p.Agent) *Builder {
return builder
}

// SetRPCStats sets the RPCStats instance
func (builder *Builder) SetRPCStats(stats nodestats.RPCLocalStats) *Builder {
builder.createInstance()
builder.cs.rpcStats = stats
return builder
}

// SetElectionCommittee sets the election committee instance
func (builder *Builder) SetElectionCommittee(c committee.Committee) *Builder {
builder.createInstance()
Expand Down
3 changes: 3 additions & 0 deletions chainservice/chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/pkg/nodestats"
"github.com/iotexproject/iotex-core/state/factory"
)

Expand Down Expand Up @@ -89,6 +90,7 @@ type ChainService struct {
contractStakingIndexer contractstaking.ContractIndexer
registry *protocol.Registry
nodeInfoManager *nodeinfo.InfoManager
rpcStats nodestats.RPCLocalStats
}

// Start starts the server
Expand Down Expand Up @@ -231,6 +233,7 @@ func (cs *ChainService) NewAPIServer(cfg api.Config, plugins map[int]interface{}
return p2pAgent.BroadcastOutbound(ctx, msg)
}),
api.WithNativeElection(cs.electionCommittee),
api.WithRPCStats(cs.rpcStats),
}

svr, err := api.NewServerV2(
Expand Down
2 changes: 1 addition & 1 deletion dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (d *IotxDispatcher) actionHandler() {
case a := <-d.actionChan:
d.handleActionMsg(a)
case <-d.quit:
log.L().Info("action handler is terminated.")
//log.L().Info("action handler is terminated.")
millken marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/cespare/xxhash/v2 v2.1.2
github.com/holiman/uint256 v1.2.0
github.com/mackerelio/go-osstat v0.2.4
github.com/prometheus/client_model v0.2.0
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible
github.com/shirou/gopsutil/v3 v3.22.2
Expand Down Expand Up @@ -193,7 +194,7 @@ require (
go.opentelemetry.io/otel/metric v0.31.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.5.0 // indirect
golang.org/x/time v0.1.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ=
github.com/magefile/mage v1.9.0 h1:t3AU2wNwehMCW97vuqQLtw6puppWXHO+O2MHo5a50XE=
github.com/magefile/mage v1.9.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down Expand Up @@ -1550,8 +1552,8 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
Expand Down
Loading