feat: task node adds seal object metrics
yzhaoyu committed Apr 12, 2023
1 parent ba002be commit ccf6b66
Showing 3 changed files with 37 additions and 9 deletions.
25 changes: 22 additions & 3 deletions pkg/metrics/metric_items.go
@@ -7,6 +7,8 @@ import (
metricshttp "github.com/bnb-chain/greenfield-storage-provider/pkg/metrics/http"
)

const serviceLabelName = "service"

// this file is used to write metric items in sp service
var (
// DefaultGRPCServerMetrics create default gRPC server metrics
@@ -16,14 +18,31 @@ var (
openmetrics.WithClientStreamSendHistogram(), openmetrics.WithClientStreamRecvHistogram())
// DefaultHTTPServerMetrics create default HTTP server metrics
DefaultHTTPServerMetrics = metricshttp.NewServerMetrics()
// PanicsTotal record the number of rpc panics

// PanicsTotal records the number of rpc panics
PanicsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "grpc_req_panics_recovered_total",
Help: "Total number of gRPC requests recovered from internal panic.",
}, []string{"grpc_type", "grpc_service", "grpc_method"})
// BlockHeightLagGauge record the current block height of block syncer service
// BlockHeightLagGauge records the current block height of block syncer service
BlockHeightLagGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "block_syncer_height",
Help: "Current block number of block syncer progress.",
}, []string{"service"})
}, []string{serviceLabelName})
// SealObjectTimeHistogram records sealing object time of task node service
SealObjectTimeHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "task_node_seal_object_time",
Help: "Track task node service the time of sealing object on chain.",
Buckets: prometheus.DefBuckets,
}, []string{serviceLabelName})
// SealObjectTotalCounter records total seal object number
SealObjectTotalCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "task_node_seal_object_total",
Help: "Track task node service handles total seal object number",
}, []string{"success_or_failure"})
// ReplicateObjectTaskGauge records total replicate object number
ReplicateObjectTaskGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "task_node_replicate_object_task_number",
Help: "Track task node service replicate object task",
}, []string{serviceLabelName})
)
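For orientation only (not part of the commit), the sketch below shows how a labelled vector declared this way is typically updated: each distinct value passed to WithLabelValues creates its own child series under the label name held in serviceLabelName. The metric name and the "task_node" label value here are illustrative stand-ins, not identifiers from the repository.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

const serviceLabelName = "service"

// demoSealTime mirrors the shape of SealObjectTimeHistogram: a histogram
// vector keyed by a single "service" label.
var demoSealTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name:    "demo_seal_object_time",
	Help:    "Illustrative histogram keyed by service.",
	Buckets: prometheus.DefBuckets,
}, []string{serviceLabelName})

func observeSeal(start time.Time) {
	// "task_node" is a hypothetical label value; WithLabelValues arguments
	// must match the declared label names in order and count.
	demoSealTime.WithLabelValues("task_node").Observe(time.Since(start).Seconds())
}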
3 changes: 2 additions & 1 deletion pkg/metrics/metrics.go
@@ -97,7 +97,8 @@ func (m *Metrics) Enabled() bool {

func (m *Metrics) registerMetricItems() {
m.registry.MustRegister(DefaultGRPCServerMetrics, DefaultGRPCClientMetrics, DefaultHTTPServerMetrics,
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), PanicsTotal, BlockHeightLagGauge)
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), PanicsTotal, BlockHeightLagGauge,
SealObjectTimeHistogram, SealObjectTotalCounter)
}

func (m *Metrics) serve() {
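A minimal, self-contained sketch (not from this repository; it assumes promhttp for the HTTP handler and a hypothetical :9090 listen address) of the register-and-serve pattern that registerMetricItems and serve implement in this package:

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// A dedicated registry; MustRegister panics if a collector's descriptors
	// clash with one already registered, surfacing wiring mistakes early.
	registry := prometheus.NewRegistry()
	registry.MustRegister(
		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
		// ...application collectors such as SealObjectTimeHistogram and
		// SealObjectTotalCounter would be listed here...
	)

	// Expose only this registry's metrics on /metrics.
	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":9090", nil)
}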
18 changes: 13 additions & 5 deletions service/tasknode/replicate_object_task.go
@@ -6,13 +6,15 @@ import (
"encoding/hex"
"io"
"sync"
"time"

sdkmath "cosmossdk.io/math"
"github.com/bnb-chain/greenfield-common/go/redundancy"
"github.com/bnb-chain/greenfield-storage-provider/model"
merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
"github.com/bnb-chain/greenfield-storage-provider/model/piecestore"
"github.com/bnb-chain/greenfield-storage-provider/pkg/log"
"github.com/bnb-chain/greenfield-storage-provider/pkg/metrics"
p2ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/p2p/types"
"github.com/bnb-chain/greenfield-storage-provider/pkg/rcmgr"
gatewayclient "github.com/bnb-chain/greenfield-storage-provider/service/gateway/client"
@@ -82,13 +84,13 @@ func (sg *streamReaderGroup) produceStreamPieceData() {
gotPieceSize := false

for segmentPieceIdx := 0; segmentPieceIdx < sg.task.segmentPieceNumber; segmentPieceIdx++ {
segmentPiecekey := piecestore.EncodeSegmentPieceKey(sg.task.objectInfo.Id.Uint64(), uint32(segmentPieceIdx))
segmentPieceData, err := sg.task.taskNode.pieceStore.GetPiece(context.Background(), segmentPiecekey, 0, 0)
segmentPieceKey := piecestore.EncodeSegmentPieceKey(sg.task.objectInfo.Id.Uint64(), uint32(segmentPieceIdx))
segmentPieceData, err := sg.task.taskNode.pieceStore.GetPiece(context.Background(), segmentPieceKey, 0, 0)
if err != nil {
for idx := range sg.streamReaderMap {
sg.streamReaderMap[idx].pWrite.CloseWithError(err)
}
log.Errorw("failed to get piece data", "piece_key", segmentPiecekey, "error", err)
log.Errorw("failed to get piece data", "piece_key", segmentPieceKey, "error", err)
return
}
if sg.task.objectInfo.GetRedundancyType() == types.REDUNDANCY_EC_TYPE {
@@ -265,19 +267,23 @@ func (t *replicateObjectTask) init() error {

// execute is used to start the task.
func (t *replicateObjectTask) execute() {
startTime := time.Now()
var (
sealMsg *storagetypes.MsgSealObject
progressInfo *servicetypes.ReplicatePieceInfo
succeedIndexMap map[int]bool
)
defer func() {
metrics.ReplicateObjectTaskGauge.WithLabelValues(model.TaskNodeService).Dec()
observer := metrics.SealObjectTimeHistogram.WithLabelValues(model.TaskNodeService)
observer.Observe(time.Since(startTime).Seconds())
t.taskNode.rcScope.ReleaseMemory(t.approximateMemSize)
log.CtxDebugw(t.ctx, "release memory to resource manager",
"release_size", t.approximateMemSize, "resource_state", rcmgr.GetServiceState(model.TaskNodeService))
}()

metrics.ReplicateObjectTaskGauge.WithLabelValues(model.TaskNodeService).Inc()
t.updateTaskState(servicetypes.JobState_JOB_STATE_REPLICATE_OBJECT_DOING)

succeedIndexMap = make(map[int]bool, t.redundancyNumber)
isAllSucceed := func(inputIndexMap map[int]bool) bool {
for i := 0; i < t.redundancyNumber; i++ {
@@ -370,7 +376,6 @@ func (t *replicateObjectTask) execute() {
t.taskNode.cache.Add(t.objectInfo.Id.Uint64(), progressInfo)
log.CtxInfow(t.ctx, "succeed to replicate object piece stream to the target sp",
"sp", sp.GetOperator(), "endpoint", sp.GetEndpoint(), "redundancy_index", rIdx)

}(redundancyIdx)
}
wg.Wait()
@@ -382,6 +387,7 @@
_, err := t.taskNode.signer.SealObjectOnChain(context.Background(), sealMsg)
if err != nil {
t.taskNode.spDB.UpdateJobState(t.objectInfo.Id.Uint64(), servicetypes.JobState_JOB_STATE_SIGN_OBJECT_ERROR)
metrics.SealObjectTotalCounter.WithLabelValues("failure").Inc()
log.CtxErrorw(t.ctx, "failed to sign object by signer", "error", err)
return
}
@@ -390,10 +396,12 @@
t.objectInfo.GetObjectName(), 10)
if err != nil {
t.updateTaskState(servicetypes.JobState_JOB_STATE_SEAL_OBJECT_ERROR)
metrics.SealObjectTotalCounter.WithLabelValues("failure").Inc()
log.CtxErrorw(t.ctx, "failed to seal object on chain", "error", err)
return
}
t.updateTaskState(servicetypes.JobState_JOB_STATE_SEAL_OBJECT_DONE)
metrics.SealObjectTotalCounter.WithLabelValues("success").Inc()
log.CtxInfo(t.ctx, "succeed to seal object on chain")
} else {
err := t.updateTaskState(servicetypes.JobState_JOB_STATE_REPLICATE_OBJECT_ERROR)
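Taken together, the changes to execute() follow a common instrumentation pattern. The helper below is a hypothetical condensation, not code from the commit; it reuses the vectors added above via the repository's metrics package, with the literal "task_node" standing in for model.TaskNodeService and seal standing in for the replicate-and-seal work. The gauge tracks in-flight tasks, a deferred observation records wall-clock duration, and the counter labels the outcome.

package tasknode

import (
	"time"

	"github.com/bnb-chain/greenfield-storage-provider/pkg/metrics"
)

// runSealTask is an illustrative wrapper, not part of the commit.
func runSealTask(seal func() error) {
	start := time.Now()
	metrics.ReplicateObjectTaskGauge.WithLabelValues("task_node").Inc()
	defer func() {
		// Mirror the deferred cleanup in execute(): drop the in-flight gauge
		// and record how long the whole attempt took.
		metrics.ReplicateObjectTaskGauge.WithLabelValues("task_node").Dec()
		metrics.SealObjectTimeHistogram.WithLabelValues("task_node").Observe(time.Since(start).Seconds())
	}()

	if err := seal(); err != nil {
		metrics.SealObjectTotalCounter.WithLabelValues("failure").Inc()
		return
	}
	metrics.SealObjectTotalCounter.WithLabelValues("success").Inc()
}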
