From 4c49c9a8595175f81ff6ec65fa70a9e3258db222 Mon Sep 17 00:00:00 2001 From: joeylichang Date: Wed, 5 Apr 2023 16:27:53 +0800 Subject: [PATCH 1/3] fix: rsmgr release ofdownloader --- model/errors/rpc_error.go | 2 + service/challenge/challenge_service.go | 45 +++++++------- service/downloader/downloader_service.go | 79 ++++++++++++------------ 3 files changed, 64 insertions(+), 62 deletions(-) diff --git a/model/errors/rpc_error.go b/model/errors/rpc_error.go index 7d31f517a..0ba643ba1 100644 --- a/model/errors/rpc_error.go +++ b/model/errors/rpc_error.go @@ -24,6 +24,8 @@ var ( ErrUnsupportedMethod = errors.New("unsupported method") // ErrIntegerOverflow defines integer overflow ErrIntegerOverflow = errors.New("integer overflow") + // ErrDanglingPointer defines the nil pointer error + ErrDanglingPointer = errors.New("pointer dangling") ) // piece store errors diff --git a/service/challenge/challenge_service.go b/service/challenge/challenge_service.go index 0cac152ce..5246f8df1 100644 --- a/service/challenge/challenge_service.go +++ b/service/challenge/challenge_service.go @@ -3,6 +3,7 @@ package challenge import ( "context" "math" + "strconv" merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" "github.com/bnb-chain/greenfield-storage-provider/model/piecestore" @@ -16,14 +17,32 @@ var _ types.ChallengeServiceServer = &Challenge{} // ChallengePiece handles the piece challenge request // return the piece's integrity hash, piece hash and piece data func (challenge *Challenge) ChallengePiece(ctx context.Context, req *types.ChallengePieceRequest) (*types.ChallengePieceResponse, error) { + // TODO:: gateway transparent transmission object info + ctx = log.WithValue(ctx, "object_id", strconv.FormatUint(req.ObjectId, 10)) var ( + scope rcmgr.ResourceScopeSpan + resp *types.ChallengePieceResponse + err error pieceKey string approximatedPieceSize int - err error - resp *types.ChallengePieceResponse ) + defer func() { + var state string + rcmgr.ResrcManager().ViewSystem(func(scope rcmgr.ResourceScope) error { + state = scope.Stat().String() + return nil + }) + if scope != nil { + scope.Done() + } + log.CtxInfow(ctx, "finish to challenge piece request", "resource_state", state, "error", err) + }() + scope, err = challenge.rcScope.BeginSpan() + if err != nil { + log.CtxErrorw(ctx, "failed to begin reserve resource", "error", err) + return resp, err + } - ctx = log.Context(ctx, req) params, err := challenge.spDB.GetStorageParams() if err != nil { return resp, err @@ -39,30 +58,12 @@ func (challenge *Challenge) ChallengePiece(ctx context.Context, req *types.Chall approximatedPieceSize = int(math.Ceil(float64(params.GetMaxSegmentSize()) / float64(params.GetRedundantDataChunkNum()))) } - // allocates memory from resource manager - scope, err := challenge.rcScope.BeginSpan() - if err != nil { - log.CtxErrorw(ctx, "failed to begin reserve resource", "error", err) - return resp, err - } - stateFunc := func() string { - var state string - rcmgr.ResrcManager().ViewSystem(func(scope rcmgr.ResourceScope) error { - state = scope.Stat().String() - return nil - }) - return state - } err = scope.ReserveMemory(approximatedPieceSize, rcmgr.ReservationPriorityAlways) if err != nil { log.CtxErrorw(ctx, "failed to reserve memory from resource manager", - "reserve_size", approximatedPieceSize, "resource_state", stateFunc(), "error", err) + "reserve_size", approximatedPieceSize, "error", err) return resp, err } - defer func() { - scope.Done() - log.CtxDebugw(ctx, "end challenge piece request", "resource_state", stateFunc()) - }() integrity, err := challenge.spDB.GetObjectIntegrity(req.GetObjectId()) if err != nil { diff --git a/service/downloader/downloader_service.go b/service/downloader/downloader_service.go index ea14965d0..98f85a053 100644 --- a/service/downloader/downloader_service.go +++ b/service/downloader/downloader_service.go @@ -20,27 +20,52 @@ var _ types.DownloaderServiceServer = &Downloader{} // GetObject downloads the payload of the object. func (downloader *Downloader) GetObject(req *types.GetObjectRequest, stream types.DownloaderService_GetObjectServer) (err error) { - var sendSize int - ctx := log.Context(context.Background(), req) - resp := &types.GetObjectResponse{} + objectInfo := req.GetObjectInfo() + // prevent gateway forgetting transparent transmission + if objectInfo == nil { + return merrors.ErrDanglingPointer + } + var ( + ctx = log.WithValue(context.Background(), "object_id", objectInfo.Id.String()) + scope rcmgr.ResourceScopeSpan + sendSize int + + bucketInfo = req.GetBucketInfo() + resp = &types.GetObjectResponse{} + startOffset uint64 + endOffset uint64 + ) defer func() { - if err != nil { - return + var state string + rcmgr.ResrcManager().ViewSystem(func(scope rcmgr.ResourceScope) error { + state = scope.Stat().String() + return nil + }) + if scope != nil { + scope.Done() } - log.CtxInfow(ctx, "succeed to get object", "send_size", sendSize) + log.CtxInfow(ctx, "finish to get object", "send_size", sendSize, + "resource_state", state, "error", err) }() - bucketInfo := req.GetBucketInfo() - objectInfo := req.GetObjectInfo() - - startOffset := uint64(0) - endOffset := objectInfo.GetPayloadSize() - 1 + scope, err = downloader.rcScope.BeginSpan() + if err != nil { + log.CtxErrorw(ctx, "failed to begin reserve resource", "error", err) + return + } + startOffset = uint64(0) + endOffset = objectInfo.GetPayloadSize() - 1 if req.GetIsRange() { startOffset = req.GetRangeStart() endOffset = req.GetRangeEnd() } readSize := endOffset - startOffset + 1 - + err = scope.ReserveMemory(int(readSize), rcmgr.ReservationPriorityAlways) + if err != nil { + log.CtxErrorw(ctx, "failed to reserve memory from resource manager", + "reserve_size", readSize, "error", err) + return + } if err = downloader.spDB.CheckQuotaAndAddReadRecord( &sqldb.ReadRecord{ BucketID: bucketInfo.Id.Uint64(), @@ -55,35 +80,9 @@ func (downloader *Downloader) GetObject(req *types.GetObjectRequest, ReadQuotaSize: bucketInfo.GetChargedReadQuota() + model.DefaultSpFreeReadQuotaSize, }, ); err != nil { - log.Errorw("failed to check billing due to bucket quota", "error", err) + log.CtxErrorw(ctx, "failed to check billing due to bucket quota", "error", err) return merrors.InnerErrorToGRPCError(err) } - - // allocate memory form resource manager - scope, err := downloader.rcScope.BeginSpan() - if err != nil { - log.Errorw("failed to begin reserve resource", "error", err) - return - } - stateFunc := func() string { - var state string - rcmgr.ResrcManager().ViewSystem(func(scope rcmgr.ResourceScope) error { - state = scope.Stat().String() - return nil - }) - return state - } - err = scope.ReserveMemory(int(readSize), rcmgr.ReservationPriorityAlways) - if err != nil { - log.Errorw("failed to reserve memory from resource manager", - "reserve_size", readSize, "resource_state", stateFunc(), "error", err) - return - } - defer func() { - scope.Done() - log.Debugw("end get object request", "resource_state", stateFunc()) - }() - pieceInfos, err := downloader.SplitToSegmentPieceInfos(objectInfo.Id.Uint64(), objectInfo.GetPayloadSize(), startOffset, endOffset) if err != nil { return @@ -98,7 +97,7 @@ func (downloader *Downloader) GetObject(req *types.GetObjectRequest, } sendSize += len(resp.Data) } - return nil + return } type segmentPieceInfo struct { From 0411daeb8df09c65366048c812a7e53d5013857a Mon Sep 17 00:00:00 2001 From: joeylichang Date: Wed, 5 Apr 2023 17:11:57 +0800 Subject: [PATCH 2/3] fix: chang remger disable defeult to true --- cmd/utils/flags.go | 2 +- service/challenge/challenge_service.go | 6 +++--- service/downloader/downloader_service.go | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 0898e8854..d36a2d8f4 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -69,7 +69,7 @@ var ( Name: "rcmgr.disable", Category: ResourceManagerCategory, Usage: "Disable resource manager", - Value: false, + Value: true, } ResourceManagerConfigFlag = &cli.StringFlag{ Name: "rcmgr.config", diff --git a/service/challenge/challenge_service.go b/service/challenge/challenge_service.go index 5246f8df1..4eb89daa5 100644 --- a/service/challenge/challenge_service.go +++ b/service/challenge/challenge_service.go @@ -27,14 +27,14 @@ func (challenge *Challenge) ChallengePiece(ctx context.Context, req *types.Chall approximatedPieceSize int ) defer func() { + if scope != nil { + scope.Done() + } var state string rcmgr.ResrcManager().ViewSystem(func(scope rcmgr.ResourceScope) error { state = scope.Stat().String() return nil }) - if scope != nil { - scope.Done() - } log.CtxInfow(ctx, "finish to challenge piece request", "resource_state", state, "error", err) }() scope, err = challenge.rcScope.BeginSpan() diff --git a/service/downloader/downloader_service.go b/service/downloader/downloader_service.go index 98f85a053..27a4b2f08 100644 --- a/service/downloader/downloader_service.go +++ b/service/downloader/downloader_service.go @@ -36,14 +36,14 @@ func (downloader *Downloader) GetObject(req *types.GetObjectRequest, endOffset uint64 ) defer func() { + if scope != nil { + scope.Done() + } var state string rcmgr.ResrcManager().ViewSystem(func(scope rcmgr.ResourceScope) error { state = scope.Stat().String() return nil }) - if scope != nil { - scope.Done() - } log.CtxInfow(ctx, "finish to get object", "send_size", sendSize, "resource_state", state, "error", err) }() From 0f0a4d495ae3f31eaab5a34388c49899489e050a Mon Sep 17 00:00:00 2001 From: joeylichang Date: Wed, 5 Apr 2023 17:48:18 +0800 Subject: [PATCH 3/3] fix: cancel request when occer error in gateway --- service/gateway/object_handler.go | 8 ++++++-- service/gateway/sync_piece_handler.go | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/service/gateway/object_handler.go b/service/gateway/object_handler.go index 2378ba672..18c60a34b 100644 --- a/service/gateway/object_handler.go +++ b/service/gateway/object_handler.go @@ -30,10 +30,12 @@ func (gateway *Gateway) getObjectHandler(w http.ResponseWriter, r *http.Request) readN, writeN int size int statusCode = http.StatusOK + ctx, cancel = context.WithCancel(context.Background()) ) reqContext = newRequestContext(r) defer func() { + cancel() if errDescription != nil { statusCode = errDescription.statusCode _ = errDescription.errorResponse(w, reqContext) @@ -90,7 +92,7 @@ func (gateway *Gateway) getObjectHandler(w http.ResponseWriter, r *http.Request) RangeStart: uint64(rangeStart), RangeEnd: uint64(rangeEnd), } - ctx := log.Context(context.Background(), req) + //ctx := log.Context(context.Background(), req) stream, err := gateway.downloader.GetObject(ctx, req) if err != nil { log.Errorf("failed to get object", "error", err) @@ -144,10 +146,12 @@ func (gateway *Gateway) putObjectHandler(w http.ResponseWriter, r *http.Request) buf = make([]byte, model.DefaultStreamBufSize) hashBuf = make([]byte, model.DefaultStreamBufSize) md5Hash = md5.New() + ctx, cancel = context.WithCancel(context.Background()) ) reqContext = newRequestContext(r) defer func() { + cancel() if errDescription != nil { _ = errDescription.errorResponse(w, reqContext) } @@ -188,7 +192,7 @@ func (gateway *Gateway) putObjectHandler(w http.ResponseWriter, r *http.Request) return } - stream, err := gateway.uploader.PutObject(context.Background()) + stream, err := gateway.uploader.PutObject(ctx) if err != nil { log.Errorf("failed to put object", "error", err) errDescription = makeErrorDescription(err) diff --git a/service/gateway/sync_piece_handler.go b/service/gateway/sync_piece_handler.go index 4cd3a1597..3f00ef427 100644 --- a/service/gateway/sync_piece_handler.go +++ b/service/gateway/sync_piece_handler.go @@ -30,10 +30,12 @@ func (gateway *Gateway) syncPieceHandler(w http.ResponseWriter, r *http.Request) buf = make([]byte, model.DefaultStreamBufSize) integrityHash []byte integrityHashSignature []byte + ctx, cancel = context.WithCancel(context.Background()) ) reqContext = newRequestContext(r) defer func() { + cancel() if errDescription != nil { _ = errDescription.errorResponse(w, reqContext) } @@ -82,7 +84,7 @@ func (gateway *Gateway) syncPieceHandler(w http.ResponseWriter, r *http.Request) return } - stream, err := gateway.receiver.SyncObject(context.Background()) + stream, err := gateway.receiver.SyncObject(ctx) if err != nil { log.Errorw("failed to sync piece", "error", err) errDescription = InternalError