diff --git a/.github/workflows/proto-lint.yml b/.github/workflows/proto-lint.yml index 68a8c916f..084ba4187 100644 --- a/.github/workflows/proto-lint.yml +++ b/.github/workflows/proto-lint.yml @@ -28,10 +28,6 @@ jobs: with: input: 'proto' - - uses: bufbuild/buf-breaking-action@v1 - with: - against: "https//github.com/bnb-chain/greenfield-storage-provider" - # proto-lint-success: # needs: proto-lint # if: ${{ success() }} diff --git a/model/const.go b/model/const.go index c61df40cf..55ebd873f 100644 --- a/model/const.go +++ b/model/const.go @@ -36,7 +36,7 @@ const ( // RPC config const ( // server and client max send or recv msg size - MaxCallMsgSize = 40 * 1024 * 1024 + MaxCallMsgSize = 25 * 1024 * 1024 ) // Gateway diff --git a/model/errors/errors.go b/model/errors/errors.go index 48811268f..0962ec507 100644 --- a/model/errors/errors.go +++ b/model/errors/errors.go @@ -52,10 +52,11 @@ var ( var ( ErrStoneNodeStarted = errors.New("stone node resource is running") ErrStoneNodeStopped = errors.New("stone node service has stopped") - ErrIntegrityHash = errors.New("secondary integrity hash check error") + ErrIntegrityHash = errors.New("secondary integrity hash verification error") ErrRedundancyType = errors.New("unknown redundancy type") ErrEmptyJob = errors.New("alloc stone job is empty") ErrSecondarySPNumber = errors.New("secondary sp is not enough") + ErrInvalidPieceData = errors.New("invalid piece data") ErrInvalidSegmentData = errors.New("invalid segment data, length is not equal to 1") ErrInvalidECData = errors.New("invalid ec data, length is not equal to 6") ErrEmptyTargetIdx = errors.New("target index array is empty") diff --git a/model/piecestore/piece_key.go b/model/piecestore/piece_key.go index 9a5afedde..2c86cb33d 100644 --- a/model/piecestore/piece_key.go +++ b/model/piecestore/piece_key.go @@ -23,13 +23,13 @@ func EncodeSegmentPieceKey(objectID uint64, segmentIndex uint32) string { func DecodeSegmentPieceKey(pieceKey string) (uint64, uint32, error) { keys := strings.Split(pieceKey, "_") if valid := CheckSegmentPieceKey(keys); !valid { - log.Errorw("Invalid segment piece key", "piece key", pieceKey) - return 0, 0, fmt.Errorf("Invalid segment piece key") + log.Errorw("segment piece key is wrong", "segment piece key", pieceKey) + return 0, 0, fmt.Errorf("invalid segment piece key") } objectID, _ := strconv.ParseUint(keys[0], 10, 64) s := numberRegex.FindString(keys[1]) - segmentIndex, _ := (strconv.ParseUint(s, 10, 32)) + segmentIndex, _ := strconv.ParseUint(s, 10, 32) return objectID, uint32(segmentIndex), nil } @@ -64,8 +64,8 @@ func EncodeECPieceKeyBySegmentKey(segmentKey string, pieceIndex uint32) string { func DecodeECPieceKey(pieceKey string) (uint64, uint32, uint32, error) { keys := strings.Split(pieceKey, "_") if valid := CheckECPieceKey(keys); !valid { - log.Errorw("Invalid EC piece key", "piece key", pieceKey) - return 0, 0, 0, fmt.Errorf("Invalid EC piece key") + log.Errorw("ec piece key is wrong", "ec piece key", pieceKey) + return 0, 0, 0, fmt.Errorf("invalid EC piece key") } objectID, _ := strconv.ParseUint(keys[0], 10, 64) @@ -86,7 +86,7 @@ var ( // CheckSegmentPieceKey checks segment piece key is correct func CheckSegmentPieceKey(keys []string) bool { if
len(keys) != 3 { - log.Errorw("Invalid EC piece key", "piece key", keys) + log.Errorw("invalid EC piece key", "ec piece key", keys) return false } diff --git a/model/piecestore/piece_key_test.go b/model/piecestore/piece_key_test.go index aead4ff7c..6b26f47d4 100644 --- a/model/piecestore/piece_key_test.go +++ b/model/piecestore/piece_key_test.go @@ -49,35 +49,35 @@ func TestDecodeSegmentPieceKey(t *testing.T) { req: "testID_s2", wantedResp1: 0, wantedResp2: 0, - wantedErr: fmt.Errorf("Invalid segment piece key"), + wantedErr: fmt.Errorf("invalid segment piece key"), }, { name: "invalid piece key 2", req: "123456789_p", wantedResp1: 0, wantedResp2: 0, - wantedErr: fmt.Errorf("Invalid segment piece key"), + wantedErr: fmt.Errorf("invalid segment piece key"), }, { name: "invalid piece key 3", req: "123456789_s123r", wantedResp1: 0, wantedResp2: 0, - wantedErr: fmt.Errorf("Invalid segment piece key"), + wantedErr: fmt.Errorf("invalid segment piece key"), }, { name: "invalid piece key 4", req: "123456789_ss.123", wantedResp1: 0, wantedResp2: 0, - wantedErr: fmt.Errorf("Invalid segment piece key"), + wantedErr: fmt.Errorf("invalid segment piece key"), }, { name: "invalid segment piece key 4", req: "123456789_s123/111", wantedResp1: 0, wantedResp2: 0, - wantedErr: fmt.Errorf("Invalid segment piece key"), + wantedErr: fmt.Errorf("invalid segment piece key"), }, } for _, tt := range cases { @@ -137,7 +137,7 @@ func TestDecodeECPieceKey(t *testing.T) { wantedResp1: 0, wantedResp2: 0, wantedResp3: 0, - wantedErr: fmt.Errorf("Invalid EC piece key"), + wantedErr: fmt.Errorf("invalid EC piece key"), }, { name: "invalid ec piece key 2", @@ -145,7 +145,7 @@ func TestDecodeECPieceKey(t *testing.T) { wantedResp1: 0, wantedResp2: 0, wantedResp3: 0, - wantedErr: fmt.Errorf("Invalid EC piece key"), + wantedErr: fmt.Errorf("invalid EC piece key"), }, { name: "invalid ec piece key 3", @@ -153,7 +153,7 @@ func TestDecodeECPieceKey(t *testing.T) { wantedResp1: 0, wantedResp2: 0, wantedResp3: 0, - wantedErr: fmt.Errorf("Invalid EC piece key"), + wantedErr: fmt.Errorf("invalid EC piece key"), }, { name: "invalid ec piece key 4", @@ -161,7 +161,7 @@ func TestDecodeECPieceKey(t *testing.T) { wantedResp1: 0, wantedResp2: 0, wantedResp3: 0, - wantedErr: fmt.Errorf("Invalid EC piece key"), + wantedErr: fmt.Errorf("invalid EC piece key"), }, { name: "invalid ec piece key 5", @@ -169,7 +169,7 @@ func TestDecodeECPieceKey(t *testing.T) { wantedResp1: 0, wantedResp2: 0, wantedResp3: 0, - wantedErr: fmt.Errorf("Invalid EC piece key"), + wantedErr: fmt.Errorf("invalid EC piece key"), }, } for _, tt := range cases { diff --git a/proto/service/types/v1/syncer.proto b/proto/service/types/v1/syncer.proto index 137728119..fb108ac46 100644 --- a/proto/service/types/v1/syncer.proto +++ b/proto/service/types/v1/syncer.proto @@ -10,13 +10,14 @@ message SyncerInfo { // bytes tx_hash = 2; string storage_provider_id = 3; uint32 piece_count = 4; - pkg.types.v1.RedundancyType redundancy_type = 5; + uint32 piece_index = 5; + pkg.types.v1.RedundancyType redundancy_type = 6; } message SyncerServiceSyncPieceRequest { string trace_id = 1; SyncerInfo syncer_info = 2; - map<string, bytes> piece_data = 3; // key is piece key, value is piece_data + bytes piece_data = 3; } message SyncerServiceSyncPieceResponse { diff --git a/service/client/piece_store_client.go b/service/client/piece_store_client.go index 016b0ecd2..b6d36b723 100644 --- a/service/client/piece_store_client.go +++ b/service/client/piece_store_client.go @@ -10,6 +10,8 @@ import (
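// The hunks below add compile-time interface assertions of the form var _ API = &Client{}.
// A minimal, self-contained sketch of the pattern (Reader and fileReader are illustrative
// names, not part of this diff): assigning a concrete value to a blank interface-typed
// variable makes the build fail as soon as the type stops satisfying the interface.
type Reader interface {
	Read() ([]byte, error)
}

type fileReader struct{}

func (f *fileReader) Read() ([]byte, error) { return []byte("data"), nil }

// compile-time check: *fileReader must implement Reader
var _ Reader = &fileReader{}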
"github.com/bnb-chain/greenfield-storage-provider/util/log" ) +var _ PieceStoreAPI = &StoreClient{} + // PieceStoreAPI provides an interface to enable mocking the // StoreClient's API operation. This makes unit test to test your code easier. // diff --git a/service/client/stone_hub_client.go b/service/client/stone_hub_client.go index af0582ae7..5ba9b1f6f 100644 --- a/service/client/stone_hub_client.go +++ b/service/client/stone_hub_client.go @@ -3,7 +3,6 @@ package client import ( "context" "errors" - "io" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -14,7 +13,7 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/util/log" ) -var _ io.Closer = &StoneHubClient{} +var _ StoneHubAPI = &StoneHubClient{} // StoneHubAPI provides an interface to enable mocking the // StoneHubClient's API operation. This makes unit test to test your code easier. diff --git a/service/client/syncer_client.go b/service/client/syncer_client.go index 9be23fb17..cb2d303d9 100644 --- a/service/client/syncer_client.go +++ b/service/client/syncer_client.go @@ -2,7 +2,6 @@ package client import ( "context" - "io" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -12,7 +11,7 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/util/log" ) -var _ io.Closer = &SyncerClient{} +var _ SyncerAPI = &SyncerClient{} // SyncerAPI provides an interface to enable mocking the // SyncerClient's API operation. This makes unit test to test your code easier. diff --git a/service/stonenode/alloc_stone_job.go b/service/stonenode/alloc_stone_job.go new file mode 100644 index 000000000..4db5af635 --- /dev/null +++ b/service/stonenode/alloc_stone_job.go @@ -0,0 +1,93 @@ +package stonenode + +import ( + "context" + "errors" + + "github.com/bnb-chain/greenfield-storage-provider/mock" + merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" + stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + "github.com/bnb-chain/greenfield-storage-provider/util" + "github.com/bnb-chain/greenfield-storage-provider/util/log" +) + +// allocStoneJob sends rpc request to stone hub alloc stone job +func (node *StoneNodeService) allocStoneJob(ctx context.Context) { + resp, err := node.stoneHub.AllocStoneJob(ctx) + ctx = log.Context(ctx, resp, resp.GetPieceJob()) + if err != nil { + if errors.Is(err, merrors.ErrEmptyJob) { + return + } + log.CtxErrorw(ctx, "alloc stone from stone hub failed", "error", err) + return + } + // TBD:: stone node will support more types of stone job, + // currently only support upload secondary piece job + if err := node.loadAndSyncPieces(ctx, resp); err != nil { + log.CtxErrorw(ctx, "upload secondary piece job failed", "error", err) + return + } + log.CtxInfow(ctx, "upload secondary piece job success") +} + +// loadAndSyncPieces load segment data from primary and sync to secondary +func (node *StoneNodeService) loadAndSyncPieces(ctx context.Context, allocResp *stypes.StoneHubServiceAllocStoneJobResponse) error { + // TBD:: check secondarySPs count by redundancyType. 
+ // EC_TYPE need EC_M + EC_K + backup + // REPLICA_TYPE and INLINE_TYPE need segments count + backup + secondarySPs := mock.AllocUploadSecondarySP() + + // validate redundancyType and targetIdx + redundancyType := allocResp.GetPieceJob().GetRedundancyType() + if err := util.ValidateRedundancyType(redundancyType); err != nil { + log.CtxErrorw(ctx, "invalid redundancy type", "redundancy type", redundancyType) + node.reportErrToStoneHub(ctx, allocResp, err) + return err + } + targetIdx := allocResp.GetPieceJob().GetTargetIdx() + if len(targetIdx) == 0 { + log.CtxError(ctx, "invalid target idx length") + node.reportErrToStoneHub(ctx, allocResp, merrors.ErrEmptyTargetIdx) + return merrors.ErrEmptyTargetIdx + } + + // 1. load all segment data from the primary piece store and encode it + pieceData, err := node.encodeSegmentsData(ctx, allocResp) + if err != nil { + node.reportErrToStoneHub(ctx, allocResp, err) + return err + } + + // 2. dispatch the piece data to the different secondary SPs + secondaryPieceData, err := node.dispatchSecondarySP(pieceData, redundancyType, secondarySPs, targetIdx) + if err != nil { + log.CtxErrorw(ctx, "dispatch piece data to secondary sp error") + node.reportErrToStoneHub(ctx, allocResp, err) + return err + } + + // 3. send piece data to the secondary SPs + node.doSyncToSecondarySP(ctx, allocResp, secondaryPieceData, secondarySPs) + return nil +} + +// reportErrToStoneHub sends an error message to stone hub +func (node *StoneNodeService) reportErrToStoneHub(ctx context.Context, resp *stypes.StoneHubServiceAllocStoneJobResponse, + reportErr error) { + if reportErr == nil { + return + } + req := &stypes.StoneHubServiceDoneSecondaryPieceJobRequest{ + TraceId: resp.GetTraceId(), + ErrMessage: &stypes.ErrMessage{ + ErrCode: stypes.ErrCode_ERR_CODE_ERROR, + ErrMsg: reportErr.Error(), + }, + } + if _, err := node.stoneHub.DoneSecondaryPieceJob(ctx, req); err != nil { + log.CtxErrorw(ctx, "report stone hub err msg failed", "error", err) + return + } + log.CtxInfow(ctx, "report stone hub err msg success") +} diff --git a/service/stonenode/dispatch_secondary.go b/service/stonenode/dispatch_secondary.go new file mode 100644 index 000000000..e51060043 --- /dev/null +++ b/service/stonenode/dispatch_secondary.go @@ -0,0 +1,83 @@ +package stonenode + +import ( + merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" + ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" + "github.com/bnb-chain/greenfield-storage-provider/util/log" +) + +// dispatchSecondarySP converts pieceDataBySegment into a piece-index-major slice: the first dimension is the piece index (such as ec1), and each entry holds that piece's [][]byte data across all segments +// pieceDataBySegment is a three-dimensional slice whose first dimension is the segment index and whose second dimension holds that segment's [][]byte piece data +func (node *StoneNodeService) dispatchSecondarySP(pieceDataBySegment [][][]byte, redundancyType ptypesv1pb.RedundancyType, secondarySPs []string, + targetIdx []uint32) ([][][]byte, error) { + if len(pieceDataBySegment) == 0 { + return nil, merrors.ErrInvalidPieceData + } + if len(secondarySPs) == 0 { + return nil, merrors.ErrSecondarySPNumber + } + var pieceDataBySecondary [][][]byte + var err error + switch redundancyType { + case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: + pieceDataBySecondary, err = dispatchReplicaData(pieceDataBySegment, secondarySPs, targetIdx) + default: // ec type + pieceDataBySecondary, err = dispatchECData(pieceDataBySegment, secondarySPs, targetIdx) + } + if err != nil { + log.Errorw("dispatch piece data by secondary error", "error", err) + return nil, err + } + return pieceDataBySecondary, nil +} + +// dispatchReplicaData dispatches replica or inline data to different SPs; each SP stores all segment data of an object +// if an object uses replica type and is split into 10 segments with 6 SPs, each SP stores all 10 segments' data +// if an object uses inline type, there is only one segment, and each of the 6 SPs stores that 1 segment's data +func dispatchReplicaData(pieceDataBySegment [][][]byte, secondarySPs []string, targetIdx []uint32) ([][][]byte, error) { + if len(secondarySPs) < len(targetIdx) { + return nil, merrors.ErrSecondarySPNumber + } + + segmentLength := len(pieceDataBySegment[0]) + if segmentLength != 1 { + return nil, merrors.ErrInvalidSegmentData + } + + data := convertSlice(pieceDataBySegment, segmentLength) + segmentPieceSlice := make([][][]byte, len(targetIdx)) + for i := 0; i < len(targetIdx); i++ { + segmentPieceSlice[i] = data[0] + } + return segmentPieceSlice, nil +} + +// dispatchECData dispatches ec data to different SPs +// each SP stores the same ec column: sp1 stores all ec1 data, sp2 stores all ec2 data, etc +func dispatchECData(pieceDataBySegment [][][]byte, secondarySPs []string, targetIdx []uint32) ([][][]byte, error) { + segmentLength := len(pieceDataBySegment[0]) + if segmentLength < 6 { + return nil, merrors.ErrInvalidECData + } + if segmentLength > len(secondarySPs) { + return nil, merrors.ErrSecondarySPNumber + } + + data := convertSlice(pieceDataBySegment, segmentLength) + ecPieceSlice := make([][][]byte, len(targetIdx)) + for index, value := range targetIdx { + ecPieceSlice[index] = data[value] + } + return ecPieceSlice, nil +} + +// convertSlice transposes segment-major data into piece-major order: the i-th entry of the result collects data[j][i] from every segment j +func convertSlice(data [][][]byte, length int) [][][]byte { + tempSlice := make([][][]byte, length) + for i := 0; i < length; i++ { + tempSlice[i] = make([][]byte, 0) + for j := 0; j < len(data); j++ { + tempSlice[i] = append(tempSlice[i], data[j][i]) + } + } + return tempSlice +} diff --git a/service/stonenode/dispatch_secondary_test.go b/service/stonenode/dispatch_secondary_test.go new file mode 100644 index 000000000..7f862ae98 --- /dev/null +++ b/service/stonenode/dispatch_secondary_test.go @@ -0,0 +1,104 @@ +package stonenode + +import ( + "testing" + + merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" + ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" + "github.com/stretchr/testify/assert" +) + +func Test_dispatchSecondarySP(t *testing.T) { + spList := []string{"sp1", "sp2", "sp3", "sp4", "sp5", "sp6"} + cases := []struct { + name string + req1 [][][]byte + req2 ptypes.RedundancyType + req3 []string + req4 []uint32 + wantedResult int + wantedErr error + }{ + { + name: "ec type dispatch", + req1: dispatchECPiece(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req3: spList, + req4: []uint32{0, 1, 2, 3, 4, 5}, + wantedResult: 6, + wantedErr: nil, + }, + { + name: "replica type dispatch", + req1: dispatchSegmentPieceSlice(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req3: spList, + req4: []uint32{0, 1, 2}, + wantedResult: 3, + wantedErr: nil, + }, + { + name: "inline type dispatch", + req1: dispatchInlinePieceSlice(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE, + req3: spList, + req4: []uint32{0}, + wantedResult: 1, + wantedErr: nil, + }, + { + name: "ec type data retransmission", + req1: dispatchECPiece(), + req2:
ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req3: spList, + req4: []uint32{2, 3}, + wantedResult: 2, + wantedErr: nil, + }, + { + name: "replica type data retransmission", + req1: dispatchSegmentPieceSlice(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req3: spList, + req4: []uint32{1, 2}, + wantedResult: 2, + wantedErr: nil, + }, + { + name: "wrong secondary sp number", + req1: dispatchECPiece(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req3: []string{}, + req4: []uint32{0, 1, 2, 3, 4, 5}, + wantedResult: 0, + wantedErr: merrors.ErrSecondarySPNumber, + }, + { + name: "wrong ec segment data length", + req1: dispatchSegmentPieceSlice(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req3: spList, + req4: []uint32{0, 1, 2, 3, 4, 5}, + wantedResult: 0, + wantedErr: merrors.ErrInvalidECData, + }, + { + name: "invalid piece data", + req1: nil, + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req3: spList, + req4: []uint32{0, 1, 2, 3, 4, 5}, + wantedResult: 0, + wantedErr: merrors.ErrInvalidPieceData, + }, + } + + node := setup(t) + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + result, err := node.dispatchSecondarySP(tt.req1, tt.req2, tt.req3, tt.req4) + assert.Equal(t, tt.wantedErr, err) + assert.Equal(t, tt.wantedResult, len(result)) + }) + } +} diff --git a/service/stonenode/encode_segment.go b/service/stonenode/encode_segment.go new file mode 100644 index 000000000..46c7728c8 --- /dev/null +++ b/service/stonenode/encode_segment.go @@ -0,0 +1,125 @@ +package stonenode + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/bnb-chain/greenfield-storage-provider/model/piecestore" + "github.com/bnb-chain/greenfield-storage-provider/pkg/redundancy" + ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" + stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + "github.com/bnb-chain/greenfield-storage-provider/util" + "github.com/bnb-chain/greenfield-storage-provider/util/log" + "github.com/bnb-chain/greenfield-storage-provider/util/maps" +) + +// encodeSegmentsData loads segment data from the primary storage provider and encodes it +func (node *StoneNodeService) encodeSegmentsData(ctx context.Context, allocResp *stypes.StoneHubServiceAllocStoneJobResponse) ( + [][][]byte, error) { + type segment struct { + objectID uint64 + pieceKey string + segmentData []byte + pieceData [][]byte + pieceErr error + segmentIndex int + redundancyType ptypes.RedundancyType + } + var ( + doneSegments int64 + innerErr error + segmentCh = make(chan *segment) + interruptCh = make(chan struct{}) + pieces = make(map[int][][]byte) + objectID = allocResp.GetPieceJob().GetObjectId() + payloadSize = allocResp.GetPieceJob().GetPayloadSize() + redundancyType = allocResp.GetPieceJob().GetRedundancyType() + segmentCount = util.ComputeSegmentCount(payloadSize) + ) + + loadFunc := func(ctx context.Context, seg *segment) error { + select { + case <-interruptCh: + break + default: + data, err := node.store.GetPiece(ctx, seg.pieceKey, 0, 0) + if err != nil { + log.CtxErrorw(ctx, "get segment data from piece store failed", "error", err, "piece key", + seg.pieceKey) + return err + } + seg.segmentData = data + } + return nil + } + + encodeFunc := func(ctx context.Context, seg *segment) error { + select { + case <-interruptCh: + break + default: + pieceData, err := node.encode(redundancyType, seg.segmentData) + if err != nil { + log.CtxErrorw(ctx, "ec encode failed", "error", err,
"piece key", seg.pieceKey) + return err + } + seg.pieceData = pieceData + } + return nil + } + + for i := 0; i < int(segmentCount); i++ { + go func(segmentIdx int) { + seg := &segment{ + objectID: objectID, + pieceKey: piecestore.EncodeSegmentPieceKey(objectID, uint32(segmentIdx)), + redundancyType: redundancyType, + segmentIndex: segmentIdx, + } + defer func() { + if seg.pieceErr != nil || atomic.AddInt64(&doneSegments, 1) == int64(segmentCount) { + close(interruptCh) + close(segmentCh) + } + }() + if innerErr = loadFunc(ctx, seg); innerErr != nil { + return + } + if innerErr = encodeFunc(ctx, seg); innerErr != nil { + return + } + select { + case <-interruptCh: + return + default: + segmentCh <- seg + } + }(i) + } + + var mu sync.Mutex + for seg := range segmentCh { + mu.Lock() + pieces[seg.segmentIndex] = seg.pieceData + mu.Unlock() + } + return maps.ValueToSlice(pieces), innerErr +} + +// encode segment data by redundancyType +// pieceData contains one []byte when redundancyType is replica or inline +// pieceData contains six []byte when redundancyType is ec +func (node *StoneNodeService) encode(redundancyType ptypes.RedundancyType, segmentData []byte) ( + pieceData [][]byte, err error) { + switch redundancyType { + case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: + pieceData = append(pieceData, segmentData) + default: // ec type + pieceData, err = redundancy.EncodeRawSegment(segmentData) + if err != nil { + return nil, err + } + } + return pieceData, nil +} diff --git a/service/stonenode/encode_segment_test.go b/service/stonenode/encode_segment_test.go new file mode 100644 index 000000000..71a452ac9 --- /dev/null +++ b/service/stonenode/encode_segment_test.go @@ -0,0 +1,156 @@ +package stonenode + +import ( + "context" + "errors" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + + "github.com/bnb-chain/greenfield-storage-provider/model" + merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" + ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" + "github.com/bnb-chain/greenfield-storage-provider/service/client/mock" +) + +func TestInitClientFailed(t *testing.T) { + node := &StoneNodeService{ + name: model.StoneNodeService, + stoneLimit: 0, + } + node.running.Store(true) + err := node.initClient() + assert.Equal(t, merrors.ErrStoneNodeStarted, err) +} + +func Test_encodeSegmentsDataSuccess(t *testing.T) { + cases := []struct { + name string + req1 uint64 + req2 uint64 + req3 ptypes.RedundancyType + wantedResult1 int + wantedErr error + }{ + { + name: "ec type: payload size greater than 16MB", + req1: 20230109001, + req2: 20 * 1024 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + wantedResult1: 2, + wantedErr: nil, + }, + { + name: "ec type: payload size less than 16MB and greater than 1MB", + req1: 20230109002, + req2: 15 * 1024 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + wantedResult1: 1, + wantedErr: nil, + }, + { + name: "replica type: payload size greater than 16MB", + req1: 20230109003, + req2: 20 * 1024 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + wantedResult1: 2, + wantedErr: nil, + }, + { + name: "replica type: payload size less than 16MB and greater than 1MB", + req1: 20230109004, + req2: 15 * 1024 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + wantedResult1: 1, + wantedErr: nil, + }, + { + name: "inline type: payload size less than 
1MB", + req1: 20230109005, + req2: 1000 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE, + wantedResult1: 1, + wantedErr: nil, + }, + } + + node := setup(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ps := mock.NewMockPieceStoreAPI(ctrl) + node.store = ps + ps.EXPECT().GetPiece(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, key string, offset, limit int64) ([]byte, error) { + return []byte("1"), nil + }).AnyTimes() + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + allocResp := mockAllocResp(tt.req1, tt.req2, tt.req3) + result, err := node.encodeSegmentsData(context.TODO(), allocResp) + assert.Equal(t, nil, err) + assert.Equal(t, tt.wantedResult1, len(result)) + }) + } +} + +func Test_loadSegmentsDataPieceStoreError(t *testing.T) { + node := setup(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ps := mock.NewMockPieceStoreAPI(ctrl) + node.store = ps + ps.EXPECT().GetPiece(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, key string, offset, limit int64) ([]byte, error) { + return nil, errors.New("piece store s3 network error") + }).AnyTimes() + + result, err := node.encodeSegmentsData(context.TODO(), mockAllocResp(20230109001, 20*1024*1024, + ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)) + assert.Equal(t, errors.New("piece store s3 network error"), err) + assert.Equal(t, 0, len(result)) +} + +func Test_encodeSegmentData(t *testing.T) { + cases := []struct { + name string + req1 ptypes.RedundancyType + req2 []byte + wantedResult int + wantedErr error + }{ + { + name: "ec type", + req1: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req2: []byte("1"), + wantedResult: 6, + wantedErr: nil, + }, + { + name: "replica type", + req1: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req2: []byte("1"), + wantedResult: 1, + wantedErr: nil, + }, + { + name: "inline type", + req1: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req2: []byte("1"), + wantedResult: 1, + wantedErr: nil, + }, + } + + node := setup(t) + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + result, err := node.encode(tt.req1, tt.req2) + assert.Equal(t, err, tt.wantedErr) + assert.Equal(t, len(result), tt.wantedResult) + }) + } +} diff --git a/service/stonenode/helper_test.go b/service/stonenode/helper_test.go index b7677a536..9707139eb 100644 --- a/service/stonenode/helper_test.go +++ b/service/stonenode/helper_test.go @@ -41,31 +41,31 @@ func mockAllocResp(objectID uint64, payloadSize uint64, redundancyType ptypes.Re } } -func dispatchPieceMap() map[string][][]byte { +func dispatchECPiece() [][][]byte { ecList1 := [][]byte{[]byte("1"), []byte("2"), []byte("3"), []byte("4"), []byte("5"), []byte("6")} ecList2 := [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e"), []byte("f")} - pMap := make(map[string][][]byte) - pMap["123456_s0"] = ecList1 - pMap["123456_s1"] = ecList2 - return pMap + pSlice := make([][][]byte, 0) + pSlice = append(pSlice, ecList1) + pSlice = append(pSlice, ecList2) + return pSlice } -func dispatchSegmentMap() map[string][][]byte { +func dispatchSegmentPieceSlice() [][][]byte { segmentList1 := [][]byte{[]byte("10")} segmentList2 := [][]byte{[]byte("20")} segmentList3 := [][]byte{[]byte("30")} - sMap := make(map[string][][]byte) - sMap["789_s0"] = segmentList1 - sMap["789_s1"] = segmentList2 - sMap["789_s2"] = segmentList3 - return sMap + segSlice := make([][][]byte, 0) + 
segSlice = append(segSlice, segmentList1) + segSlice = append(segSlice, segmentList2) + segSlice = append(segSlice, segmentList3) + return segSlice } -func dispatchInlineMap() map[string][][]byte { +func dispatchInlinePieceSlice() [][][]byte { inlineList := [][]byte{[]byte("+")} - iMap := make(map[string][][]byte) - iMap["543_s0"] = inlineList - return iMap + inlineSlice := make([][][]byte, 0) + inlineSlice = append(inlineSlice, inlineList) + return inlineSlice } func makeStreamMock() *StreamMock { diff --git a/service/stonenode/server.go b/service/stonenode/server.go index 51de2c9df..4acdba041 100644 --- a/service/stonenode/server.go +++ b/service/stonenode/server.go @@ -103,7 +103,7 @@ func (node *StoneNodeService) Start(startCtx context.Context) error { } // TBD::exceed stoneLimit or alloc empty stone, // stone node need one backoff strategy. - node.allocStone(ctx) + node.allocStoneJob(ctx) }() case <-node.stopCh: cancel() @@ -132,27 +132,3 @@ func (node *StoneNodeService) Stop(ctx context.Context) error { } return nil } - -// allocStone sends rpc request to stone hub alloc stone job. -func (node *StoneNodeService) allocStone(ctx context.Context) { - resp, err := node.stoneHub.AllocStoneJob(ctx) - ctx = log.Context(ctx, resp, resp.GetPieceJob()) - if err != nil { - if err == merrors.ErrEmptyJob { - return - } - log.CtxErrorw(ctx, "alloc stone from stone hub failed", "error", err) - return - } - // TBD:: stone node will support more types of stone job, - // currently only support upload secondary piece job. - if resp.GetPieceJob() == nil { - log.CtxDebugw(ctx, "alloc stone job empty.") - return - } - if err := node.syncPieceToSecondarySP(ctx, resp); err != nil { - log.CtxErrorw(ctx, "upload secondary piece job failed", "error", err) - return - } - log.CtxInfow(ctx, "upload secondary piece job success") -} diff --git a/service/stonenode/sync.go b/service/stonenode/sync.go deleted file mode 100644 index a334e3471..000000000 --- a/service/stonenode/sync.go +++ /dev/null @@ -1,387 +0,0 @@ -package stonenode - -import ( - "bytes" - "context" - "errors" - "sync" - "sync/atomic" - - "github.com/bnb-chain/greenfield-storage-provider/mock" - merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" - "github.com/bnb-chain/greenfield-storage-provider/model/piecestore" - "github.com/bnb-chain/greenfield-storage-provider/pkg/redundancy" - ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" - stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" - "github.com/bnb-chain/greenfield-storage-provider/util" - "github.com/bnb-chain/greenfield-storage-provider/util/hash" - "github.com/bnb-chain/greenfield-storage-provider/util/log" -) - -// syncPieceToSecondarySP load segment data from primary and sync to secondary. -func (node *StoneNodeService) syncPieceToSecondarySP(ctx context.Context, allocResp *stypes.StoneHubServiceAllocStoneJobResponse) error { - // TBD:: check secondarySPs count by redundancyType. 
- // EC_TYPE need EC_M + EC_K + backup - // REPLICA_TYPE and INLINE_TYPE need segments count + backup - secondarySPs := mock.AllocUploadSecondarySP() - - // check redundancyType and targetIdx is valid - redundancyType := allocResp.GetPieceJob().GetRedundancyType() - if err := checkRedundancyType(redundancyType); err != nil { - log.CtxErrorw(ctx, "invalid redundancy type", "redundancy type", redundancyType) - node.reportErrToStoneHub(ctx, allocResp, err) - return err - } - targetIdx := allocResp.GetPieceJob().GetTargetIdx() - if len(targetIdx) == 0 { - log.CtxError(ctx, "invalid target idx length") - node.reportErrToStoneHub(ctx, allocResp, merrors.ErrEmptyTargetIdx) - return merrors.ErrEmptyTargetIdx - } - - // 1. load all segments data from primary piece store and do ec or not - pieceData, err := node.loadSegmentsData(ctx, allocResp) - if err != nil { - node.reportErrToStoneHub(ctx, allocResp, err) - return err - } - - // 2. dispatch the piece data to different secondary sp - secondaryPieceData, err := node.dispatchSecondarySP(pieceData, redundancyType, secondarySPs, targetIdx) - if err != nil { - log.CtxErrorw(ctx, "dispatch piece data to secondary sp error") - node.reportErrToStoneHub(ctx, allocResp, err) - return err - } - - // 3. send piece data to the secondary - node.doSyncToSecondarySP(ctx, allocResp, secondaryPieceData) - return nil -} - -func checkRedundancyType(redundancyType ptypes.RedundancyType) error { - switch redundancyType { - case ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED: - return nil - case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: - return nil - default: - return merrors.ErrRedundancyType - } -} - -// loadSegmentsData load segment data from primary storage provider. 
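// For context while reading the deleted loader below: util.ComputeSegmentCount is not
// shown in this diff, but the tests imply a fixed 16 MiB maximum segment size
// (20 MiB payload -> 2 segments, 15 MiB -> 1, 1000 KiB -> 1), so a ceiling division
// like this sketch is the assumed behavior.
const segmentSize = 16 * 1024 * 1024 // assumed maximum segment size in bytes

func computeSegmentCount(payloadSize uint64) uint32 {
	count := payloadSize / segmentSize
	if payloadSize%segmentSize != 0 {
		count++
	}
	return uint32(count)
}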
-// returned map key is segmentKey, value is corresponding ec data from ec1 to ec6, or segment data -func (node *StoneNodeService) loadSegmentsData(ctx context.Context, allocResp *stypes.StoneHubServiceAllocStoneJobResponse) ( - map[string][][]byte, error) { - type segment struct { - objectID uint64 - pieceKey string - segmentData []byte - pieceData [][]byte - pieceErr error - redundancyType ptypes.RedundancyType - } - var ( - doneSegments int64 - loadSegmentErr error - segmentCh = make(chan *segment) - interruptCh = make(chan struct{}) - pieces = make(map[string][][]byte) - objectID = allocResp.GetPieceJob().GetObjectId() - payloadSize = allocResp.GetPieceJob().GetPayloadSize() - redundancyType = allocResp.GetPieceJob().GetRedundancyType() - segmentCount = util.ComputeSegmentCount(payloadSize) - ) - - loadFunc := func(ctx context.Context, seg *segment) error { - select { - case <-interruptCh: - break - default: - data, err := node.store.GetPiece(ctx, seg.pieceKey, 0, 0) - if err != nil { - log.CtxErrorw(ctx, "gets segment data from piece store failed", "error", err, "piece key", - seg.pieceKey) - return err - } - seg.segmentData = data - } - return nil - } - - spiltFunc := func(ctx context.Context, seg *segment) error { - select { - case <-interruptCh: - break - default: - pieceData, err := node.generatePieceData(redundancyType, seg.segmentData) - if err != nil { - log.CtxErrorw(ctx, "ec encode failed", "error", err, "piece key", seg.pieceKey) - return err - } - seg.pieceData = pieceData - } - return nil - } - - for i := 0; i < int(segmentCount); i++ { - go func(segmentIdx int) { - seg := &segment{ - objectID: objectID, - pieceKey: piecestore.EncodeSegmentPieceKey(objectID, uint32(segmentIdx)), - redundancyType: redundancyType, - } - defer func() { - if seg.pieceErr != nil || atomic.AddInt64(&doneSegments, 1) == int64(segmentCount) { - close(interruptCh) - close(segmentCh) - } - }() - if loadSegmentErr = loadFunc(ctx, seg); loadSegmentErr != nil { - return - } - if loadSegmentErr = spiltFunc(ctx, seg); loadSegmentErr != nil { - return - } - select { - case <-interruptCh: - return - default: - segmentCh <- seg - } - }(i) - } - - var mu sync.Mutex - for seg := range segmentCh { - mu.Lock() - pieces[seg.pieceKey] = seg.pieceData - mu.Unlock() - } - return pieces, loadSegmentErr -} - -// generatePieceData generates piece data from segment data -func (node *StoneNodeService) generatePieceData(redundancyType ptypes.RedundancyType, segmentData []byte) ( - pieceData [][]byte, err error) { - switch redundancyType { - case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: - pieceData = append(pieceData, segmentData) - default: // ec type - pieceData, err = redundancy.EncodeRawSegment(segmentData) - if err != nil { - return nil, err - } - } - return pieceData, nil -} - -// dispatchSecondarySP dispatch piece data to secondary storage provider. 
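// The new dispatch_secondary.go replaces these nested maps with a plain slice
// transpose (convertSlice). A sketch of the idea with illustrative names: given
// segment-major input data[segmentIdx][pieceIdx], return piece-major
// out[pieceIdx][segmentIdx], so secondary SP i can be handed every piece whose
// index is i (for example, all ec1 pieces).
func transposePieces(data [][][]byte, pieceCount int) [][][]byte {
	out := make([][][]byte, pieceCount)
	for i := 0; i < pieceCount; i++ {
		out[i] = make([][]byte, 0, len(data))
		for j := 0; j < len(data); j++ {
			out[i] = append(out[i], data[j][i])
		}
	}
	return out
}

// With two segments each EC-encoded into six pieces, transposePieces(data, 6)[0]
// holds segment0's ec1 piece followed by segment1's ec1 piece, matching what
// dispatchECData hands to the first secondary SP.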
-// returned map key is spID, value map key is ec piece key or segment key, value map's value is piece data -func (node *StoneNodeService) dispatchSecondarySP(pieceDataBySegment map[string][][]byte, redundancyType ptypes.RedundancyType, - secondarySPs []string, targetIdx []uint32) (map[string]map[string][]byte, error) { - // pieceDataBySegment key is segment key; if redundancyType is EC, value is [][]byte type, - // a two-dimensional array which contains ec data from ec1 []byte data to ec6 []byte data - // if redundancyType is replica or inline, value is [][]byte type, a two-dimensional array - // which only contains one []byte data - var ( - err error - pieceDataBySecondary map[string]map[string][]byte - ) - switch redundancyType { - case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: - pieceDataBySecondary, err = dispatchReplicaOrInlineData(pieceDataBySegment, secondarySPs, targetIdx) - default: // ec type - pieceDataBySecondary, err = dispatchECData(pieceDataBySegment, secondarySPs, targetIdx) - } - if err != nil { - log.Errorw("fill piece data by secondary error", "error", err) - return nil, err - } - return pieceDataBySecondary, nil -} - -// dispatchReplicaOrInlineData dispatches replica or inline data into different sp, each sp should store all segments data of an object -// if an object uses replica type, it's split into 10 segments and there are 6 sp, each sp should store 10 segments data -// if an object uses inline type, there is only one segment and there are 6 sp, each sp should store 1 segment data -func dispatchReplicaOrInlineData(pieceDataBySegment map[string][][]byte, secondarySPs []string, targetIdx []uint32) ( - map[string]map[string][]byte, error) { - replicaOrInlineDataMap := make(map[string]map[string][]byte) - spNumber := len(secondarySPs) - if spNumber < 1 && spNumber > 6 { - return replicaOrInlineDataMap, merrors.ErrSecondarySPNumber - } - - keys := util.GenericSortedKeys(pieceDataBySegment) - for i := 0; i < len(secondarySPs); i++ { - sp := secondarySPs[i] - for j := 0; j < len(keys); j++ { - pieceKey := keys[j] - pieceData := pieceDataBySegment[pieceKey] - if len(pieceData) != 1 { - return nil, merrors.ErrInvalidSegmentData - } - - for _, index := range targetIdx { - if int(index) == i { - if _, ok := replicaOrInlineDataMap[sp]; !ok { - replicaOrInlineDataMap[sp] = make(map[string][]byte) - } - replicaOrInlineDataMap[sp][pieceKey] = pieceData[0] - } - } - } - } - return replicaOrInlineDataMap, nil -} - -// dispatchECData dispatched ec data into different sp -// one sp stores same ec column data: sp1 stores all ec1 data, sp2 stores all ec2 data, etc -func dispatchECData(pieceDataBySegment map[string][][]byte, secondarySPs []string, targetIdx []uint32) (map[string]map[string][]byte, error) { - ecPieceDataMap := make(map[string]map[string][]byte) - for pieceKey, pieceData := range pieceDataBySegment { - if len(pieceData) != 6 { - return map[string]map[string][]byte{}, merrors.ErrInvalidECData - } - - for idx, data := range pieceData { - if idx >= len(secondarySPs) { - return map[string]map[string][]byte{}, merrors.ErrSecondarySPNumber - } - - sp := secondarySPs[idx] - for _, index := range targetIdx { - if int(index) == idx { - if _, ok := ecPieceDataMap[sp]; !ok { - ecPieceDataMap[sp] = make(map[string][]byte) - } - key := piecestore.EncodeECPieceKeyBySegmentKey(pieceKey, uint32(idx)) - ecPieceDataMap[sp][key] = data - } - } - } - } - return ecPieceDataMap, nil -} - -// doSyncToSecondarySP send piece data to 
the secondary. -func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *stypes.StoneHubServiceAllocStoneJobResponse, - pieceDataBySecondary map[string]map[string][]byte) error { - var ( - objectID = resp.GetPieceJob().GetObjectId() - payloadSize = resp.GetPieceJob().GetPayloadSize() - redundancyType = resp.GetPieceJob().GetRedundancyType() - ) - for secondary, pieceData := range pieceDataBySecondary { - go func(secondary string, pieceData map[string][]byte) { - errMsg := &stypes.ErrMessage{} - pieceJob := &stypes.PieceJob{ - ObjectId: objectID, - PayloadSize: payloadSize, - RedundancyType: redundancyType, - } - - defer func() { - // notify stone hub when an ec segment is done - req := &stypes.StoneHubServiceDoneSecondaryPieceJobRequest{ - TraceId: resp.GetTraceId(), - PieceJob: pieceJob, - ErrMessage: errMsg, - } - // TBD:: according to the secondary to pick up the address - if _, err := node.stoneHub.DoneSecondaryPieceJob(ctx, req); err != nil { - log.CtxErrorw(ctx, "done secondary piece job to stone hub failed", "error", err) - return - } - }() - - syncResp, err := node.syncPiece(ctx, &stypes.SyncerInfo{ - ObjectId: objectID, - StorageProviderId: secondary, - PieceCount: uint32(len(pieceData)), - RedundancyType: redundancyType, - }, pieceData, resp.GetTraceId()) - // TBD:: retry alloc secondary sp and rat again. - if err != nil { - log.CtxErrorw(ctx, "sync to secondary piece job failed", "error", err) - errMsg.ErrCode = stypes.ErrCode_ERR_CODE_ERROR - errMsg.ErrMsg = err.Error() - return - } - - var pieceHash [][]byte - keys := util.GenericSortedKeys(pieceData) - for _, key := range keys { - pieceHash = append(pieceHash, hash.GenerateChecksum(pieceData[key])) - } - integrityHash := hash.GenerateIntegrityHash(pieceHash) - if syncResp.GetSecondarySpInfo() == nil || syncResp.GetSecondarySpInfo().GetIntegrityHash() == nil || - !bytes.Equal(integrityHash, syncResp.GetSecondarySpInfo().GetIntegrityHash()) { - log.CtxErrorw(ctx, "secondary integrity hash check error") - errMsg.ErrCode = stypes.ErrCode_ERR_CODE_ERROR - errMsg.ErrMsg = merrors.ErrIntegrityHash.Error() - return - } - pieceJob.StorageProviderSealInfo = syncResp.GetSecondarySpInfo() - log.CtxDebugw(ctx, "sync piece data to secondary", "secondary_provider", secondary, - "local_integrity_hash", integrityHash, "remote_integrity_hash", syncResp.GetSecondarySpInfo().GetIntegrityHash()) - }(secondary, pieceData) - } - return nil -} - -// reportErrToStoneHub send error message to stone hub. -func (node *StoneNodeService) reportErrToStoneHub(ctx context.Context, resp *stypes.StoneHubServiceAllocStoneJobResponse, - reportErr error) { - if reportErr == nil { - return - } - req := &stypes.StoneHubServiceDoneSecondaryPieceJobRequest{ - TraceId: resp.GetTraceId(), - ErrMessage: &stypes.ErrMessage{ - ErrCode: stypes.ErrCode_ERR_CODE_ERROR, - ErrMsg: reportErr.Error(), - }, - } - if _, err := node.stoneHub.DoneSecondaryPieceJob(ctx, req); err != nil { - log.CtxErrorw(ctx, "report stone hub err msg failed", "error", err) - return - } - log.CtxInfow(ctx, "report stone hub err msg success") -} - -// syncPiece send rpc request to secondary storage provider to sync the piece data. 
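// Both the deleted syncPiece below and its replacement in sync_piece.go stream one
// piece per request to stay under the gRPC message cap, which this diff lowers from
// 40 MiB to 25 MiB in model/const.go. A rough size argument, assuming the 16 MiB
// maximum segment size implied by the tests:
const (
	maxCallMsgSize = 25 * 1024 * 1024 // from model/const.go in this diff
	maxSegmentSize = 16 * 1024 * 1024 // assumed maximum segment size
)

// A single piece is at most one segment, so one piece per message leaves about
// 9 MiB of headroom for the SyncerInfo envelope and proto framing, whereas a
// request batching several pieces, as the old map-valued field allowed, could
// exceed the cap.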
-func (node *StoneNodeService) syncPiece(ctx context.Context, syncerInfo *stypes.SyncerInfo, - pieceData map[string][]byte, traceID string) (*stypes.SyncerServiceSyncPieceResponse, error) { - stream, err := node.syncer.SyncPiece(ctx) - if err != nil { - log.Errorw("sync secondary piece job error", "err", err) - return nil, err - } - - // send data one by one to avoid exceeding rpc max msg size - for key, value := range pieceData { - innerMap := make(map[string][]byte) - innerMap[key] = value - if err := stream.Send(&stypes.SyncerServiceSyncPieceRequest{ - TraceId: traceID, - SyncerInfo: syncerInfo, - PieceData: innerMap, - }); err != nil { - log.Errorw("client send request error", "error", err) - return nil, err - } - } - - resp, err := stream.CloseAndRecv() - if err != nil { - log.Errorw("client close error", "error", err, "traceID", resp.GetTraceId()) - return nil, err - } - if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypes.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED { - log.Errorw("sync piece sends to stone node response code is not success", "error", err, "traceID", resp.GetTraceId()) - return nil, errors.New(resp.GetErrMessage().GetErrMsg()) - } - return resp, nil -} diff --git a/service/stonenode/sync_piece.go b/service/stonenode/sync_piece.go new file mode 100644 index 000000000..cf0f19d4c --- /dev/null +++ b/service/stonenode/sync_piece.go @@ -0,0 +1,128 @@ +package stonenode + +import ( + "bytes" + "context" + "errors" + + merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" + stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + "github.com/bnb-chain/greenfield-storage-provider/util/hash" + "github.com/bnb-chain/greenfield-storage-provider/util/log" +) + +// doSyncToSecondarySP sends piece data to the secondary SPs +func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *stypes.StoneHubServiceAllocStoneJobResponse, + pieceDataBySecondary [][][]byte, secondarySPs []string) error { + var ( + objectID = resp.GetPieceJob().GetObjectId() + payloadSize = resp.GetPieceJob().GetPayloadSize() + redundancyType = resp.GetPieceJob().GetRedundancyType() + ) + // for ec type, index is the ec piece number and pieceData holds that ec column's data + // for replica or inline type, index is the sp number and every sp gets the same two-dimensional slice + // the length of pieceData is the segment count; SyncPiece is a client-stream interface that sends + // one []byte per message, so the syncer server can verify it received the correct number of pieces + for index, pieceData := range pieceDataBySecondary { + go func(index int, pieceData [][]byte) { + errMsg := &stypes.ErrMessage{} + pieceJob := &stypes.PieceJob{ + ObjectId: objectID, + PayloadSize: payloadSize, + RedundancyType: redundancyType, + } + + defer func() { + // notify stone hub when an ec segment is done + req := &stypes.StoneHubServiceDoneSecondaryPieceJobRequest{ + TraceId: resp.GetTraceId(), + PieceJob: pieceJob, + ErrMessage: errMsg, + } + // TBD:: according to the secondary to pick up the address + if _, err := node.stoneHub.DoneSecondaryPieceJob(ctx, req); err != nil { + log.CtxErrorw(ctx, "done secondary piece job to stone hub failed", "error", err) + return + } + }() + + syncResp, err := node.syncPiece(ctx, &stypes.SyncerInfo{ + ObjectId: objectID, + StorageProviderId: secondarySPs[index], + PieceIndex: uint32(index), + PieceCount: uint32(len(pieceData)), + RedundancyType: redundancyType, + }, pieceData, resp.GetTraceId()) + // TBD:: retry alloc secondary sp and try again. + if err != nil { + log.CtxErrorw(ctx, "sync to secondary piece job failed", "error", err) + errMsg.ErrCode = stypes.ErrCode_ERR_CODE_ERROR + errMsg.ErrMsg = err.Error() + return + } + + spInfo := syncResp.GetSecondarySpInfo() + if ok := verifyIntegrityHash(pieceData, spInfo); !ok { + log.CtxErrorw(ctx, "wrong secondary integrity hash") + errMsg.ErrCode = stypes.ErrCode_ERR_CODE_ERROR + errMsg.ErrMsg = merrors.ErrIntegrityHash.Error() + return + } + log.Debug("verify secondary integrity hash successfully") + + pieceJob.StorageProviderSealInfo = spInfo + log.CtxDebugw(ctx, "sync piece data to secondary", "secondary_provider", secondarySPs[index]) + }(index, pieceData) + } + log.Info("secondary piece job done") + return nil +} + +// verifyIntegrityHash verifies that the secondary integrity hash equals the local one +func verifyIntegrityHash(pieceData [][]byte, spInfo *stypes.StorageProviderSealInfo) bool { + pieceHash := make([][]byte, 0) + for _, value := range pieceData { + pieceHash = append(pieceHash, hash.GenerateChecksum(value)) + } + integrityHash := hash.GenerateIntegrityHash(pieceHash) + if spInfo == nil || spInfo.GetIntegrityHash() == nil || !bytes.Equal(integrityHash, spInfo.GetIntegrityHash()) { + log.Error("wrong secondary integrity hash") + return false + } + log.Debugw("verify integrity hash", "local_integrity_hash", integrityHash, + "remote_integrity_hash", spInfo.GetIntegrityHash()) + return true +} + +// syncPiece sends a rpc request to the secondary storage provider to sync the piece data
+func (node *StoneNodeService) syncPiece(ctx context.Context, syncerInfo *stypes.SyncerInfo, + pieceData [][]byte, traceID string) (*stypes.SyncerServiceSyncPieceResponse, error) { + stream, err := node.syncer.SyncPiece(ctx) + if err != nil { + log.Errorw("sync secondary piece job error", "err", err) + return nil, err + } + + // send data one by one to avoid exceeding rpc max msg size + for _, value := range pieceData { + if err := stream.Send(&stypes.SyncerServiceSyncPieceRequest{ + TraceId: traceID, + SyncerInfo: syncerInfo, + PieceData: value, + }); err != nil { + log.Errorw("client send request error", "error", err) + return nil, err + } + } + + resp, err := stream.CloseAndRecv() + if err != nil { + log.Errorw("client close error", "error", err, "traceID", resp.GetTraceId()) + return nil, err + } + if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypes.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED { + log.Errorw("sync piece response code from syncer is not success", "traceID", resp.GetTraceId()) + return nil, errors.New(resp.GetErrMessage().GetErrMsg()) + } + return resp, nil +} diff --git a/service/stonenode/sync_piece_test.go b/service/stonenode/sync_piece_test.go new file mode 100644 index 000000000..0a8beacaf --- /dev/null +++ b/service/stonenode/sync_piece_test.go @@ -0,0 +1,92 @@ +package stonenode + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + + spmock "github.com/bnb-chain/greenfield-storage-provider/mock" + ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" + "github.com/bnb-chain/greenfield-storage-provider/service/client/mock" + stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" +) + +// TODO: needs improvement +func Test_doSyncToSecondarySP(t *testing.T) { + cases := []struct { + name string + req1 *stypes.StoneHubServiceAllocStoneJobResponse + req2 [][][]byte + }{ + { + name: "1",
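// For reference while reading these tests, a sketch of the presumed scheme behind
// verifyIntegrityHash above, assuming hash.GenerateChecksum is a SHA-256 of each
// piece and hash.GenerateIntegrityHash a SHA-256 over the concatenated piece
// checksums (the actual primitives live in util/hash and may differ).
import "crypto/sha256"

func checksum(piece []byte) []byte {
	sum := sha256.Sum256(piece)
	return sum[:]
}

func integrityHash(pieceHashes [][]byte) []byte {
	h := sha256.New()
	for _, ph := range pieceHashes {
		h.Write(ph) // hash of hashes: piece order matters
	}
	return h.Sum(nil)
}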
req1: nil, + req2: dispatchECPiece(), + }, + } + + node := setup(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // stoneHub service stub + //stoneHub := mock.NewMockStoneHubAPI(ctrl) + //node.stoneHub = stoneHub + //stoneHub.EXPECT().DoneSecondaryPieceJob(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + // func(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) ( + // *service.StoneHubServiceDoneSecondaryPieceJobResponse, error) { + // return nil, nil + // }) + + // syncer service stub + streamClient := makeStreamMock() + syncer := mock.NewMockSyncerAPI(ctrl) + node.syncer = syncer + syncer.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) { + return streamClient, nil + }).AnyTimes() + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + allocResp := mockAllocResp(123456, 20*1024*1024, ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED) + err := node.doSyncToSecondarySP(context.TODO(), allocResp, tt.req2, spmock.AllocUploadSecondarySP()) + assert.Equal(t, nil, err) + }) + } +} + +func TestSyncPieceSuccess(t *testing.T) { + node := setup(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + streamClient := makeStreamMock() + syncer := mock.NewMockSyncerAPI(ctrl) + node.syncer = syncer + syncer.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) { + return streamClient, nil + }).AnyTimes() + + sInfo := &stypes.SyncerInfo{ + ObjectId: 123456, + StorageProviderId: "440246a94fc4257096b8d4fa8db94a5655f455f88555f885b10da1466763f742", + RedundancyType: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + } + data := [][]byte{ + []byte("test1"), + []byte("test2"), + []byte("test3"), + []byte("test4"), + []byte("test5"), + []byte("test6"), + } + resp, err := node.syncPiece(context.TODO(), sInfo, data, "test_traceID") + assert.Equal(t, err, nil) + assert.Equal(t, resp.GetTraceId(), "test_traceID") + assert.Equal(t, resp.GetSecondarySpInfo().GetPieceIdx(), uint32(1)) +} diff --git a/service/stonenode/sync_test.go b/service/stonenode/sync_test.go deleted file mode 100644 index 3351648ad..000000000 --- a/service/stonenode/sync_test.go +++ /dev/null @@ -1,349 +0,0 @@ -package stonenode - -import ( - "context" - "errors" - "testing" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - "google.golang.org/grpc" - - "github.com/bnb-chain/greenfield-storage-provider/model" - merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" - ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" - "github.com/bnb-chain/greenfield-storage-provider/service/client/mock" - stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" -) - -func TestInitClientFailed(t *testing.T) { - node := &StoneNodeService{ - name: model.StoneNodeService, - stoneLimit: 0, - } - node.running.Store(true) - err := node.initClient() - assert.Equal(t, merrors.ErrStoneNodeStarted, err) -} - -func Test_loadSegmentsDataSuccess(t *testing.T) { - cases := []struct { - name string - req1 uint64 - req2 uint64 - req3 ptypes.RedundancyType - wantedResult1 string - wantedResult2 int - wantedErr error - }{ - { - name: "ec type: payload size greater than 16MB", - req1: 20230109001, - req2: 20 * 1024 * 1024, - req3: 
ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, - wantedResult1: "20230109001", - wantedResult2: 2, - wantedErr: nil, - }, - { - name: "ec type: payload size less than 16MB and greater than 1MB", - req1: 20230109002, - req2: 15 * 1024 * 1024, - req3: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, - wantedResult1: "20230109002", - wantedResult2: 1, - wantedErr: nil, - }, - { - name: "replica type: payload size greater than 16MB", - req1: 20230109003, - req2: 20 * 1024 * 1024, - req3: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, - wantedResult1: "20230109003", - wantedResult2: 2, - wantedErr: nil, - }, - { - name: "replica type: payload size less than 16MB and greater than 1MB", - req1: 20230109004, - req2: 15 * 1024 * 1024, - req3: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, - wantedResult1: "20230109004", - wantedResult2: 1, - wantedErr: nil, - }, - { - name: "inline type: payload size less than 1MB", - req1: 20230109005, - req2: 1000 * 1024, - req3: ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE, - wantedResult1: "20230109005", - wantedResult2: 1, - wantedErr: nil, - }, - } - - node := setup(t) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - ps := mock.NewMockPieceStoreAPI(ctrl) - node.store = ps - ps.EXPECT().GetPiece(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, key string, offset, limit int64) ([]byte, error) { - return []byte("1"), nil - }).AnyTimes() - - for _, tt := range cases { - t.Run(tt.name, func(t *testing.T) { - allocResp := mockAllocResp(tt.req1, tt.req2, tt.req3) - result, err := node.loadSegmentsData(context.TODO(), allocResp) - assert.Equal(t, nil, err) - for k := range result { - assert.Contains(t, k, tt.wantedResult1) - } - assert.Equal(t, tt.wantedResult2, len(result)) - }) - } -} - -func Test_loadSegmentsDataPieceStoreError(t *testing.T) { - node := setup(t) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - ps := mock.NewMockPieceStoreAPI(ctrl) - node.store = ps - ps.EXPECT().GetPiece(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, key string, offset, limit int64) ([]byte, error) { - return nil, errors.New("piece store s3 network error") - }).AnyTimes() - - result, err := node.loadSegmentsData(context.TODO(), mockAllocResp(20230109001, 20*1024*1024, - ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)) - assert.Equal(t, errors.New("piece store s3 network error"), err) - assert.Equal(t, 0, len(result)) -} - -func Test_generatePieceData(t *testing.T) { - cases := []struct { - name string - req1 ptypes.RedundancyType - req2 []byte - wantedResult int - wantedErr error - }{ - { - name: "ec type", - req1: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, - req2: []byte("1"), - wantedResult: 6, - wantedErr: nil, - }, - { - name: "replica type", - req1: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, - req2: []byte("1"), - wantedResult: 1, - wantedErr: nil, - }, - { - name: "inline type", - req1: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, - req2: []byte("1"), - wantedResult: 1, - wantedErr: nil, - }, - } - - node := setup(t) - for _, tt := range cases { - t.Run(tt.name, func(t *testing.T) { - result, err := node.generatePieceData(tt.req1, tt.req2) - assert.Equal(t, err, tt.wantedErr) - assert.Equal(t, len(result), tt.wantedResult) - }) - } -} - -func Test_dispatchSecondarySP(t *testing.T) { - spList := []string{"sp1", "sp2", "sp3", "sp4", "sp5", "sp6"} - cases := []struct { - name 
string - req1 map[string][][]byte - req2 ptypes.RedundancyType - req3 []string - req4 []uint32 - wantedResult int - wantedErr error - }{ - { - name: "ec type dispatch", - req1: dispatchPieceMap(), - req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, - req3: spList, - req4: []uint32{0, 1, 2, 3, 4, 5}, - wantedResult: 6, - wantedErr: nil, - }, - { - name: "replica type dispatch", - req1: dispatchSegmentMap(), - req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, - req3: spList, - req4: []uint32{0, 1, 2}, - wantedResult: 3, - wantedErr: nil, - }, - { - name: "inline type dispatch", - req1: dispatchInlineMap(), - req2: ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE, - req3: spList, - req4: []uint32{0}, - wantedResult: 1, - wantedErr: nil, - }, - { - name: "ec type data retransmission", - req1: dispatchPieceMap(), - req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, - req3: spList, - req4: []uint32{2, 3}, - wantedResult: 2, - wantedErr: nil, - }, - { - name: "replica type data retransmission", - req1: dispatchSegmentMap(), - req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, - req3: spList, - req4: []uint32{1, 2}, - wantedResult: 2, - wantedErr: nil, - }, - { - name: "wrong secondary sp number", - req1: dispatchPieceMap(), - req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, - req3: []string{}, - req4: []uint32{0, 1, 2, 3, 4, 5}, - wantedResult: 0, - wantedErr: merrors.ErrSecondarySPNumber, - }, - { - name: "wrong ec segment data length", - req1: dispatchSegmentMap(), - req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, - req3: spList, - req4: []uint32{0, 1, 2, 3, 4, 5}, - wantedResult: 0, - wantedErr: merrors.ErrInvalidECData, - }, - { - name: "wrong replica/inline segment data length", - req1: dispatchPieceMap(), - req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, - req3: spList, - req4: []uint32{0, 1, 2, 3, 4, 5}, - wantedResult: 0, - wantedErr: merrors.ErrInvalidSegmentData, - }, - } - - node := setup(t) - for _, tt := range cases { - t.Run(tt.name, func(t *testing.T) { - result, err := node.dispatchSecondarySP(tt.req1, tt.req2, tt.req3, tt.req4) - assert.Equal(t, tt.wantedErr, err) - assert.Equal(t, tt.wantedResult, len(result)) - }) - } -} - -// TODO:need improved -func Test_doSyncToSecondarySP(t *testing.T) { - data := map[string]map[string][]byte{ - "sp1": { - "123456_s0_p0": []byte("test1"), - "123456_s1_p0": []byte("test2"), - "123456_s2_p0": []byte("test3"), - "123456_s3_p0": []byte("test4"), - "123456_s4_p0": []byte("test5"), - "123456_s5_p0": []byte("test6"), - }, - } - cases := []struct { - name string - req1 *stypes.StoneHubServiceAllocStoneJobResponse - req2 map[string]map[string][]byte - }{ - { - name: "1", - req1: nil, - req2: data, - }, - } - - node := setup(t) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - // stoneHub service stub - //stoneHub := mock.NewMockStoneHubAPI(ctrl) - //node.stoneHub = stoneHub - //stoneHub.EXPECT().DoneSecondaryPieceJob(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - // func(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) ( - // *service.StoneHubServiceDoneSecondaryPieceJobResponse, error) { - // return nil, nil - // }) - - // syncer service stub - streamClient := makeStreamMock() - syncer := mock.NewMockSyncerAPI(ctrl) - node.syncer = syncer - syncer.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, opts ...grpc.CallOption) 
-// TODO:need improved
-func Test_doSyncToSecondarySP(t *testing.T) {
-	data := map[string]map[string][]byte{
-		"sp1": {
-			"123456_s0_p0": []byte("test1"),
-			"123456_s1_p0": []byte("test2"),
-			"123456_s2_p0": []byte("test3"),
-			"123456_s3_p0": []byte("test4"),
-			"123456_s4_p0": []byte("test5"),
-			"123456_s5_p0": []byte("test6"),
-		},
-	}
-	cases := []struct {
-		name string
-		req1 *stypes.StoneHubServiceAllocStoneJobResponse
-		req2 map[string]map[string][]byte
-	}{
-		{
-			name: "1",
-			req1: nil,
-			req2: data,
-		},
-	}
-
-	node := setup(t)
-	ctrl := gomock.NewController(t)
-	defer ctrl.Finish()
-
-	// stoneHub service stub
-	//stoneHub := mock.NewMockStoneHubAPI(ctrl)
-	//node.stoneHub = stoneHub
-	//stoneHub.EXPECT().DoneSecondaryPieceJob(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
-	//	func(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) (
-	//		*service.StoneHubServiceDoneSecondaryPieceJobResponse, error) {
-	//		return nil, nil
-	//	})
-
-	// syncer service stub
-	streamClient := makeStreamMock()
-	syncer := mock.NewMockSyncerAPI(ctrl)
-	node.syncer = syncer
-	syncer.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
-		func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
-			return streamClient, nil
-		}).AnyTimes()
-
-	for _, tt := range cases {
-		t.Run(tt.name, func(t *testing.T) {
-			allocResp := mockAllocResp(123456, 20*1024*1024, ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
-			err := node.doSyncToSecondarySP(context.TODO(), allocResp, tt.req2)
-			assert.Equal(t, nil, err)
-		})
-	}
-}
-
-func TestSyncPieceSuccess(t *testing.T) {
-	node := setup(t)
-	ctrl := gomock.NewController(t)
-	defer ctrl.Finish()
-
-	streamClient := makeStreamMock()
-	syncer := mock.NewMockSyncerAPI(ctrl)
-	node.syncer = syncer
-	syncer.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
-		func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
-			return streamClient, nil
-		}).AnyTimes()
-
-	sInfo := &stypes.SyncerInfo{
-		ObjectId:          123456,
-		StorageProviderId: "440246a94fc4257096b8d4fa8db94a5655f455f88555f885b10da1466763f742",
-		RedundancyType:    ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED,
-	}
-	data := map[string][]byte{
-		"123456_s0_p0": []byte("test1"),
-		"123456_s1_p0": []byte("test2"),
-		"123456_s2_p0": []byte("test3"),
-		"123456_s3_p0": []byte("test4"),
-		"123456_s4_p0": []byte("test5"),
-		"123456_s5_p0": []byte("test6"),
-	}
-	resp, err := node.syncPiece(context.TODO(), sInfo, data, "test_traceID")
-	assert.Equal(t, err, nil)
-	assert.Equal(t, resp.GetTraceId(), "test_traceID")
-	assert.Equal(t, resp.GetSecondarySpInfo().GetPieceIdx(), uint32(1))
-}
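[editor's note] The fixture keys in the deleted tests ("123456_s0_p0" and so on) follow the piecestore convention objectID_s<segmentIndex>_p<ecPieceIndex>. A minimal standalone sketch of that encoding for orientation; the canonical encoder/decoder pair lives in model/piecestore (EncodeECPieceKey / DecodeECPieceKey):

package main

import "fmt"

// encodeECPieceKey mirrors the objectID_s<segment>_p<piece> shape of the
// fixture keys above; illustration only, not the repo's implementation.
func encodeECPieceKey(objectID uint64, segmentIndex, pieceIndex uint32) string {
	return fmt.Sprintf("%d_s%d_p%d", objectID, segmentIndex, pieceIndex)
}

func main() {
	for seg := uint32(0); seg < 6; seg++ {
		fmt.Println(encodeECPieceKey(123456, seg, 0)) // 123456_s0_p0 ... 123456_s5_p0
	}
}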
diff --git a/service/syncer/syncer_service.go b/service/syncer/syncer_service.go
index dbaa46706..a2b2f2cb4 100644
--- a/service/syncer/syncer_service.go
+++ b/service/syncer/syncer_service.go
@@ -2,7 +2,6 @@ package syncer
 
 import (
 	"context"
-	"errors"
 	"io"
 
 	merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
@@ -18,21 +17,9 @@ import (
 func (s *Syncer) SyncPiece(stream stypes.SyncerService_SyncPieceServer) error {
 	var count uint32
 	var integrityMeta *metadb.IntegrityMeta
-	var key string
 	var spID string
 	var value []byte
-	pieceHash := make(map[string][]byte)
-	//defer func() {
-	//	if err != nil && err != io.EOF {
-	//		log.Info("entry defer func")
-	//		err = stream.SendAndClose(&service.SyncerServiceSyncPieceResponse{
-	//			ErrMessage: &service.ErrMessage{
-	//				ErrCode: service.ErrCode_ERR_CODE_ERROR,
-	//				ErrMsg:  err.Error(),
-	//			},
-	//		})
-	//	}
-	//}()
+	pieceHash := make([][]byte, 0)
 
 	for {
 		req, err := stream.Recv()
@@ -42,10 +29,11 @@
 				return merrors.ErrReceivedPieceCount
 			}
-			//integrityMeta.PieceHash = pieceHash
+			integrityMeta.PieceHash = pieceHash
 			sealInfo := generateSealInfo(spID, integrityMeta)
 			integrityMeta.IntegrityHash = sealInfo.GetIntegrityHash()
-			if err := s.setIntegrityMeta(s.metaDB, integrityMeta); err != nil {
+			if err := s.metaDB.SetIntegrityMeta(integrityMeta); err != nil {
+				log.Errorw("set integrity meta error", "error", err)
 				return err
 			}
 			resp := &stypes.SyncerServiceSyncPieceResponse{
@@ -64,90 +52,61 @@ func (s *Syncer) SyncPiece(stream stypes.SyncerService_SyncPieceServer) error {
 			log.Errorw("stream recv failed", "error", err)
 			return err
 		}
+		count++
 		spID = req.GetSyncerInfo().GetStorageProviderId()
-		integrityMeta, key, value, err = s.handlePieceData(req)
+		integrityMeta, value, err = s.handlePieceData(req, count)
 		if err != nil {
 			return err
 		}
-		pieceHash[key] = hash.GenerateChecksum(value)
-		count++
-	}
-}
-
-func (s *Syncer) setIntegrityMeta(db metadb.MetaDB, meta *metadb.IntegrityMeta) error {
-	if err := db.SetIntegrityMeta(meta); err != nil {
-		log.Errorw("set integrity meta error", "error", err)
-		return err
+		pieceHash = append(pieceHash, hash.GenerateChecksum(value))
 	}
-	return nil
 }
 
 func generateSealInfo(spID string, integrityMeta *metadb.IntegrityMeta) *stypes.StorageProviderSealInfo {
-	//keys := util.GenericSortedKeys(integrityMeta.PieceHash)
-	pieceChecksumList := make([][]byte, 0)
-	// var integrityHash []byte
-	//for _, key := range keys {
-	//	value := integrityMeta.PieceHash[key]
-	//	pieceChecksumList = append(pieceChecksumList, value)
-	//}
-	integrityHash := hash.GenerateIntegrityHash(pieceChecksumList)
+	pieceHash := integrityMeta.PieceHash
+	integrityHash := hash.GenerateIntegrityHash(pieceHash)
 	resp := &stypes.StorageProviderSealInfo{
 		StorageProviderId: spID,
 		PieceIdx:          integrityMeta.PieceIdx,
-		PieceChecksum:     pieceChecksumList,
+		PieceChecksum:     pieceHash,
 		IntegrityHash:     integrityHash,
 		Signature:         nil, // TODO(mock)
 	}
 	return resp
 }
 
-func (s *Syncer) handlePieceData(req *stypes.SyncerServiceSyncPieceRequest) (*metadb.IntegrityMeta, string, []byte, error) {
-	if len(req.GetPieceData()) != 1 {
-		return nil, "", nil, errors.New("the length of piece data map is not equal to 1")
-	}
-
+func (s *Syncer) handlePieceData(req *stypes.SyncerServiceSyncPieceRequest, count uint32) (*metadb.IntegrityMeta, []byte, error) {
 	redundancyType := req.GetSyncerInfo().GetRedundancyType()
+	objectID := req.GetSyncerInfo().GetObjectId()
 	integrityMeta := &metadb.IntegrityMeta{
-		ObjectID:       req.GetSyncerInfo().GetObjectId(),
+		ObjectID:       objectID,
 		PieceCount:     req.GetSyncerInfo().GetPieceCount(),
 		IsPrimary:      false,
 		RedundancyType: redundancyType,
 	}
+	key, pieceIndex, err := encodePieceKey(redundancyType, objectID, count, req.GetSyncerInfo().GetPieceIndex())
+	if err != nil {
+		return nil, nil, err
+	}
+	integrityMeta.PieceIdx = pieceIndex
 
-	var (
-		key   string
-		value []byte
-	)
-	for key, value = range req.GetPieceData() {
-		pieceIndex, err := parsePieceIndex(redundancyType, key)
-		if err != nil {
-			return nil, "", nil, err
-		}
-		integrityMeta.PieceIdx = pieceIndex
-
-		// put piece data into piece store
-		if err = s.store.PutPiece(key, value); err != nil {
-			log.Errorw("put piece failed", "error", err)
-			return nil, "", nil, err
-		}
+	// put piece data into piece store
+	value := req.GetPieceData()
+	if err = s.store.PutPiece(key, value); err != nil {
+		log.Errorw("put piece failed", "error", err)
+		return nil, nil, err
 	}
-	return integrityMeta, key, value, nil
+	return integrityMeta, value, nil
 }
 
-func parsePieceIndex(redundancyType ptypes.RedundancyType, key string) (uint32, error) {
-	var (
-		err        error
-		pieceIndex uint32
-	)
+func encodePieceKey(redundancyType ptypes.RedundancyType, objectID uint64, segmentIndex, pieceIndex uint32) (
+	string, uint32, error) {
 	switch redundancyType {
 	case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
-		_, pieceIndex, err = piecestore.DecodeSegmentPieceKey(key)
-	default: // ec type
-		_, _, pieceIndex, err = piecestore.DecodeECPieceKey(key)
-	}
-	if err != nil {
-		log.Errorw("decode piece key failed", "error", err)
-		return 0, err
+		return piecestore.EncodeSegmentPieceKey(objectID, segmentIndex), pieceIndex, nil
+	case ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED:
+		return piecestore.EncodeECPieceKey(objectID, segmentIndex, pieceIndex), pieceIndex, nil
+	default:
+		return "", 0, merrors.ErrRedundancyType
 	}
-	return pieceIndex, nil
 }
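[editor's note] After this change a sender streams one SyncerServiceSyncPieceRequest per piece, with the raw bytes in piece_data, and the server derives each piece key itself from the message ordinal plus the piece_index in SyncerInfo. A hedged sketch of the client side; the stypes import path is assumed from the repo layout, the syncerAPI interface is declared locally to match the mock's SyncPiece signature seen in the tests above, and this is an illustration rather than the repo's actual sender:

package main

import (
	"context"

	"google.golang.org/grpc"

	// assumed import path for the generated service types
	stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
)

// syncerAPI matches the shape exercised by mock.NewMockSyncerAPI above.
type syncerAPI interface {
	SyncPiece(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error)
}

// syncPieces sends one message per piece and waits for the syncer's single
// response carrying the seal info.
func syncPieces(ctx context.Context, api syncerAPI, info *stypes.SyncerInfo,
	pieces [][]byte, traceID string) (*stypes.SyncerServiceSyncPieceResponse, error) {
	stream, err := api.SyncPiece(ctx)
	if err != nil {
		return nil, err
	}
	for _, piece := range pieces {
		req := &stypes.SyncerServiceSyncPieceRequest{
			TraceId:    traceID,
			SyncerInfo: info,
			PieceData:  piece, // plain bytes field after this change, no map
		}
		if err := stream.Send(req); err != nil {
			return nil, err
		}
	}
	// CloseAndRecv triggers the server's SendAndClose path shown above.
	return stream.CloseAndRecv()
}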
diff --git a/util/maps/map.go b/util/maps/map.go
new file mode 100644
index 000000000..53e63b9a2
--- /dev/null
+++ b/util/maps/map.go
@@ -0,0 +1,34 @@
+package maps
+
+import (
+	"sort"
+
+	"golang.org/x/exp/constraints"
+)
+
+// SortKeys sort keys of a map
+func SortKeys[M ~map[K]V, K constraints.Ordered, V any](m M) []K {
+	keys := make([]K, 0, len(m))
+	for k := range m {
+		keys = append(keys, k)
+	}
+	sortSlice(keys)
+	return keys
+}
+
+func sortSlice[T constraints.Ordered](s []T) {
+	sort.Slice(s, func(i, j int) bool {
+		return s[i] < s[j]
+	})
+}
+
+// ValueToSlice convert values of a map to a slice
+func ValueToSlice[M ~map[K]V, K constraints.Ordered, V any](m M) []V {
+	keys := SortKeys(m)
+	valueSlice := make([]V, 0)
+	for _, key := range keys {
+		value := m[key]
+		valueSlice = append(valueSlice, value)
+	}
+	return valueSlice
+}
diff --git a/util/utils.go b/util/utils.go
index 5567d25b2..bf581555d 100644
--- a/util/utils.go
+++ b/util/utils.go
@@ -5,15 +5,14 @@ import (
 	"math/rand"
 	"os"
 	"reflect"
-	"sort"
 	"strconv"
 	"strings"
 	"unicode"
 
-	"github.com/naoina/toml"
-	"golang.org/x/exp/constraints"
-
 	"github.com/bnb-chain/greenfield-storage-provider/model"
+	merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
+	ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+	"github.com/naoina/toml"
 )
 
 // TomlSettings - These settings ensure that TOML keys use the same names as Go struct fields.
@@ -48,22 +47,6 @@ func ComputeSegmentCount(size uint64) uint32 {
 	return segmentCount
 }
 
-// GenericSortedKeys sort keys of a map
-func GenericSortedKeys[K constraints.Ordered, V any](dataMap map[K]V) []K {
-	keys := make([]K, 0, len(dataMap))
-	for k := range dataMap {
-		keys = append(keys, k)
-	}
-	sortSlice(keys)
-	return keys
-}
-
-func sortSlice[T constraints.Ordered](s []T) {
-	sort.Slice(s, func(i, j int) bool {
-		return s[i] < s[j]
-	})
-}
-
 // JobStateReadable parser the job state to readable
 func JobStateReadable(state string) string {
 	return strings.ToLower(strings.TrimPrefix(state, "JOB_STATE_"))
@@ -73,3 +56,15 @@
 func SpReadable(provider string) string {
 	return provider[:8]
 }
+
+// ValidateRedundancyType validate redundancy type
+func ValidateRedundancyType(redundancyType ptypes.RedundancyType) error {
+	switch redundancyType {
+	case ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED:
+		return nil
+	case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
+		return nil
+	default:
+		return merrors.ErrRedundancyType
+	}
+}
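[editor's note] The new util/maps package replaces GenericSortedKeys with a map-type-generic equivalent, and ValueToSlice yields values in sorted-key order, the deterministic ordering that integrity hashing over a map previously had to reconstruct by hand (see the commented-out GenericSortedKeys usage in generateSealInfo above). A usage sketch, assuming golang.org/x/exp is present in go.mod:

package main

import (
	"fmt"

	"github.com/bnb-chain/greenfield-storage-provider/util/maps"
)

func main() {
	pieces := map[string][]byte{
		"123456_s1_p0": []byte("test2"),
		"123456_s0_p0": []byte("test1"),
	}
	// Keys come back sorted, so iteration order is deterministic.
	fmt.Println(maps.SortKeys(pieces)) // [123456_s0_p0 123456_s1_p0]
	// Values are returned in the same key order.
	for _, v := range maps.ValueToSlice(pieces) {
		fmt.Println(string(v)) // test1, then test2
	}
}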