Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: rcmgr leak for downloader service #277

Merged
merged 3 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ var (
Name: "rcmgr.disable",
Category: ResourceManagerCategory,
Usage: "Disable resource manager",
Value: false,
Value: true,
}
ResourceManagerConfigFlag = &cli.StringFlag{
Name: "rcmgr.config",
Expand Down
2 changes: 2 additions & 0 deletions model/errors/rpc_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ var (
ErrUnsupportedMethod = errors.New("unsupported method")
// ErrIntegerOverflow defines integer overflow
ErrIntegerOverflow = errors.New("integer overflow")
// ErrDanglingPointer defines the nil pointer error
ErrDanglingPointer = errors.New("pointer dangling")
)

// piece store errors
Expand Down
45 changes: 23 additions & 22 deletions service/challenge/challenge_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package challenge
import (
"context"
"math"
"strconv"

merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
"github.com/bnb-chain/greenfield-storage-provider/model/piecestore"
Expand All @@ -16,14 +17,32 @@ var _ types.ChallengeServiceServer = &Challenge{}
// ChallengePiece handles the piece challenge request
// return the piece's integrity hash, piece hash and piece data
func (challenge *Challenge) ChallengePiece(ctx context.Context, req *types.ChallengePieceRequest) (*types.ChallengePieceResponse, error) {
// TODO:: gateway transparent transmission object info
ctx = log.WithValue(ctx, "object_id", strconv.FormatUint(req.ObjectId, 10))
var (
scope rcmgr.ResourceScopeSpan
resp *types.ChallengePieceResponse
err error
pieceKey string
approximatedPieceSize int
err error
resp *types.ChallengePieceResponse
)
defer func() {
if scope != nil {
scope.Done()
}
var state string
rcmgr.ResrcManager().ViewSystem(func(scope rcmgr.ResourceScope) error {
state = scope.Stat().String()
return nil
})
log.CtxInfow(ctx, "finish to challenge piece request", "resource_state", state, "error", err)
}()
scope, err = challenge.rcScope.BeginSpan()
if err != nil {
log.CtxErrorw(ctx, "failed to begin reserve resource", "error", err)
return resp, err
}

ctx = log.Context(ctx, req)
params, err := challenge.spDB.GetStorageParams()
if err != nil {
return resp, err
Expand All @@ -39,30 +58,12 @@ func (challenge *Challenge) ChallengePiece(ctx context.Context, req *types.Chall
approximatedPieceSize = int(math.Ceil(float64(params.GetMaxSegmentSize()) / float64(params.GetRedundantDataChunkNum())))
}

// allocates memory from resource manager
scope, err := challenge.rcScope.BeginSpan()
if err != nil {
log.CtxErrorw(ctx, "failed to begin reserve resource", "error", err)
return resp, err
}
stateFunc := func() string {
var state string
rcmgr.ResrcManager().ViewSystem(func(scope rcmgr.ResourceScope) error {
state = scope.Stat().String()
return nil
})
return state
}
err = scope.ReserveMemory(approximatedPieceSize, rcmgr.ReservationPriorityAlways)
if err != nil {
log.CtxErrorw(ctx, "failed to reserve memory from resource manager",
"reserve_size", approximatedPieceSize, "resource_state", stateFunc(), "error", err)
"reserve_size", approximatedPieceSize, "error", err)
return resp, err
}
defer func() {
scope.Done()
log.CtxDebugw(ctx, "end challenge piece request", "resource_state", stateFunc())
}()

integrity, err := challenge.spDB.GetObjectIntegrity(req.GetObjectId())
if err != nil {
Expand Down
79 changes: 39 additions & 40 deletions service/downloader/downloader_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,52 @@ var _ types.DownloaderServiceServer = &Downloader{}
// GetObject downloads the payload of the object.
func (downloader *Downloader) GetObject(req *types.GetObjectRequest,
stream types.DownloaderService_GetObjectServer) (err error) {
var sendSize int
ctx := log.Context(context.Background(), req)
resp := &types.GetObjectResponse{}
objectInfo := req.GetObjectInfo()
// prevent gateway forgetting transparent transmission
if objectInfo == nil {
return merrors.ErrDanglingPointer
}
var (
ctx = log.WithValue(context.Background(), "object_id", objectInfo.Id.String())
scope rcmgr.ResourceScopeSpan
sendSize int

bucketInfo = req.GetBucketInfo()
resp = &types.GetObjectResponse{}
startOffset uint64
endOffset uint64
)
defer func() {
if err != nil {
return
if scope != nil {
scope.Done()
}
log.CtxInfow(ctx, "succeed to get object", "send_size", sendSize)
var state string
rcmgr.ResrcManager().ViewSystem(func(scope rcmgr.ResourceScope) error {
state = scope.Stat().String()
return nil
})
log.CtxInfow(ctx, "finish to get object", "send_size", sendSize,
"resource_state", state, "error", err)
}()

bucketInfo := req.GetBucketInfo()
objectInfo := req.GetObjectInfo()

startOffset := uint64(0)
endOffset := objectInfo.GetPayloadSize() - 1
scope, err = downloader.rcScope.BeginSpan()
if err != nil {
log.CtxErrorw(ctx, "failed to begin reserve resource", "error", err)
return
}
startOffset = uint64(0)
endOffset = objectInfo.GetPayloadSize() - 1
if req.GetIsRange() {
startOffset = req.GetRangeStart()
endOffset = req.GetRangeEnd()
}
readSize := endOffset - startOffset + 1

err = scope.ReserveMemory(int(readSize), rcmgr.ReservationPriorityAlways)
if err != nil {
log.CtxErrorw(ctx, "failed to reserve memory from resource manager",
"reserve_size", readSize, "error", err)
return
}
if err = downloader.spDB.CheckQuotaAndAddReadRecord(
&sqldb.ReadRecord{
BucketID: bucketInfo.Id.Uint64(),
Expand All @@ -55,35 +80,9 @@ func (downloader *Downloader) GetObject(req *types.GetObjectRequest,
ReadQuotaSize: bucketInfo.GetChargedReadQuota() + model.DefaultSpFreeReadQuotaSize,
},
); err != nil {
log.Errorw("failed to check billing due to bucket quota", "error", err)
log.CtxErrorw(ctx, "failed to check billing due to bucket quota", "error", err)
return merrors.InnerErrorToGRPCError(err)
}

// allocate memory form resource manager
scope, err := downloader.rcScope.BeginSpan()
if err != nil {
log.Errorw("failed to begin reserve resource", "error", err)
return
}
stateFunc := func() string {
var state string
rcmgr.ResrcManager().ViewSystem(func(scope rcmgr.ResourceScope) error {
state = scope.Stat().String()
return nil
})
return state
}
err = scope.ReserveMemory(int(readSize), rcmgr.ReservationPriorityAlways)
if err != nil {
log.Errorw("failed to reserve memory from resource manager",
"reserve_size", readSize, "resource_state", stateFunc(), "error", err)
return
}
defer func() {
scope.Done()
log.Debugw("end get object request", "resource_state", stateFunc())
}()

pieceInfos, err := downloader.SplitToSegmentPieceInfos(objectInfo.Id.Uint64(), objectInfo.GetPayloadSize(), startOffset, endOffset)
if err != nil {
return
Expand All @@ -98,7 +97,7 @@ func (downloader *Downloader) GetObject(req *types.GetObjectRequest,
}
sendSize += len(resp.Data)
}
return nil
return
}

type segmentPieceInfo struct {
Expand Down
8 changes: 6 additions & 2 deletions service/gateway/object_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ func (gateway *Gateway) getObjectHandler(w http.ResponseWriter, r *http.Request)
readN, writeN int
size int
statusCode = http.StatusOK
ctx, cancel = context.WithCancel(context.Background())
)

reqContext = newRequestContext(r)
defer func() {
cancel()
if errDescription != nil {
statusCode = errDescription.statusCode
_ = errDescription.errorResponse(w, reqContext)
Expand Down Expand Up @@ -90,7 +92,7 @@ func (gateway *Gateway) getObjectHandler(w http.ResponseWriter, r *http.Request)
RangeStart: uint64(rangeStart),
RangeEnd: uint64(rangeEnd),
}
ctx := log.Context(context.Background(), req)
//ctx := log.Context(context.Background(), req)
stream, err := gateway.downloader.GetObject(ctx, req)
if err != nil {
log.Errorf("failed to get object", "error", err)
Expand Down Expand Up @@ -144,10 +146,12 @@ func (gateway *Gateway) putObjectHandler(w http.ResponseWriter, r *http.Request)
buf = make([]byte, model.DefaultStreamBufSize)
hashBuf = make([]byte, model.DefaultStreamBufSize)
md5Hash = md5.New()
ctx, cancel = context.WithCancel(context.Background())
)

reqContext = newRequestContext(r)
defer func() {
cancel()
if errDescription != nil {
_ = errDescription.errorResponse(w, reqContext)
}
Expand Down Expand Up @@ -188,7 +192,7 @@ func (gateway *Gateway) putObjectHandler(w http.ResponseWriter, r *http.Request)
return
}

stream, err := gateway.uploader.PutObject(context.Background())
stream, err := gateway.uploader.PutObject(ctx)
if err != nil {
log.Errorf("failed to put object", "error", err)
errDescription = makeErrorDescription(err)
Expand Down
4 changes: 3 additions & 1 deletion service/gateway/sync_piece_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ func (gateway *Gateway) syncPieceHandler(w http.ResponseWriter, r *http.Request)
buf = make([]byte, model.DefaultStreamBufSize)
integrityHash []byte
integrityHashSignature []byte
ctx, cancel = context.WithCancel(context.Background())
)

reqContext = newRequestContext(r)
defer func() {
cancel()
if errDescription != nil {
_ = errDescription.errorResponse(w, reqContext)
}
Expand Down Expand Up @@ -82,7 +84,7 @@ func (gateway *Gateway) syncPieceHandler(w http.ResponseWriter, r *http.Request)
return
}

stream, err := gateway.receiver.SyncObject(context.Background())
stream, err := gateway.receiver.SyncObject(ctx)
if err != nil {
log.Errorw("failed to sync piece", "error", err)
errDescription = InternalError
Expand Down