feat: adds seal object metrics and refine some codes #308

Merged (2 commits) on Apr 13, 2023

Changes from all commits
25 changes: 22 additions & 3 deletions pkg/metrics/metric_items.go
@@ -7,6 +7,8 @@ import (
metricshttp "github.com/bnb-chain/greenfield-storage-provider/pkg/metrics/http"
)

const serviceLabelName = "service"

// this file is used to write metric items in sp service
var (
// DefaultGRPCServerMetrics create default gRPC server metrics
@@ -16,14 +18,31 @@ var (
openmetrics.WithClientStreamSendHistogram(), openmetrics.WithClientStreamRecvHistogram())
// DefaultHTTPServerMetrics create default HTTP server metrics
DefaultHTTPServerMetrics = metricshttp.NewServerMetrics()
// PanicsTotal record the number of rpc panics

// PanicsTotal records the number of rpc panics
PanicsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "grpc_req_panics_recovered_total",
Help: "Total number of gRPC requests recovered from internal panic.",
}, []string{"grpc_type", "grpc_service", "grpc_method"})
// BlockHeightLagGauge record the current block height of block syncer service
// BlockHeightLagGauge records the current block height of block syncer service
BlockHeightLagGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "block_syncer_height",
Help: "Current block number of block syncer progress.",
}, []string{"service"})
}, []string{serviceLabelName})
// SealObjectTimeHistogram records the time the task node service takes to seal an object
SealObjectTimeHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "task_node_seal_object_time",
Help: "Track the time of sealing an object on chain in the task node service.",
Buckets: prometheus.DefBuckets,
}, []string{serviceLabelName})
// SealObjectTotalCounter records the total number of seal object operations
SealObjectTotalCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "task_node_seal_object_total",
Help: "Track the total number of seal object operations handled by the task node service.",
}, []string{"success_or_failure"})
// ReplicateObjectTaskGauge records the number of in-flight replicate object tasks
ReplicateObjectTaskGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "task_node_replicate_object_task_number",
Help: "Track the number of replicate object tasks in the task node service.",
}, []string{serviceLabelName})
)
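For orientation (not part of the diff), here is a minimal sketch of how vectors like these are typically driven from service code; the label value "task_node" and the seal call are illustrative stand-ins, not the PR's actual wiring, and registration is handled separately, as metrics.go below shows:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Local stand-ins mirroring the definitions above.
var (
	sealObjectTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "task_node_seal_object_time",
		Help:    "Track the time of sealing an object on chain.",
		Buckets: prometheus.DefBuckets,
	}, []string{"service"})
	sealObjectTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "task_node_seal_object_total",
		Help: "Track the total number of seal object operations.",
	}, []string{"success_or_failure"})
)

func sealObject() error { return nil } // hypothetical seal-on-chain call

func main() {
	start := time.Now()
	err := sealObject()
	// One histogram sample per attempt, labeled by service.
	sealObjectTime.WithLabelValues("task_node").Observe(time.Since(start).Seconds())
	// A single counter family covers both outcomes.
	if err == nil {
		sealObjectTotal.WithLabelValues("success").Inc()
	} else {
		sealObjectTotal.WithLabelValues("failure").Inc()
	}
}
```

Keeping success and failure in one counter family under a "success_or_failure" label means the failure ratio is a single PromQL division rather than a join across metric names.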
3 changes: 2 additions & 1 deletion pkg/metrics/metrics.go
@@ -97,7 +97,8 @@ func (m *Metrics) Enabled() bool {

func (m *Metrics) registerMetricItems() {
m.registry.MustRegister(DefaultGRPCServerMetrics, DefaultGRPCClientMetrics, DefaultHTTPServerMetrics,
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), PanicsTotal, BlockHeightLagGauge)
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), PanicsTotal, BlockHeightLagGauge,
SealObjectTimeHistogram, SealObjectTotalCounter, ReplicateObjectTaskGauge)
}

func (m *Metrics) serve() {
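As background, a dedicated registry such as m.registry is usually exposed over HTTP with promhttp. A hedged sketch, assuming the standard client_golang packages; the route and port are illustrative:

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// A private registry avoids clashing with the global default registry.
	registry := prometheus.NewRegistry()
	registry.MustRegister(
		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
	)

	// HandlerFor serves exactly the collectors registered on this registry.
	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":24036", nil)
}
```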
97 changes: 63 additions & 34 deletions service/tasknode/replicate_object_task.go
@@ -6,13 +6,15 @@ import (
"encoding/hex"
"io"
"sync"
"time"

sdkmath "cosmossdk.io/math"
"github.com/bnb-chain/greenfield-common/go/redundancy"
"github.com/bnb-chain/greenfield-storage-provider/model"
merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
"github.com/bnb-chain/greenfield-storage-provider/model/piecestore"
"github.com/bnb-chain/greenfield-storage-provider/pkg/log"
"github.com/bnb-chain/greenfield-storage-provider/pkg/metrics"
p2ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/p2p/types"
"github.com/bnb-chain/greenfield-storage-provider/pkg/rcmgr"
gatewayclient "github.com/bnb-chain/greenfield-storage-provider/service/gateway/client"
@@ -82,13 +84,13 @@ func (sg *streamReaderGroup) produceStreamPieceData() {
gotPieceSize := false

for segmentPieceIdx := 0; segmentPieceIdx < sg.task.segmentPieceNumber; segmentPieceIdx++ {
segmentPiecekey := piecestore.EncodeSegmentPieceKey(sg.task.objectInfo.Id.Uint64(), uint32(segmentPieceIdx))
segmentPieceData, err := sg.task.taskNode.pieceStore.GetPiece(context.Background(), segmentPiecekey, 0, 0)
segmentPieceKey := piecestore.EncodeSegmentPieceKey(sg.task.objectInfo.Id.Uint64(), uint32(segmentPieceIdx))
segmentPieceData, err := sg.task.taskNode.pieceStore.GetPiece(context.Background(), segmentPieceKey, 0, 0)
if err != nil {
for idx := range sg.streamReaderMap {
sg.streamReaderMap[idx].pWrite.CloseWithError(err)
}
log.Errorw("failed to get piece data", "piece_key", segmentPiecekey, "error", err)
log.Errorw("failed to get piece data", "piece_key", segmentPieceKey, "error", err)
return
}
if sg.task.objectInfo.GetRedundancyType() == types.REDUNDANCY_EC_TYPE {
@@ -245,40 +247,29 @@ func (t *replicateObjectTask) init() error {
return err
}
t.sortedSpEndpoints = maps.SortKeys(t.approvalResponseMap)
// reserve memory
// calculate the memory to reserve, which is used at execute time
t.approximateMemSize = int(float64(t.storageParams.GetMaxSegmentSize()) *
(float64(t.redundancyNumber)/float64(t.storageParams.GetRedundantDataChunkNum()) + 1))
if t.objectInfo.GetPayloadSize() < t.storageParams.GetMaxSegmentSize() {
t.approximateMemSize = int(float64(t.objectInfo.GetPayloadSize()) *
(float64(t.redundancyNumber)/float64(t.storageParams.GetRedundantDataChunkNum()) + 1))
}
err = t.taskNode.rcScope.ReserveMemory(t.approximateMemSize, rcmgr.ReservationPriorityAlways)
if err != nil {
log.CtxErrorw(t.ctx, "failed to reserve memory from resource manager",
"reserve_size", t.approximateMemSize, "error", err)
return err
}
log.CtxDebugw(t.ctx, "reserve memory from resource manager",
"reserve_size", t.approximateMemSize, "resource_state", rcmgr.GetServiceState(model.TaskNodeService))
return nil
}

// execute is used to start the task.
func (t *replicateObjectTask) execute() {
// execute is used to start the task, and waitCh is used to wait for runtime initialization.
func (t *replicateObjectTask) execute(waitCh chan error) {
var (
sealMsg *storagetypes.MsgSealObject
progressInfo *servicetypes.ReplicatePieceInfo
startTime time.Time
succeedIndexMapMutex sync.RWMutex
succeedIndexMap map[int]bool
err error
scopeSpan rcmgr.ResourceScopeSpan
sealMsg *storagetypes.MsgSealObject
progressInfo *servicetypes.ReplicatePieceInfo
)
defer func() {
t.taskNode.rcScope.ReleaseMemory(t.approximateMemSize)
log.CtxDebugw(t.ctx, "release memory to resource manager",
"release_size", t.approximateMemSize, "resource_state", rcmgr.GetServiceState(model.TaskNodeService))
}()

t.updateTaskState(servicetypes.JobState_JOB_STATE_REPLICATE_OBJECT_DOING)

// runtime initialization
startTime = time.Now()
succeedIndexMap = make(map[int]bool, t.redundancyNumber)
isAllSucceed := func(inputIndexMap map[int]bool) bool {
for i := 0; i < t.redundancyNumber; i++ {
@@ -288,6 +279,48 @@
}
return true
}
metrics.ReplicateObjectTaskGauge.WithLabelValues(model.TaskNodeService).Inc()
t.updateTaskState(servicetypes.JobState_JOB_STATE_REPLICATE_OBJECT_DOING)

if scopeSpan, err = t.taskNode.rcScope.BeginSpan(); err != nil {
log.CtxErrorw(t.ctx, "failed to begin span", "error", err)
waitCh <- err
return
}
if err = scopeSpan.ReserveMemory(t.approximateMemSize, rcmgr.ReservationPriorityAlways); err != nil {
log.CtxErrorw(t.ctx, "failed to reserve memory from resource manager",
"reserve_size", t.approximateMemSize, "error", err)
waitCh <- err
return
}
log.CtxDebugw(t.ctx, "reserve memory from resource manager",
"reserve_size", t.approximateMemSize, "resource_state", rcmgr.GetServiceState(model.TaskNodeService))
waitCh <- nil

// deferred cleanup: update metrics, record task state, release resources
defer func() {
close(waitCh)
metrics.ReplicateObjectTaskGauge.WithLabelValues(model.TaskNodeService).Dec()
observer := metrics.SealObjectTimeHistogram.WithLabelValues(model.TaskNodeService)
observer.Observe(time.Since(startTime).Seconds())

if isAllSucceed(succeedIndexMap) {
metrics.SealObjectTotalCounter.WithLabelValues("success").Inc()
log.CtxInfo(t.ctx, "succeed to seal object on chain")
} else {
t.updateTaskState(servicetypes.JobState_JOB_STATE_REPLICATE_OBJECT_ERROR)
metrics.SealObjectTotalCounter.WithLabelValues("failure").Inc()
log.CtxErrorw(t.ctx, "failed to replicate object data to sp", "error", err, "succeed_index_map", succeedIndexMap)
}

if scopeSpan != nil {
scopeSpan.Done()
log.CtxDebugw(t.ctx, "release memory to resource manager",
"release_size", t.approximateMemSize, "resource_state", rcmgr.GetServiceState(model.TaskNodeService))
}
}()

// execution
pickSp := func() (sp *sptypes.StorageProvider, approval *p2ptypes.GetApprovalResponse, err error) {
t.mux.Lock()
defer t.mux.Unlock()
@@ -321,13 +354,15 @@
log.CtxInfo(t.ctx, "succeed to replicate object data")
break
}
sg, err := newStreamReaderGroup(t, succeedIndexMap)
var sg *streamReaderGroup
sg, err = newStreamReaderGroup(t, succeedIndexMap)
if err != nil {
log.CtxErrorw(t.ctx, "failed to new stream reader group", "error", err)
return
}
if len(sg.streamReaderMap) > len(t.sortedSpEndpoints) {
log.CtxError(t.ctx, "failed to replicate due to sp is not enough")
err = merrors.ErrExhaustedSP
return
}
sg.produceStreamPieceData()
@@ -374,16 +409,15 @@
t.taskNode.cache.Add(t.objectInfo.Id.Uint64(), progressInfo)
log.CtxInfow(t.ctx, "succeed to replicate object piece stream to the target sp",
"sp", sp.GetOperator(), "endpoint", sp.GetEndpoint(), "redundancy_index", rIdx)

}(redundancyIdx)
}
wg.Wait()
}

// seal info
// seal onto the greenfield chain
if isAllSucceed(succeedIndexMap) {
t.updateTaskState(servicetypes.JobState_JOB_STATE_SIGN_OBJECT_DOING)
_, err := t.taskNode.signer.SealObjectOnChain(context.Background(), sealMsg)
_, err = t.taskNode.signer.SealObjectOnChain(context.Background(), sealMsg)
if err != nil {
t.taskNode.spDB.UpdateJobState(t.objectInfo.Id.Uint64(), servicetypes.JobState_JOB_STATE_SIGN_OBJECT_ERROR)
log.CtxErrorw(t.ctx, "failed to sign object by signer", "error", err)
@@ -398,10 +432,5 @@
return
}
t.updateTaskState(servicetypes.JobState_JOB_STATE_SEAL_OBJECT_DONE)
log.CtxInfo(t.ctx, "succeed to seal object on chain")
} else {
err := t.updateTaskState(servicetypes.JobState_JOB_STATE_REPLICATE_OBJECT_ERROR)
log.CtxErrorw(t.ctx, "failed to replicate object data to sp", "error", err, "succeed_index_map", succeedIndexMap)
return
}
} // the failure case is handled in the deferred func
}
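Two refinements in this file are worth unpacking. First, init() estimates the reservation as roughly segment size times (redundancyNumber/dataChunkNum + 1): one erasure-coded share per outgoing replica plus the segment buffer itself. Second, the reserve/release pair now lives on a resource-scope span that is closed exactly once in the deferred cleanup, instead of a reserve in init() paired with a release in execute(). A hedged sketch of that lifecycle, using a mock span rather than the real rcmgr types:

```go
package main

import "fmt"

// Minimal mock of the scope-span contract used by execute():
// reserve once up front, release everything via Done() in a defer.
type scopeSpan interface {
	ReserveMemory(size int) error
	Done()
}

type fakeSpan struct{ reserved int }

func (s *fakeSpan) ReserveMemory(size int) error { s.reserved += size; return nil }
func (s *fakeSpan) Done()                        { fmt.Printf("released %d bytes\n", s.reserved) }

func runTask(span scopeSpan, approxMemSize int) error {
	if err := span.ReserveMemory(approxMemSize); err != nil {
		return err // nothing was reserved, so there is nothing to release
	}
	// Done() releases the whole span even on an early return or panic,
	// which removes the risk of a leaked reservation on error paths.
	defer span.Done()
	// ... replicate pieces within the reserved budget ...
	return nil
}

func main() {
	_ = runTask(&fakeSpan{}, 16<<20) // e.g. a 16 MiB segment estimate
}
```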
16 changes: 12 additions & 4 deletions service/tasknode/task_node_service.go
@@ -23,9 +23,10 @@ func (taskNode *TaskNode) ReplicateObject(ctx context.Context, req *types.Replic
}

var (
resp *types.ReplicateObjectResponse
err error
task *replicateObjectTask
resp *types.ReplicateObjectResponse
err error
task *replicateObjectTask
waitCh chan error
)

ctx = log.WithValue(ctx, "object_id", req.GetObjectInfo().Id.String())
@@ -37,9 +38,16 @@ func (taskNode *TaskNode) ReplicateObject(ctx context.Context, req *types.Replic
log.CtxErrorw(ctx, "failed to init replicate object task", "error", err)
return nil, err
}
go task.execute()

waitCh = make(chan error)
go task.execute(waitCh)
if err = <-waitCh; err != nil {
return nil, err
}

resp = &types.ReplicateObjectResponse{}
return resp, nil

}

// QueryReplicatingObject query a replicating object information by object id
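The waitCh handshake above is the behavioral change in this handler: the RPC now blocks until the background task has reserved its resources, so setup failures surface as RPC errors, while the replication itself still runs asynchronously. A minimal sketch of the pattern with hypothetical names:

```go
package main

import "fmt"

// execute reports setup success or failure exactly once on waitCh,
// then carries on with its long-running work in the background.
func execute(waitCh chan error) {
	defer close(waitCh)
	if err := reserveResources(); err != nil {
		waitCh <- err // the caller sees the setup failure synchronously
		return
	}
	waitCh <- nil // unblock the caller; replication continues below
	// ... long-running replication work ...
}

func reserveResources() error { return nil } // stand-in for the rcmgr reservation

func main() {
	waitCh := make(chan error)
	go execute(waitCh)
	if err := <-waitCh; err != nil { // block only for initialization
		fmt.Println("failed to start task:", err)
		return
	}
	fmt.Println("task started; the handler returns while the task continues")
}
```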
11 changes: 11 additions & 0 deletions store/sqldb/traffic.go
@@ -6,10 +6,12 @@ import (
"time"

merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
"github.com/bnb-chain/greenfield-storage-provider/pkg/log"
"gorm.io/gorm"
)

// CheckQuotaAndAddReadRecord check current quota, and add read record
// TODO: Traffic statistics may be inaccurate in extreme cases, optimize it in the future
func (s *SpDBImpl) CheckQuotaAndAddReadRecord(record *ReadRecord, quota *BucketQuota) error {
yearMonth := TimeToYearMonth(TimestampUsToTime(record.ReadTimestampUs))
bucketTraffic, err := s.GetBucketTraffic(record.BucketID, yearMonth)
@@ -30,6 +32,9 @@ func (s *SpDBImpl) CheckQuotaAndAddReadRecord(record *ReadRecord, quota *BucketQ
if result.Error != nil {
return fmt.Errorf("failed to insert bucket traffic table: %s", result.Error)
}
if result.RowsAffected != 1 {
log.Infow("insert traffic", "RowsAffected", result.RowsAffected, "record", record, "quota", quota)
}
bucketTraffic = &BucketTraffic{
BucketID: insertBucketTraffic.BucketID,
YearMonth: insertBucketTraffic.Month,
@@ -49,6 +54,9 @@ func (s *SpDBImpl) CheckQuotaAndAddReadRecord(record *ReadRecord, quota *BucketQ
if result.Error != nil {
return fmt.Errorf("failed to update bucket traffic table: %s", result.Error)
}
if result.RowsAffected != 1 {
log.Infow("update traffic", "RowsAffected", result.RowsAffected, "record", record, "quota", quota)
}
bucketTraffic.ReadQuotaSize = quota.ReadQuotaSize
}

@@ -67,6 +75,9 @@ func (s *SpDBImpl) CheckQuotaAndAddReadRecord(record *ReadRecord, quota *BucketQ
if result.Error != nil {
return fmt.Errorf("failed to update bucket traffic table: %s", result.Error)
}
if result.RowsAffected != 1 {
log.Infow("update traffic", "RowsAffected", result.RowsAffected, "record", record, "quota", quota)
}

// add read record
insertReadRecord := &ReadRecordTable{
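The RowsAffected guards added above only log anomalies rather than failing the call, which matches the TODO about imprecise traffic accounting. As a general pattern (a sketch with an illustrative schema, not the storage provider's actual tables):

```go
package main

import (
	"log"

	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

// Illustrative record type, not the storage provider's real schema.
type ReadRecord struct {
	ID       uint
	BucketID uint64
	ReadSize uint64
}

func main() {
	db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
	if err != nil {
		log.Fatal(err)
	}
	if err := db.AutoMigrate(&ReadRecord{}); err != nil {
		log.Fatal(err)
	}

	result := db.Create(&ReadRecord{BucketID: 1, ReadSize: 4096})
	if result.Error != nil {
		log.Fatalf("insert failed: %v", result.Error)
	}
	// For a single-row insert or update, anything other than 1 suggests a
	// lost or duplicated write, so it is worth logging for later audit.
	if result.RowsAffected != 1 {
		log.Printf("unexpected RowsAffected: %d", result.RowsAffected)
	}
}
```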