Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ston node goroutine model #28

Merged
merged 5 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion service/stonenode/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (node *StoneNodeService) allocStone(ctx context.Context) {
}
// TBD:: stone node will support more types of stone job,
// currently only support upload secondary piece job.
if err := node.doSyncToSecondarySP(ctx, resp); err != nil {
if err := node.syncPieceToSecondarySP(ctx, resp); err != nil {
log.CtxErrorw(ctx, "upload secondary piece job failed", "error", err)
return
}
Expand Down
140 changes: 98 additions & 42 deletions service/stonenode/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"sync"
"sync/atomic"

"github.com/bnb-chain/inscription-storage-provider/mock"
"github.com/bnb-chain/inscription-storage-provider/model/piecestore"
Expand All @@ -16,85 +17,111 @@ import (
"github.com/bnb-chain/inscription-storage-provider/util/log"
)

// doSyncToSecondarySP load segment data from primary and sync to secondary.
func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, allocResp *service.StoneHubServiceAllocStoneJobResponse) error {
// syncPieceToSecondarySP load segment data from primary and sync to secondary.
func (node *StoneNodeService) syncPieceToSecondarySP(ctx context.Context, allocResp *service.StoneHubServiceAllocStoneJobResponse) error {
// TBD:: check secondarySPs count by redundancyType.
// EC_TYPE need EC_M + EC_K + backup
// REPLICA_TYPE and INLINE_TYPE need segments
secondarySPs := mock.AllocUploadSecondarySP()
pieceData, err := node.loadSegmentsData(ctx, allocResp)
if err != nil {
node.reportErrToStoneHub(ctx, allocResp, err)
return err
}
redundancyType := allocResp.GetPieceJob().GetRedundancyType()
secondaryPieceData, err := node.dispatchSecondarySP(pieceData, redundancyType, secondarySPs)
if err != nil {
node.reportErrToStoneHub(ctx, allocResp, err)
return err
}
node.syncPieceToSecondarySP(ctx, allocResp, secondaryPieceData)
node.doSyncToSecondarySP(ctx, allocResp, secondaryPieceData)
return nil
}

// loadSegmentsData load segment data from primary storage provider.
func (node *StoneNodeService) loadSegmentsData(ctx context.Context, allocResp *service.StoneHubServiceAllocStoneJobResponse) (map[string][][]byte, error) {
type segment struct {
pieceKey string
pieceData [][]byte
pieceErr error
objectID uint64
pieceKey string
segmentData []byte
pieceData [][]byte
pieceErr error
redundancyType ptypes.RedundancyType
}
var (
doneSegments int64
loadSegmentErr error
segmentCh = make(chan *segment)
interruptCh = make(chan struct{})
pieces = make(map[string][][]byte)
objectID = allocResp.GetPieceJob().GetObjectId()
payloadSize = allocResp.GetPieceJob().GetPayloadSize()
redundancyType = allocResp.GetPieceJob().GetRedundancyType()
segmentCount = util.ComputeSegmentCount(payloadSize)
pieces = make(map[string][][]byte)
)
loadFunc := func(ctx context.Context, segment *segment) error {
select {
case <-interruptCh:
break
default:
data, err := node.store.getPiece(ctx, segment.pieceKey, 0, 0)
if err != nil {
log.CtxErrorw(ctx, "stone node gets segment data from piece store failed", "error", err, "piece key", segment.pieceKey)
return err
}
segment.segmentData = data
}
return nil
}
spiltFunc := func(ctx context.Context, segment *segment) error {
select {
case <-interruptCh:
break
default:
pieceData, err := node.spiltSegmentData(ctx, redundancyType, segment.segmentData)
if err != nil {
log.CtxErrorw(ctx, "stone node ec failed", "error", err, "piece key", segment.pieceKey)
return err
}
segment.pieceData = pieceData
}
return nil
}
for i := 0; i < int(segmentCount); i++ {
go func(segmentIdx int) {
segment := &segment{}
pieceKey := piecestore.EncodeSegmentPieceKey(objectID, segmentIdx)
data, err := node.store.getPiece(ctx, pieceKey, 0, 0)
if err != nil {
log.CtxErrorw(ctx, "stone node gets segment data from piece store failed", "error", err, "piece key", pieceKey)
segment.pieceErr = err
segmentCh <- segment
segment := &segment{
objectID: objectID,
pieceKey: piecestore.EncodeSegmentPieceKey(objectID, segmentIdx),
redundancyType: redundancyType,
}
defer func() {
if segment.pieceErr != nil || atomic.AddInt64(&doneSegments, 1) == int64(segmentCount) {
close(interruptCh)
close(segmentCh)
}
}()
if loadSegmentErr = loadFunc(ctx, segment); loadSegmentErr != nil {
return
}

pieceData, err := node.spiltSegmentData(ctx, redundancyType, data)
if err != nil {
log.CtxErrorw(ctx, "stone node ec failed", "error", err, "piece key", pieceKey)
segment.pieceErr = err
segmentCh <- segment
if loadSegmentErr = spiltFunc(ctx, segment); loadSegmentErr != nil {
return
}
segment.pieceData = pieceData
segmentCh <- segment
return
select {
case <-interruptCh:
return
default:
segmentCh <- segment
}
}(i)
}

var mu sync.Mutex
for {
for segment := range segmentCh {
mu.Lock()
if len(pieces) == segmentCount {
mu.Unlock()
break
}
pieces[segment.pieceKey] = segment.pieceData
mu.Unlock()

select {
case segment := <-segmentCh:
if segment.pieceErr != nil {
return pieces, segment.pieceErr
}
mu.Lock()
pieces[segment.pieceKey] = segment.pieceData
mu.Unlock()
}
}
return pieces, nil
return pieces, loadSegmentErr
}

// spiltSegmentData spilt segment data into pieces data.
Expand Down Expand Up @@ -141,8 +168,8 @@ func (node *StoneNodeService) dispatchSecondarySP(pieceData map[string][][]byte,
return secondaryPieceData, nil
}

// syncPieceToSecondarySP send piece data to the secondary
func (node *StoneNodeService) syncPieceToSecondarySP(ctx context.Context, resp *service.StoneHubServiceAllocStoneJobResponse,
// doSyncToSecondarySP send piece data to the secondary.
func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *service.StoneHubServiceAllocStoneJobResponse,
secondaryPieceData map[string]map[string][]byte) error {
var (
objectID = resp.GetPieceJob().GetObjectId()
Expand All @@ -151,7 +178,6 @@ func (node *StoneNodeService) syncPieceToSecondarySP(ctx context.Context, resp *
segmentCount = util.ComputeSegmentCount(payloadSize)
txHash = resp.GetTxHash()
)

for secondary, pieceData := range secondaryPieceData {
go func(secondary string, pieceData map[string][]byte) {
errMsg := &service.ErrMessage{}
Expand Down Expand Up @@ -205,3 +231,33 @@ func (node *StoneNodeService) syncPieceToSecondarySP(ctx context.Context, resp *
}
return nil
}

// reportErrToStoneHub send error message to stone hub.
func (node *StoneNodeService) reportErrToStoneHub(ctx context.Context, resp *service.StoneHubServiceAllocStoneJobResponse, reportErr error) {
if reportErr == nil {
return
}
var (
objectID = resp.GetPieceJob().GetObjectId()
payloadSize = resp.GetPieceJob().GetPayloadSize()
redundancyType = resp.GetPieceJob().GetRedundancyType()
txHash = resp.GetTxHash()
)
pieceJob := &service.PieceJob{
BucketName: resp.GetPieceJob().GetBucketName(),
ObjectName: resp.GetPieceJob().GetObjectName(),
TxHash: txHash,
ObjectId: objectID,
PayloadSize: payloadSize,
RedundancyType: redundancyType,
}
errMsg := &service.ErrMessage{
ErrCode: service.ErrCode_ERR_CODE_ERROR,
ErrMsg: reportErr.Error(),
}
if err := node.DoneSecondaryPieceJob(ctx, resp.TraceId, pieceJob, errMsg); err != nil {
log.CtxErrorw(ctx, "report stone hub err msg failed", "error", err)
return
}
log.CtxInfow(ctx, "report stone hub err msg success")
}