Skip to content

Commit

Permalink
fix: use multi-dimensional array to send piece data and piece hash (#69)
Browse files Browse the repository at this point in the history
* fix: use multi-dimensional array to send piece data and piece hash

---------

Co-authored-by: DylanYong <dylan.y@nodereal.io>
  • Loading branch information
2 people authored and will-2012 committed Feb 10, 2023
1 parent 4de5c3d commit e3fd55a
Show file tree
Hide file tree
Showing 23 changed files with 904 additions and 897 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/proto-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ jobs:
with:
input: 'proto'

- uses: bufbuild/buf-breaking-action@v1
with:
against: "https//github.com/bnb-chain/greenfield-storage-provider"

# proto-lint-success:
# needs: proto-lint
# if: ${{ success() }}
Expand Down
2 changes: 1 addition & 1 deletion model/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
// RPC config
const (
// server and client max send or recv msg size
MaxCallMsgSize = 40 * 1024 * 1024
MaxCallMsgSize = 25 * 1024 * 1024
)

// Gateway
Expand Down
3 changes: 2 additions & 1 deletion model/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ var (
var (
ErrStoneNodeStarted = errors.New("stone node resource is running")
ErrStoneNodeStopped = errors.New("stone node service has stopped")
ErrIntegrityHash = errors.New("secondary integrity hash check error")
ErrIntegrityHash = errors.New("secondary integrity hash verifies error")
ErrRedundancyType = errors.New("unknown redundancy type")
ErrEmptyJob = errors.New("alloc stone job is empty")
ErrSecondarySPNumber = errors.New("secondary sp is not enough")
ErrInvalidPieceData = errors.New("invalid piece data")
ErrInvalidSegmentData = errors.New("invalid segment data, length is not equal to 1")
ErrInvalidECData = errors.New("invalid ec data, length is not equal to 6")
ErrEmptyTargetIdx = errors.New("target index array is empty")
Expand Down
14 changes: 7 additions & 7 deletions model/piecestore/piece_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ func EncodeSegmentPieceKey(objectID uint64, segmentIndex uint32) string {
func DecodeSegmentPieceKey(pieceKey string) (uint64, uint32, error) {
keys := strings.Split(pieceKey, "_")
if valid := CheckSegmentPieceKey(keys); !valid {
log.Errorw("Invalid segment piece key", "piece key", pieceKey)
return 0, 0, fmt.Errorf("Invalid segment piece key")
log.Errorw("segment piece key is wrong", "segment piece key", pieceKey)
return 0, 0, fmt.Errorf("invalid segment piece key")
}

objectID, _ := strconv.ParseUint(keys[0], 10, 64)
s := numberRegex.FindString(keys[1])
segmentIndex, _ := (strconv.ParseUint(s, 10, 32))
segmentIndex, _ := strconv.ParseUint(s, 10, 32)

return objectID, uint32(segmentIndex), nil
}
Expand Down Expand Up @@ -64,8 +64,8 @@ func EncodeECPieceKeyBySegmentKey(segmentKey string, pieceIndex uint32) string {
func DecodeECPieceKey(pieceKey string) (uint64, uint32, uint32, error) {
keys := strings.Split(pieceKey, "_")
if valid := CheckECPieceKey(keys); !valid {
log.Errorw("Invalid EC piece key", "piece key", pieceKey)
return 0, 0, 0, fmt.Errorf("Invalid EC piece key")
log.Errorw("ec piece key is wrong", "ec piece key", pieceKey)
return 0, 0, 0, fmt.Errorf("invalid EC piece key")
}

objectID, _ := strconv.ParseUint(keys[0], 10, 64)
Expand All @@ -86,7 +86,7 @@ var (
// CheckSegmentPieceKey checks ec piece key is correct
func CheckSegmentPieceKey(keys []string) bool {
if len(keys) != 2 {
log.Errorw("Invalid segment piece key", "piece key", keys)
log.Errorw("invalid segment piece key", "segment piece key", keys)
return false
}

Expand All @@ -99,7 +99,7 @@ func CheckSegmentPieceKey(keys []string) bool {
// CheckECPieceKey checks EC piece key is correct
func CheckECPieceKey(keys []string) bool {
if len(keys) != 3 {
log.Errorw("Invalid EC piece key", "piece key", keys)
log.Errorw("invalid EC piece key", "ec piece key", keys)
return false
}

Expand Down
20 changes: 10 additions & 10 deletions model/piecestore/piece_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,35 +49,35 @@ func TestDecodeSegmentPieceKey(t *testing.T) {
req: "testID_s2",
wantedResp1: 0,
wantedResp2: 0,
wantedErr: fmt.Errorf("Invalid segment piece key"),
wantedErr: fmt.Errorf("invalid segment piece key"),
},
{
name: "invalid piece key 2",
req: "123456789_p",
wantedResp1: 0,
wantedResp2: 0,
wantedErr: fmt.Errorf("Invalid segment piece key"),
wantedErr: fmt.Errorf("invalid segment piece key"),
},
{
name: "invalid piece key 3",
req: "123456789_s123r",
wantedResp1: 0,
wantedResp2: 0,
wantedErr: fmt.Errorf("Invalid segment piece key"),
wantedErr: fmt.Errorf("invalid segment piece key"),
},
{
name: "invalid piece key 4",
req: "123456789_ss.123",
wantedResp1: 0,
wantedResp2: 0,
wantedErr: fmt.Errorf("Invalid segment piece key"),
wantedErr: fmt.Errorf("invalid segment piece key"),
},
{
name: "invalid segment piece key 4",
req: "123456789_s123/111",
wantedResp1: 0,
wantedResp2: 0,
wantedErr: fmt.Errorf("Invalid segment piece key"),
wantedErr: fmt.Errorf("invalid segment piece key"),
},
}
for _, tt := range cases {
Expand Down Expand Up @@ -137,39 +137,39 @@ func TestDecodeECPieceKey(t *testing.T) {
wantedResp1: 0,
wantedResp2: 0,
wantedResp3: 0,
wantedErr: fmt.Errorf("Invalid EC piece key"),
wantedErr: fmt.Errorf("invalid EC piece key"),
},
{
name: "invalid ec piece key 2",
req: "123456789_s1_p",
wantedResp1: 0,
wantedResp2: 0,
wantedResp3: 0,
wantedErr: fmt.Errorf("Invalid EC piece key"),
wantedErr: fmt.Errorf("invalid EC piece key"),
},
{
name: "invalid ec piece key 3",
req: "123456789_s1_ps2",
wantedResp1: 0,
wantedResp2: 0,
wantedResp3: 0,
wantedErr: fmt.Errorf("Invalid EC piece key"),
wantedErr: fmt.Errorf("invalid EC piece key"),
},
{
name: "invalid ec piece key 4",
req: "123456789_s1_p2_p3",
wantedResp1: 0,
wantedResp2: 0,
wantedResp3: 0,
wantedErr: fmt.Errorf("Invalid EC piece key"),
wantedErr: fmt.Errorf("invalid EC piece key"),
},
{
name: "invalid ec piece key 5",
req: "123456789_s1_p2n",
wantedResp1: 0,
wantedResp2: 0,
wantedResp3: 0,
wantedErr: fmt.Errorf("Invalid EC piece key"),
wantedErr: fmt.Errorf("invalid EC piece key"),
},
}
for _, tt := range cases {
Expand Down
5 changes: 3 additions & 2 deletions proto/service/types/v1/syncer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ message SyncerInfo {
// bytes tx_hash = 2;
string storage_provider_id = 3;
uint32 piece_count = 4;
pkg.types.v1.RedundancyType redundancy_type = 5;
uint32 piece_index = 5;
pkg.types.v1.RedundancyType redundancy_type = 6;
}

message SyncerServiceSyncPieceRequest {
string trace_id = 1;
SyncerInfo syncer_info = 2;
map<string, bytes> piece_data = 3; // key is piece key, value is piece_data
bytes piece_data = 3;
}

message SyncerServiceSyncPieceResponse {
Expand Down
2 changes: 2 additions & 0 deletions service/client/piece_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/bnb-chain/greenfield-storage-provider/util/log"
)

var _ PieceStoreAPI = &StoreClient{}

// PieceStoreAPI provides an interface to enable mocking the
// StoreClient's API operation. This makes unit test to test your code easier.
//
Expand Down
3 changes: 1 addition & 2 deletions service/client/stone_hub_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package client
import (
"context"
"errors"
"io"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -14,7 +13,7 @@ import (
"github.com/bnb-chain/greenfield-storage-provider/util/log"
)

var _ io.Closer = &StoneHubClient{}
var _ StoneHubAPI = &StoneHubClient{}

// StoneHubAPI provides an interface to enable mocking the
// StoneHubClient's API operation. This makes unit test to test your code easier.
Expand Down
3 changes: 1 addition & 2 deletions service/client/syncer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package client

import (
"context"
"io"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -12,7 +11,7 @@ import (
"github.com/bnb-chain/greenfield-storage-provider/util/log"
)

var _ io.Closer = &SyncerClient{}
var _ SyncerAPI = &SyncerClient{}

// SyncerAPI provides an interface to enable mocking the
// SyncerClient's API operation. This makes unit test to test your code easier.
Expand Down
93 changes: 93 additions & 0 deletions service/stonenode/alloc_stone_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package stonenode

import (
"context"
"errors"

"github.com/bnb-chain/greenfield-storage-provider/mock"
merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
"github.com/bnb-chain/greenfield-storage-provider/util"
"github.com/bnb-chain/greenfield-storage-provider/util/log"
)

// allocStoneJob sends rpc request to stone hub alloc stone job
func (node *StoneNodeService) allocStoneJob(ctx context.Context) {
resp, err := node.stoneHub.AllocStoneJob(ctx)
ctx = log.Context(ctx, resp, resp.GetPieceJob())
if err != nil {
if errors.Is(err, merrors.ErrEmptyJob) {
return
}
log.CtxErrorw(ctx, "alloc stone from stone hub failed", "error", err)
return
}
// TBD:: stone node will support more types of stone job,
// currently only support upload secondary piece job
if err := node.loadAndSyncPieces(ctx, resp); err != nil {
log.CtxErrorw(ctx, "upload secondary piece job failed", "error", err)
return
}
log.CtxInfow(ctx, "upload secondary piece job success")
}

// loadAndSyncPieces load segment data from primary and sync to secondary
func (node *StoneNodeService) loadAndSyncPieces(ctx context.Context, allocResp *stypes.StoneHubServiceAllocStoneJobResponse) error {
// TBD:: check secondarySPs count by redundancyType.
// EC_TYPE need EC_M + EC_K + backup
// REPLICA_TYPE and INLINE_TYPE need segments count + backup
secondarySPs := mock.AllocUploadSecondarySP()

// validate redundancyType and targetIdx
redundancyType := allocResp.GetPieceJob().GetRedundancyType()
if err := util.ValidateRedundancyType(redundancyType); err != nil {
log.CtxErrorw(ctx, "invalid redundancy type", "redundancy type", redundancyType)
node.reportErrToStoneHub(ctx, allocResp, err)
return err
}
targetIdx := allocResp.GetPieceJob().GetTargetIdx()
if len(targetIdx) == 0 {
log.CtxError(ctx, "invalid target idx length")
node.reportErrToStoneHub(ctx, allocResp, merrors.ErrEmptyTargetIdx)
return merrors.ErrEmptyTargetIdx
}

// 1. load all segments data from primary piece store and encode
pieceData, err := node.encodeSegmentsData(ctx, allocResp)
if err != nil {
node.reportErrToStoneHub(ctx, allocResp, err)
return err
}

// 2. dispatch the piece data to different secondary sp
secondaryPieceData, err := node.dispatchSecondarySP(pieceData, redundancyType, secondarySPs, targetIdx)
if err != nil {
log.CtxErrorw(ctx, "dispatch piece data to secondary sp error")
node.reportErrToStoneHub(ctx, allocResp, err)
return err
}

// 3. send piece data to the secondary
node.doSyncToSecondarySP(ctx, allocResp, secondaryPieceData, secondarySPs)
return nil
}

// reportErrToStoneHub send error message to stone hub
func (node *StoneNodeService) reportErrToStoneHub(ctx context.Context, resp *stypes.StoneHubServiceAllocStoneJobResponse,
reportErr error) {
if reportErr == nil {
return
}
req := &stypes.StoneHubServiceDoneSecondaryPieceJobRequest{
TraceId: resp.GetTraceId(),
ErrMessage: &stypes.ErrMessage{
ErrCode: stypes.ErrCode_ERR_CODE_ERROR,
ErrMsg: reportErr.Error(),
},
}
if _, err := node.stoneHub.DoneSecondaryPieceJob(ctx, req); err != nil {
log.CtxErrorw(ctx, "report stone hub err msg failed", "error", err)
return
}
log.CtxInfow(ctx, "report stone hub err msg success")
}
Loading

0 comments on commit e3fd55a

Please sign in to comment.