From 1ec4edfa4b68480887b46f9e41e58649b07b7fee Mon Sep 17 00:00:00 2001
From: DylanYong
Date: Wed, 12 Apr 2023 20:55:03 +0800
Subject: [PATCH 1/2] feat: task node adds seal object metrics

---
 pkg/metrics/metric_items.go               | 25 ++++++++++++++++++++---
 pkg/metrics/metrics.go                    |  3 ++-
 service/tasknode/replicate_object_task.go | 18 +++++++++++-----
 3 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/pkg/metrics/metric_items.go b/pkg/metrics/metric_items.go
index dbb028a19..13ebd3359 100644
--- a/pkg/metrics/metric_items.go
+++ b/pkg/metrics/metric_items.go
@@ -7,6 +7,8 @@ import (
 	metricshttp "github.com/bnb-chain/greenfield-storage-provider/pkg/metrics/http"
 )
 
+const serviceLabelName = "service"
+
 // this file is used to write metric items in sp service
 var (
 	// DefaultGRPCServerMetrics create default gRPC server metrics
@@ -16,14 +18,31 @@ var (
 		openmetrics.WithClientStreamSendHistogram(), openmetrics.WithClientStreamRecvHistogram())
 	// DefaultHTTPServerMetrics create default HTTP server metrics
 	DefaultHTTPServerMetrics = metricshttp.NewServerMetrics()
-	// PanicsTotal record the number of rpc panics
+
+	// PanicsTotal records the number of rpc panics
 	PanicsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "grpc_req_panics_recovered_total",
 		Help: "Total number of gRPC requests recovered from internal panic.",
 	}, []string{"grpc_type", "grpc_service", "grpc_method"})
-	// BlockHeightLagGauge record the current block height of block syncer service
+	// BlockHeightLagGauge records the current block height of block syncer service
 	BlockHeightLagGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "block_syncer_height",
 		Help: "Current block number of block syncer progress.",
-	}, []string{"service"})
+	}, []string{serviceLabelName})
+	// SealObjectTimeHistogram records the time taken to seal an object in task node service
+	SealObjectTimeHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Name:    "task_node_seal_object_time",
+		Help:    "Track the time taken by task node service to seal an object on chain.",
+		Buckets: prometheus.DefBuckets,
+	}, []string{serviceLabelName})
+	// SealObjectTotalCounter records the total number of seal object operations
+	SealObjectTotalCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "task_node_seal_object_total",
+		Help: "Track the total number of seal object operations handled by task node service.",
+	}, []string{"success_or_failure"})
+	// ReplicateObjectTaskGauge records the number of running replicate object tasks
+	ReplicateObjectTaskGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "task_node_replicate_object_task_number",
+		Help: "Track the number of replicate object tasks running in task node service.",
+	}, []string{serviceLabelName})
 )
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index 6a1db6c5b..e597c504e 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -97,7 +97,8 @@ func (m *Metrics) Enabled() bool {
 
 func (m *Metrics) registerMetricItems() {
 	m.registry.MustRegister(DefaultGRPCServerMetrics, DefaultGRPCClientMetrics, DefaultHTTPServerMetrics,
-		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), PanicsTotal, BlockHeightLagGauge)
+		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), PanicsTotal, BlockHeightLagGauge,
+		SealObjectTimeHistogram, SealObjectTotalCounter, ReplicateObjectTaskGauge)
 }
 
 func (m *Metrics) serve() {
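For reference, a minimal, self-contained sketch of how metric vectors like the ones added above are registered and exposed with the Prometheus Go client. The dedicated registry, the "task_node" label value, and the listen port are illustrative choices for this sketch, not values taken from this repository:

package main

import (
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// sealObjectTime mirrors the shape of SealObjectTimeHistogram added by this patch.
var sealObjectTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name:    "task_node_seal_object_time",
	Help:    "Track the time taken by task node service to seal an object on chain.",
	Buckets: prometheus.DefBuckets,
}, []string{"service"})

func main() {
	// Register the collector with a dedicated registry, as registerMetricItems does.
	registry := prometheus.NewRegistry()
	registry.MustRegister(sealObjectTime)

	// Observe a stand-in sealing duration, labelled with an illustrative service name.
	start := time.Now()
	time.Sleep(10 * time.Millisecond) // placeholder for the real sealing work
	sealObjectTime.WithLabelValues("task_node").Observe(time.Since(start).Seconds())

	// Expose the registry on /metrics for Prometheus to scrape.
	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	http.ListenAndServe(":9090", nil)
}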
diff --git a/service/tasknode/replicate_object_task.go b/service/tasknode/replicate_object_task.go
index 473d060a3..647214571 100644
--- a/service/tasknode/replicate_object_task.go
+++ b/service/tasknode/replicate_object_task.go
@@ -6,6 +6,7 @@ import (
 	"encoding/hex"
 	"io"
 	"sync"
+	"time"
 
 	sdkmath "cosmossdk.io/math"
 	"github.com/bnb-chain/greenfield-common/go/redundancy"
@@ -13,6 +14,7 @@ import (
 	merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
 	"github.com/bnb-chain/greenfield-storage-provider/model/piecestore"
 	"github.com/bnb-chain/greenfield-storage-provider/pkg/log"
+	"github.com/bnb-chain/greenfield-storage-provider/pkg/metrics"
 	p2ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/p2p/types"
 	"github.com/bnb-chain/greenfield-storage-provider/pkg/rcmgr"
 	gatewayclient "github.com/bnb-chain/greenfield-storage-provider/service/gateway/client"
@@ -82,13 +84,13 @@ func (sg *streamReaderGroup) produceStreamPieceData() {
 	gotPieceSize := false
 
 	for segmentPieceIdx := 0; segmentPieceIdx < sg.task.segmentPieceNumber; segmentPieceIdx++ {
-		segmentPiecekey := piecestore.EncodeSegmentPieceKey(sg.task.objectInfo.Id.Uint64(), uint32(segmentPieceIdx))
-		segmentPieceData, err := sg.task.taskNode.pieceStore.GetPiece(context.Background(), segmentPiecekey, 0, 0)
+		segmentPieceKey := piecestore.EncodeSegmentPieceKey(sg.task.objectInfo.Id.Uint64(), uint32(segmentPieceIdx))
+		segmentPieceData, err := sg.task.taskNode.pieceStore.GetPiece(context.Background(), segmentPieceKey, 0, 0)
 		if err != nil {
 			for idx := range sg.streamReaderMap {
 				sg.streamReaderMap[idx].pWrite.CloseWithError(err)
 			}
-			log.Errorw("failed to get piece data", "piece_key", segmentPiecekey, "error", err)
+			log.Errorw("failed to get piece data", "piece_key", segmentPieceKey, "error", err)
 			return
 		}
 		if sg.task.objectInfo.GetRedundancyType() == types.REDUNDANCY_EC_TYPE {
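The error path above closes every stream reader's pipe writer with the same error so each consumer fails fast instead of blocking forever. A standalone sketch of that io.Pipe fan-out pattern using only the standard library; the names and data are illustrative, not code from this repository:

package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

func main() {
	const consumers = 3
	var wg sync.WaitGroup

	writers := make([]*io.PipeWriter, consumers)
	for i := 0; i < consumers; i++ {
		pr, pw := io.Pipe()
		writers[i] = pw

		wg.Add(1)
		// One consumer per stream, analogous to each replication target reading its piece stream.
		go func(idx int, r *io.PipeReader) {
			defer wg.Done()
			data, err := io.ReadAll(r)
			fmt.Printf("reader %d got %q, err: %v\n", idx, data, err)
		}(i, pr)
	}

	// Producer: write one piece to every stream, then simulate a piece-store failure.
	for _, w := range writers {
		w.Write([]byte("segment-piece-0"))
	}
	// Closing every writer with the error makes each reader's ReadAll return that error.
	err := errors.New("failed to get piece data")
	for _, w := range writers {
		w.CloseWithError(err)
	}

	wg.Wait()
}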
@@ -265,6 +267,7 @@ func (t *replicateObjectTask) init() error {
 	return err
 }
 
 // execute is used to start the task.
 func (t *replicateObjectTask) execute() {
+	startTime := time.Now()
 	var (
 		sealMsg      *storagetypes.MsgSealObject
 		progressInfo *servicetypes.ReplicatePieceInfo
@@ -272,13 +275,16 @@ func (t *replicateObjectTask) execute() {
 		succeedIndexMap      map[int]bool
 	)
 	defer func() {
+		metrics.ReplicateObjectTaskGauge.WithLabelValues(model.TaskNodeService).Dec()
+		observer := metrics.SealObjectTimeHistogram.WithLabelValues(model.TaskNodeService)
+		observer.Observe(time.Since(startTime).Seconds())
 		t.taskNode.rcScope.ReleaseMemory(t.approximateMemSize)
 		log.CtxDebugw(t.ctx, "release memory to resource manager",
 			"release_size", t.approximateMemSize, "resource_state", rcmgr.GetServiceState(model.TaskNodeService))
 	}()
 
+	metrics.ReplicateObjectTaskGauge.WithLabelValues(model.TaskNodeService).Inc()
 	t.updateTaskState(servicetypes.JobState_JOB_STATE_REPLICATE_OBJECT_DOING)
-
 	succeedIndexMap = make(map[int]bool, t.redundancyNumber)
 	isAllSucceed := func(inputIndexMap map[int]bool) bool {
 		for i := 0; i < t.redundancyNumber; i++ {
@@ -374,7 +380,6 @@ func (t *replicateObjectTask) execute() {
 			t.taskNode.cache.Add(t.objectInfo.Id.Uint64(), progressInfo)
 			log.CtxInfow(t.ctx, "succeed to replicate object piece stream to the target sp",
 				"sp", sp.GetOperator(), "endpoint", sp.GetEndpoint(), "redundancy_index", rIdx)
-
 		}(redundancyIdx)
 	}
 	wg.Wait()
@@ -386,6 +391,7 @@ func (t *replicateObjectTask) execute() {
 		_, err := t.taskNode.signer.SealObjectOnChain(context.Background(), sealMsg)
 		if err != nil {
 			t.taskNode.spDB.UpdateJobState(t.objectInfo.Id.Uint64(), servicetypes.JobState_JOB_STATE_SIGN_OBJECT_ERROR)
+			metrics.SealObjectTotalCounter.WithLabelValues("failure").Inc()
 			log.CtxErrorw(t.ctx, "failed to sign object by signer", "error", err)
 			return
 		}
@@ -394,10 +400,12 @@ func (t *replicateObjectTask) execute() {
 			t.objectInfo.GetObjectName(), 10)
 		if err != nil {
 			t.updateTaskState(servicetypes.JobState_JOB_STATE_SEAL_OBJECT_ERROR)
+			metrics.SealObjectTotalCounter.WithLabelValues("failure").Inc()
 			log.CtxErrorw(t.ctx, "failed to seal object on chain", "error", err)
 			return
 		}
 		t.updateTaskState(servicetypes.JobState_JOB_STATE_SEAL_OBJECT_DONE)
+		metrics.SealObjectTotalCounter.WithLabelValues("success").Inc()
 		log.CtxInfo(t.ctx, "succeed to seal object on chain")
 	} else {
 		err := t.updateTaskState(servicetypes.JobState_JOB_STATE_REPLICATE_OBJECT_ERROR)
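The instrumentation in patch 1 follows a common pattern: increment a gauge and start a timer on entry, then decrement the gauge and observe the elapsed time in a deferred function, with a counter labelled success or failure on each exit path. A minimal sketch of that pattern with the Prometheus Go client; the metric names mirror the ones above, but the runTask helper and the "task_node" label value are illustrative:

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	inFlightTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "task_node_replicate_object_task_number",
		Help: "Track the number of replicate object tasks running in task node service.",
	}, []string{"service"})

	taskDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "task_node_seal_object_time",
		Help:    "Track the time taken by task node service to seal an object on chain.",
		Buckets: prometheus.DefBuckets,
	}, []string{"service"})

	taskTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "task_node_seal_object_total",
		Help: "Track the total number of seal object operations handled by task node service.",
	}, []string{"success_or_failure"})
)

// runTask shows the instrument-at-entry / observe-in-defer shape used by execute().
func runTask(service string, work func() error) {
	start := time.Now()
	inFlightTasks.WithLabelValues(service).Inc()

	var err error
	defer func() {
		inFlightTasks.WithLabelValues(service).Dec()
		taskDuration.WithLabelValues(service).Observe(time.Since(start).Seconds())
		if err != nil {
			taskTotal.WithLabelValues("failure").Inc()
			return
		}
		taskTotal.WithLabelValues("success").Inc()
	}()

	err = work()
}

func main() {
	prometheus.MustRegister(inFlightTasks, taskDuration, taskTotal)
	runTask("task_node", func() error { time.Sleep(20 * time.Millisecond); return nil })
}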
From 6c5f729e8a15d8d72c74a7a765d5d8935d786e29 Mon Sep 17 00:00:00 2001
From: will-2012
Date: Thu, 13 Apr 2023 09:42:05 +0800
Subject: [PATCH 2/2] chore: refine replicate task and add more db log

---
 service/tasknode/replicate_object_task.go | 95 ++++++++++++++---------
 service/tasknode/task_node_service.go     | 16 +++-
 store/sqldb/traffic.go                    | 11 +++
 3 files changed, 81 insertions(+), 41 deletions(-)

diff --git a/service/tasknode/replicate_object_task.go b/service/tasknode/replicate_object_task.go
index 647214571..5382754f8 100644
--- a/service/tasknode/replicate_object_task.go
+++ b/service/tasknode/replicate_object_task.go
@@ -247,44 +247,29 @@ func (t *replicateObjectTask) init() error {
 		return err
 	}
 	t.sortedSpEndpoints = maps.SortKeys(t.approvalResponseMap)
-	// reserve memory
+	// estimate the memory to reserve, which is used at execution time
 	t.approximateMemSize = int(float64(t.storageParams.GetMaxSegmentSize()) *
 		(float64(t.redundancyNumber)/float64(t.storageParams.GetRedundantDataChunkNum()) + 1))
 	if t.objectInfo.GetPayloadSize() < t.storageParams.GetMaxSegmentSize() {
 		t.approximateMemSize = int(float64(t.objectInfo.GetPayloadSize()) *
 			(float64(t.redundancyNumber)/float64(t.storageParams.GetRedundantDataChunkNum()) + 1))
 	}
-	err = t.taskNode.rcScope.ReserveMemory(t.approximateMemSize, rcmgr.ReservationPriorityAlways)
-	if err != nil {
-		log.CtxErrorw(t.ctx, "failed to reserve memory from resource manager",
-			"reserve_size", t.approximateMemSize, "error", err)
-		return err
-	}
-	log.CtxDebugw(t.ctx, "reserve memory from resource manager",
-		"reserve_size", t.approximateMemSize, "resource_state", rcmgr.GetServiceState(model.TaskNodeService))
 	return nil
 }
 
-// execute is used to start the task.
-func (t *replicateObjectTask) execute() {
-	startTime := time.Now()
+// execute is used to start the task, and waitCh is used to wait for runtime initialization.
+func (t *replicateObjectTask) execute(waitCh chan error) {
 	var (
-		sealMsg      *storagetypes.MsgSealObject
-		progressInfo *servicetypes.ReplicatePieceInfo
+		startTime            time.Time
 		succeedIndexMapMutex sync.RWMutex
 		succeedIndexMap      map[int]bool
+		err                  error
+		scopeSpan            rcmgr.ResourceScopeSpan
+		sealMsg              *storagetypes.MsgSealObject
+		progressInfo         *servicetypes.ReplicatePieceInfo
 	)
-	defer func() {
-		metrics.ReplicateObjectTaskGauge.WithLabelValues(model.TaskNodeService).Dec()
-		observer := metrics.SealObjectTimeHistogram.WithLabelValues(model.TaskNodeService)
-		observer.Observe(time.Since(startTime).Seconds())
-		t.taskNode.rcScope.ReleaseMemory(t.approximateMemSize)
-		log.CtxDebugw(t.ctx, "release memory to resource manager",
-			"release_size", t.approximateMemSize, "resource_state", rcmgr.GetServiceState(model.TaskNodeService))
-	}()
-
-	metrics.ReplicateObjectTaskGauge.WithLabelValues(model.TaskNodeService).Inc()
-	t.updateTaskState(servicetypes.JobState_JOB_STATE_REPLICATE_OBJECT_DOING)
+	// runtime initialization
+	startTime = time.Now()
 	succeedIndexMap = make(map[int]bool, t.redundancyNumber)
 	isAllSucceed := func(inputIndexMap map[int]bool) bool {
 		for i := 0; i < t.redundancyNumber; i++ {
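The reservation above is only an estimate: roughly one segment of payload plus the erasure-coded pieces fanned out to the redundancy targets. A small sketch of the same arithmetic; the segment size, data chunk count, and redundancy number below are assumed example values, not defaults from this repository:

package main

import "fmt"

// approximateMemSize mirrors the estimate made in init(): one segment of payload
// plus redundancyNumber/dataChunkNum worth of encoded pieces per segment.
func approximateMemSize(payloadSize, maxSegmentSize uint64, redundancyNumber, dataChunkNum int) int {
	factor := float64(redundancyNumber)/float64(dataChunkNum) + 1
	if payloadSize < maxSegmentSize {
		return int(float64(payloadSize) * factor)
	}
	return int(float64(maxSegmentSize) * factor)
}

func main() {
	const (
		maxSegmentSize   = 16 * 1024 * 1024 // 16 MiB segment size, illustrative
		dataChunkNum     = 4                // EC data chunks, illustrative
		redundancyNumber = 6                // SPs to replicate to, illustrative
	)
	// A large object reserves per-segment memory: 16 MiB * (6/4 + 1) = 40 MiB.
	fmt.Println(approximateMemSize(64*1024*1024, maxSegmentSize, redundancyNumber, dataChunkNum))
	// A small object reserves based on its payload only: 1 MiB * 2.5 = 2.5 MiB.
	fmt.Println(approximateMemSize(1*1024*1024, maxSegmentSize, redundancyNumber, dataChunkNum))
}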
resource manager", + "release_size", t.approximateMemSize, "resource_state", rcmgr.GetServiceState(model.TaskNodeService)) + } + }() + + // execution pickSp := func() (sp *sptypes.StorageProvider, approval *p2ptypes.GetApprovalResponse, err error) { t.mux.Lock() defer t.mux.Unlock() @@ -327,13 +354,15 @@ func (t *replicateObjectTask) execute() { log.CtxInfo(t.ctx, "succeed to replicate object data") break } - sg, err := newStreamReaderGroup(t, succeedIndexMap) + var sg *streamReaderGroup + sg, err = newStreamReaderGroup(t, succeedIndexMap) if err != nil { log.CtxErrorw(t.ctx, "failed to new stream reader group", "error", err) return } if len(sg.streamReaderMap) > len(t.sortedSpEndpoints) { log.CtxError(t.ctx, "failed to replicate due to sp is not enough") + err = merrors.ErrExhaustedSP return } sg.produceStreamPieceData() @@ -385,13 +414,12 @@ func (t *replicateObjectTask) execute() { wg.Wait() } - // seal info + // seal onto the greenfield chain if isAllSucceed(succeedIndexMap) { t.updateTaskState(servicetypes.JobState_JOB_STATE_SIGN_OBJECT_DOING) - _, err := t.taskNode.signer.SealObjectOnChain(context.Background(), sealMsg) + _, err = t.taskNode.signer.SealObjectOnChain(context.Background(), sealMsg) if err != nil { t.taskNode.spDB.UpdateJobState(t.objectInfo.Id.Uint64(), servicetypes.JobState_JOB_STATE_SIGN_OBJECT_ERROR) - metrics.SealObjectTotalCounter.WithLabelValues("failure").Inc() log.CtxErrorw(t.ctx, "failed to sign object by signer", "error", err) return } @@ -400,16 +428,9 @@ func (t *replicateObjectTask) execute() { t.objectInfo.GetObjectName(), 10) if err != nil { t.updateTaskState(servicetypes.JobState_JOB_STATE_SEAL_OBJECT_ERROR) - metrics.SealObjectTotalCounter.WithLabelValues("failure").Inc() log.CtxErrorw(t.ctx, "failed to seal object on chain", "error", err) return } t.updateTaskState(servicetypes.JobState_JOB_STATE_SEAL_OBJECT_DONE) - metrics.SealObjectTotalCounter.WithLabelValues("success").Inc() - log.CtxInfo(t.ctx, "succeed to seal object on chain") - } else { - err := t.updateTaskState(servicetypes.JobState_JOB_STATE_REPLICATE_OBJECT_ERROR) - log.CtxErrorw(t.ctx, "failed to replicate object data to sp", "error", err, "succeed_index_map", succeedIndexMap) - return - } + } // the else failed case is in defer func } diff --git a/service/tasknode/task_node_service.go b/service/tasknode/task_node_service.go index 8a61611c3..b71531c71 100644 --- a/service/tasknode/task_node_service.go +++ b/service/tasknode/task_node_service.go @@ -23,9 +23,10 @@ func (taskNode *TaskNode) ReplicateObject(ctx context.Context, req *types.Replic } var ( - resp *types.ReplicateObjectResponse - err error - task *replicateObjectTask + resp *types.ReplicateObjectResponse + err error + task *replicateObjectTask + waitCh chan error ) ctx = log.WithValue(ctx, "object_id", req.GetObjectInfo().Id.String()) @@ -37,9 +38,16 @@ func (taskNode *TaskNode) ReplicateObject(ctx context.Context, req *types.Replic log.CtxErrorw(ctx, "failed to init replicate object task", "error", err) return nil, err } - go task.execute() + + waitCh = make(chan error) + go task.execute(waitCh) + if err = <-waitCh; err != nil { + return nil, err + } + resp = &types.ReplicateObjectResponse{} return resp, nil + } // QueryReplicatingObject query a replicating object information by object id diff --git a/store/sqldb/traffic.go b/store/sqldb/traffic.go index 04e23ba2b..7beead801 100644 --- a/store/sqldb/traffic.go +++ b/store/sqldb/traffic.go @@ -6,10 +6,12 @@ import ( "time" merrors 
"github.com/bnb-chain/greenfield-storage-provider/model/errors" + "github.com/bnb-chain/greenfield-storage-provider/pkg/log" "gorm.io/gorm" ) // CheckQuotaAndAddReadRecord check current quota, and add read record +// TODO: Traffic statistics may be inaccurate in extreme cases, optimize it in the future func (s *SpDBImpl) CheckQuotaAndAddReadRecord(record *ReadRecord, quota *BucketQuota) error { yearMonth := TimeToYearMonth(TimestampUsToTime(record.ReadTimestampUs)) bucketTraffic, err := s.GetBucketTraffic(record.BucketID, yearMonth) @@ -30,6 +32,9 @@ func (s *SpDBImpl) CheckQuotaAndAddReadRecord(record *ReadRecord, quota *BucketQ if result.Error != nil { return fmt.Errorf("failed to insert bucket traffic table: %s", result.Error) } + if result.RowsAffected != 1 { + log.Infow("insert traffic", "RowsAffected", result.RowsAffected, "record", record, "quota", quota) + } bucketTraffic = &BucketTraffic{ BucketID: insertBucketTraffic.BucketID, YearMonth: insertBucketTraffic.Month, @@ -49,6 +54,9 @@ func (s *SpDBImpl) CheckQuotaAndAddReadRecord(record *ReadRecord, quota *BucketQ if result.Error != nil { return fmt.Errorf("failed to update bucket traffic table: %s", result.Error) } + if result.RowsAffected != 1 { + log.Infow("update traffic", "RowsAffected", result.RowsAffected, "record", record, "quota", quota) + } bucketTraffic.ReadQuotaSize = quota.ReadQuotaSize } @@ -67,6 +75,9 @@ func (s *SpDBImpl) CheckQuotaAndAddReadRecord(record *ReadRecord, quota *BucketQ if result.Error != nil { return fmt.Errorf("failed to update bucket traffic table: %s", result.Error) } + if result.RowsAffected != 1 { + log.Infow("update traffic", "RowsAffected", result.RowsAffected, "record", record, "quota", quota) + } // add read record insertReadRecord := &ReadRecordTable{