diff --git a/model/errors/errors.go b/model/errors/errors.go index 7433c698e..86cdae037 100644 --- a/model/errors/errors.go +++ b/model/errors/errors.go @@ -40,12 +40,13 @@ var ( // stone node service errors var ( - ErrStoneNodeStarted = errors.New("stone node resource is running") - ErrStoneNodeStopped = errors.New("stone node service has stopped") - ErrIntegrityHash = errors.New("secondary integrity hash check error") - ErrRedundancyType = errors.New("unknown redundancy type") - ErrEmptyJob = errors.New("job is empty") - ErrSecondarySPNumber = errors.New("secondary sp is not enough") + ErrStoneNodeStarted = errors.New("stone node resource is running") + ErrStoneNodeStopped = errors.New("stone node service has stopped") + ErrIntegrityHash = errors.New("secondary integrity hash check error") + ErrRedundancyType = errors.New("unknown redundancy type") + ErrEmptyJob = errors.New("job is empty") + ErrSecondarySPNumber = errors.New("secondary sp is not enough") + ErrInvalidSegmentData = errors.New("invalid segment data, length is not equal to 1") ) func MakeErrMsgResponse(err error) *service.ErrMessage { diff --git a/proto/service/types/v1/stone_hub.proto b/proto/service/types/v1/stone_hub.proto index d255c2e5a..ef7a8bad2 100644 --- a/proto/service/types/v1/stone_hub.proto +++ b/proto/service/types/v1/stone_hub.proto @@ -43,7 +43,7 @@ message PieceJob { bytes tx_hash = 3; uint64 object_id = 4; uint64 payload_size = 5; - repeated uint32 target_idx = 6; // ec number: 1, 2, 3... + repeated uint32 target_idx = 6; // ec number: 1, 2, 3..., start at 1; segment number: 0, 1, 2..., start at 0 pkg.types.v1.RedundancyType redundancy_type = 7; StorageProviderSealInfo storage_provider_seal_info = 8; } diff --git a/service/client/mock/piece_store_mock.go b/service/client/mock/piece_store_mock.go new file mode 100644 index 000000000..5394ca66c --- /dev/null +++ b/service/client/mock/piece_store_mock.go @@ -0,0 +1,64 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./piece_store_client.go + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockPieceStoreAPI is a mock of PieceStoreAPI interface. +type MockPieceStoreAPI struct { + ctrl *gomock.Controller + recorder *MockPieceStoreAPIMockRecorder +} + +// MockPieceStoreAPIMockRecorder is the mock recorder for MockPieceStoreAPI. +type MockPieceStoreAPIMockRecorder struct { + mock *MockPieceStoreAPI +} + +// NewMockPieceStoreAPI creates a new mock instance. +func NewMockPieceStoreAPI(ctrl *gomock.Controller) *MockPieceStoreAPI { + mock := &MockPieceStoreAPI{ctrl: ctrl} + mock.recorder = &MockPieceStoreAPIMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPieceStoreAPI) EXPECT() *MockPieceStoreAPIMockRecorder { + return m.recorder +} + +// GetPiece mocks base method. +func (m *MockPieceStoreAPI) GetPiece(ctx context.Context, key string, offset, limit int64) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPiece", ctx, key, offset, limit) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetPiece indicates an expected call of GetPiece. +func (mr *MockPieceStoreAPIMockRecorder) GetPiece(ctx, key, offset, limit interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPiece", reflect.TypeOf((*MockPieceStoreAPI)(nil).GetPiece), ctx, key, offset, limit) +} + +// PutPiece mocks base method. +func (m *MockPieceStoreAPI) PutPiece(key string, value []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutPiece", key, value) + ret0, _ := ret[0].(error) + return ret0 +} + +// PutPiece indicates an expected call of PutPiece. +func (mr *MockPieceStoreAPIMockRecorder) PutPiece(key, value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutPiece", reflect.TypeOf((*MockPieceStoreAPI)(nil).PutPiece), key, value) +} diff --git a/service/client/mock/stone_hub_mock.go b/service/client/mock/stone_hub_mock.go new file mode 100644 index 000000000..0898fc801 --- /dev/null +++ b/service/client/mock/stone_hub_mock.go @@ -0,0 +1,171 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./stone_hub_client.go + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + v1 "github.com/bnb-chain/inscription-storage-provider/service/types/v1" + gomock "github.com/golang/mock/gomock" + grpc "google.golang.org/grpc" +) + +// MockStoneHubAPI is a mock of StoneHubAPI interface. +type MockStoneHubAPI struct { + ctrl *gomock.Controller + recorder *MockStoneHubAPIMockRecorder +} + +// MockStoneHubAPIMockRecorder is the mock recorder for MockStoneHubAPI. +type MockStoneHubAPIMockRecorder struct { + mock *MockStoneHubAPI +} + +// NewMockStoneHubAPI creates a new mock instance. +func NewMockStoneHubAPI(ctrl *gomock.Controller) *MockStoneHubAPI { + mock := &MockStoneHubAPI{ctrl: ctrl} + mock.recorder = &MockStoneHubAPIMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStoneHubAPI) EXPECT() *MockStoneHubAPIMockRecorder { + return m.recorder +} + +// AllocStoneJob mocks base method. +func (m *MockStoneHubAPI) AllocStoneJob(ctx context.Context, opts ...grpc.CallOption) (*v1.StoneHubServiceAllocStoneJobResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "AllocStoneJob", varargs...) + ret0, _ := ret[0].(*v1.StoneHubServiceAllocStoneJobResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AllocStoneJob indicates an expected call of AllocStoneJob. +func (mr *MockStoneHubAPIMockRecorder) AllocStoneJob(ctx interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllocStoneJob", reflect.TypeOf((*MockStoneHubAPI)(nil).AllocStoneJob), varargs...) +} + +// BeginUploadPayload mocks base method. +func (m *MockStoneHubAPI) BeginUploadPayload(ctx context.Context, in *v1.StoneHubServiceBeginUploadPayloadRequest, opts ...grpc.CallOption) (*v1.StoneHubServiceBeginUploadPayloadResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "BeginUploadPayload", varargs...) + ret0, _ := ret[0].(*v1.StoneHubServiceBeginUploadPayloadResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BeginUploadPayload indicates an expected call of BeginUploadPayload. +func (mr *MockStoneHubAPIMockRecorder) BeginUploadPayload(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BeginUploadPayload", reflect.TypeOf((*MockStoneHubAPI)(nil).BeginUploadPayload), varargs...) +} + +// Close mocks base method. +func (m *MockStoneHubAPI) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockStoneHubAPIMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStoneHubAPI)(nil).Close)) +} + +// CreateObject mocks base method. +func (m *MockStoneHubAPI) CreateObject(ctx context.Context, in *v1.StoneHubServiceCreateObjectRequest, opts ...grpc.CallOption) (*v1.StoneHubServiceCreateObjectResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CreateObject", varargs...) + ret0, _ := ret[0].(*v1.StoneHubServiceCreateObjectResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateObject indicates an expected call of CreateObject. +func (mr *MockStoneHubAPIMockRecorder) CreateObject(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateObject", reflect.TypeOf((*MockStoneHubAPI)(nil).CreateObject), varargs...) +} + +// DonePrimaryPieceJob mocks base method. +func (m *MockStoneHubAPI) DonePrimaryPieceJob(ctx context.Context, in *v1.StoneHubServiceDonePrimaryPieceJobRequest, opts ...grpc.CallOption) (*v1.StoneHubServiceDonePrimaryPieceJobResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DonePrimaryPieceJob", varargs...) + ret0, _ := ret[0].(*v1.StoneHubServiceDonePrimaryPieceJobResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DonePrimaryPieceJob indicates an expected call of DonePrimaryPieceJob. +func (mr *MockStoneHubAPIMockRecorder) DonePrimaryPieceJob(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DonePrimaryPieceJob", reflect.TypeOf((*MockStoneHubAPI)(nil).DonePrimaryPieceJob), varargs...) +} + +// DoneSecondaryPieceJob mocks base method. +func (m *MockStoneHubAPI) DoneSecondaryPieceJob(ctx context.Context, in *v1.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) (*v1.StoneHubServiceDoneSecondaryPieceJobResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DoneSecondaryPieceJob", varargs...) + ret0, _ := ret[0].(*v1.StoneHubServiceDoneSecondaryPieceJobResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DoneSecondaryPieceJob indicates an expected call of DoneSecondaryPieceJob. +func (mr *MockStoneHubAPIMockRecorder) DoneSecondaryPieceJob(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoneSecondaryPieceJob", reflect.TypeOf((*MockStoneHubAPI)(nil).DoneSecondaryPieceJob), varargs...) +} + +// SetObjectCreateInfo mocks base method. +func (m *MockStoneHubAPI) SetObjectCreateInfo(ctx context.Context, in *v1.StoneHubServiceSetObjectCreateInfoRequest, opts ...grpc.CallOption) (*v1.StoneHubServiceSetSetObjectCreateInfoResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SetObjectCreateInfo", varargs...) + ret0, _ := ret[0].(*v1.StoneHubServiceSetSetObjectCreateInfoResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SetObjectCreateInfo indicates an expected call of SetObjectCreateInfo. +func (mr *MockStoneHubAPIMockRecorder) SetObjectCreateInfo(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetObjectCreateInfo", reflect.TypeOf((*MockStoneHubAPI)(nil).SetObjectCreateInfo), varargs...) +} diff --git a/service/client/mock/syncer_mock.go b/service/client/mock/syncer_mock.go new file mode 100644 index 000000000..1385708d5 --- /dev/null +++ b/service/client/mock/syncer_mock.go @@ -0,0 +1,71 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./syncer_client.go + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + v1 "github.com/bnb-chain/inscription-storage-provider/service/types/v1" + gomock "github.com/golang/mock/gomock" + grpc "google.golang.org/grpc" +) + +// MockSyncerAPI is a mock of SyncerAPI interface. +type MockSyncerAPI struct { + ctrl *gomock.Controller + recorder *MockSyncerAPIMockRecorder +} + +// MockSyncerAPIMockRecorder is the mock recorder for MockSyncerAPI. +type MockSyncerAPIMockRecorder struct { + mock *MockSyncerAPI +} + +// NewMockSyncerAPI creates a new mock instance. +func NewMockSyncerAPI(ctrl *gomock.Controller) *MockSyncerAPI { + mock := &MockSyncerAPI{ctrl: ctrl} + mock.recorder = &MockSyncerAPIMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSyncerAPI) EXPECT() *MockSyncerAPIMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockSyncerAPI) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockSyncerAPIMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSyncerAPI)(nil).Close)) +} + +// UploadECPiece mocks base method. +func (m *MockSyncerAPI) UploadECPiece(ctx context.Context, opts ...grpc.CallOption) (v1.SyncerService_UploadECPieceClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "UploadECPiece", varargs...) + ret0, _ := ret[0].(v1.SyncerService_UploadECPieceClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UploadECPiece indicates an expected call of UploadECPiece. +func (mr *MockSyncerAPIMockRecorder) UploadECPiece(ctx interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UploadECPiece", reflect.TypeOf((*MockSyncerAPI)(nil).UploadECPiece), varargs...) +} diff --git a/service/client/piece_store_client.go b/service/client/piece_store_client.go index b13a5591b..13b7f1503 100644 --- a/service/client/piece_store_client.go +++ b/service/client/piece_store_client.go @@ -10,6 +10,15 @@ import ( "github.com/bnb-chain/inscription-storage-provider/util/log" ) +// PieceStoreAPI provides an interface to enable mocking the +// StoreClient's API operation. This makes unit test to test your code easier. +// +//go:generate mockgen -source=./piece_store_client.go -destination=./mock/piece_store_mock.go -package=mock +type PieceStoreAPI interface { + GetPiece(ctx context.Context, key string, offset, limit int64) ([]byte, error) + PutPiece(key string, value []byte) error +} + type StoreClient struct { ps *piece.PieceStore } diff --git a/service/client/stone_hub_client.go b/service/client/stone_hub_client.go index fbe342855..31cb23f43 100644 --- a/service/client/stone_hub_client.go +++ b/service/client/stone_hub_client.go @@ -15,10 +15,24 @@ import ( "github.com/bnb-chain/inscription-storage-provider/util/log" ) -var ClientRpcTimeout = time.Second * 5 +var ClientRPCTimeout = time.Second * 5 var _ io.Closer = &StoneHubClient{} +// StoneHubAPI provides an interface to enable mocking the +// StoneHubClient's API operation. This makes unit test to test your code easier. +// +//go:generate mockgen -source=./stone_hub_client.go -destination=./mock/stone_hub_mock.go -package=mock +type StoneHubAPI interface { + CreateObject(ctx context.Context, in *service.StoneHubServiceCreateObjectRequest, opts ...grpc.CallOption) (*service.StoneHubServiceCreateObjectResponse, error) + SetObjectCreateInfo(ctx context.Context, in *service.StoneHubServiceSetObjectCreateInfoRequest, opts ...grpc.CallOption) (*service.StoneHubServiceSetSetObjectCreateInfoResponse, error) + BeginUploadPayload(ctx context.Context, in *service.StoneHubServiceBeginUploadPayloadRequest, opts ...grpc.CallOption) (*service.StoneHubServiceBeginUploadPayloadResponse, error) + DonePrimaryPieceJob(ctx context.Context, in *service.StoneHubServiceDonePrimaryPieceJobRequest, opts ...grpc.CallOption) (*service.StoneHubServiceDonePrimaryPieceJobResponse, error) + AllocStoneJob(ctx context.Context, opts ...grpc.CallOption) (*service.StoneHubServiceAllocStoneJobResponse, error) + DoneSecondaryPieceJob(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) (*service.StoneHubServiceDoneSecondaryPieceJobResponse, error) + Close() error +} + type StoneHubClient struct { address string stoneHub service.StoneHubServiceClient @@ -26,7 +40,7 @@ type StoneHubClient struct { } func NewStoneHubClient(address string) (*StoneHubClient, error) { - ctx, _ := context.WithTimeout(context.Background(), ClientRpcTimeout) + ctx, _ := context.WithTimeout(context.Background(), ClientRPCTimeout) conn, err := grpc.DialContext(ctx, address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Errorw("invoke stoneHub service grpc.DialContext failed", "error", err) @@ -40,7 +54,8 @@ func NewStoneHubClient(address string) (*StoneHubClient, error) { return client, nil } -func (client *StoneHubClient) CreateObject(ctx context.Context, in *service.StoneHubServiceCreateObjectRequest, opts ...grpc.CallOption) (*service.StoneHubServiceCreateObjectResponse, error) { +func (client *StoneHubClient) CreateObject(ctx context.Context, in *service.StoneHubServiceCreateObjectRequest, + opts ...grpc.CallOption) (*service.StoneHubServiceCreateObjectResponse, error) { resp, err := client.stoneHub.CreateObject(ctx, in, opts...) ctx = log.Context(ctx, resp) if err != nil { @@ -54,7 +69,8 @@ func (client *StoneHubClient) CreateObject(ctx context.Context, in *service.Ston return resp, nil } -func (client *StoneHubClient) SetObjectCreateInfo(ctx context.Context, in *service.StoneHubServiceSetObjectCreateInfoRequest, opts ...grpc.CallOption) (*service.StoneHubServiceSetObjectCreateInfoResponse, error) { +func (client *StoneHubClient) SetObjectCreateInfo(ctx context.Context, in *service.StoneHubServiceSetObjectCreateInfoRequest, + opts ...grpc.CallOption) (*service.StoneHubServiceSetSetObjectCreateInfoResponse, error) { resp, err := client.stoneHub.SetObjectCreateInfo(ctx, in, opts...) ctx = log.Context(ctx, resp) if err != nil { @@ -68,7 +84,8 @@ func (client *StoneHubClient) SetObjectCreateInfo(ctx context.Context, in *servi return resp, nil } -func (client *StoneHubClient) BeginUploadPayload(ctx context.Context, in *service.StoneHubServiceBeginUploadPayloadRequest, opts ...grpc.CallOption) (*service.StoneHubServiceBeginUploadPayloadResponse, error) { +func (client *StoneHubClient) BeginUploadPayload(ctx context.Context, in *service.StoneHubServiceBeginUploadPayloadRequest, + opts ...grpc.CallOption) (*service.StoneHubServiceBeginUploadPayloadResponse, error) { resp, err := client.stoneHub.BeginUploadPayload(ctx, in, opts...) ctx = log.Context(ctx, resp) if err != nil { @@ -110,7 +127,8 @@ func (client *StoneHubClient) DonePrimaryPieceJob(ctx context.Context, in *servi return resp, nil } -func (client *StoneHubClient) AllocStoneJob(ctx context.Context, opts ...grpc.CallOption) (*service.StoneHubServiceAllocStoneJobResponse, error) { +func (client *StoneHubClient) AllocStoneJob(ctx context.Context, opts ...grpc.CallOption) ( + *service.StoneHubServiceAllocStoneJobResponse, error) { req := &service.StoneHubServiceAllocStoneJobRequest{TraceId: util.GenerateRequestID()} resp, err := client.stoneHub.AllocStoneJob(ctx, req, opts...) ctx = log.Context(ctx, resp) @@ -129,7 +147,8 @@ func (client *StoneHubClient) AllocStoneJob(ctx context.Context, opts ...grpc.Ca return resp, nil } -func (client *StoneHubClient) DoneSecondaryPieceJob(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) (*service.StoneHubServiceDoneSecondaryPieceJobResponse, error) { +func (client *StoneHubClient) DoneSecondaryPieceJob(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest, + opts ...grpc.CallOption) (*service.StoneHubServiceDoneSecondaryPieceJobResponse, error) { resp, err := client.stoneHub.DoneSecondaryPieceJob(ctx, in, opts...) ctx = log.Context(ctx, resp) if err != nil { diff --git a/service/client/syncer_client.go b/service/client/syncer_client.go index 437e26d39..bd00258ab 100644 --- a/service/client/syncer_client.go +++ b/service/client/syncer_client.go @@ -14,6 +14,15 @@ import ( var _ io.Closer = &SyncerClient{} +// SyncerAPI provides an interface to enable mocking the +// SyncerClient's API operation. This makes unit test to test your code easier. +// +//go:generate mockgen -source=./syncer_client.go -destination=./mock/syncer_mock.go -package=mock +type SyncerAPI interface { + UploadECPiece(ctx context.Context, opts ...grpc.CallOption) (service.SyncerService_UploadECPieceClient, error) + Close() error +} + type SyncerClient struct { address string syncer service.SyncerServiceClient @@ -21,7 +30,7 @@ type SyncerClient struct { } func NewSyncerClient(address string) (*SyncerClient, error) { - ctx, _ := context.WithTimeout(context.Background(), ClientRpcTimeout) + ctx, _ := context.WithTimeout(context.Background(), ClientRPCTimeout) conn, err := grpc.DialContext(ctx, address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Errorw("invoke syncer service grpc.DialContext failed", "error", err) @@ -36,7 +45,8 @@ func NewSyncerClient(address string) (*SyncerClient, error) { } // UploadECPiece return SyncerService_UploadECPieceClient, need to be closed by caller -func (client *SyncerClient) UploadECPiece(ctx context.Context, opts ...grpc.CallOption) (service.SyncerService_UploadECPieceClient, error) { +func (client *SyncerClient) UploadECPiece(ctx context.Context, opts ...grpc.CallOption) ( + service.SyncerService_UploadECPieceClient, error) { return client.syncer.UploadECPiece(ctx, opts...) } diff --git a/service/stonenode/helper_test.go b/service/stonenode/helper_test.go new file mode 100644 index 000000000..1692d48f9 --- /dev/null +++ b/service/stonenode/helper_test.go @@ -0,0 +1,107 @@ +package stonenode + +import ( + "context" + "testing" + + "google.golang.org/grpc" + + ptypes "github.com/bnb-chain/inscription-storage-provider/pkg/types/v1" + service "github.com/bnb-chain/inscription-storage-provider/service/types/v1" +) + +func setup(t *testing.T) *StoneNodeService { + return &StoneNodeService{ + cfg: &StoneNodeConfig{ + Address: "test1", + StoneHubServiceAddress: "test2", + SyncerServiceAddress: "test3", + StorageProvider: "test", + StoneJobLimit: 0, + }, + name: ServiceNameStoneNode, + stoneLimit: 0, + } +} + +func mockAllocResp(objectID uint64, payloadSize uint64, redundancyType ptypes.RedundancyType) *service.StoneHubServiceAllocStoneJobResponse { + return &service.StoneHubServiceAllocStoneJobResponse{ + TraceId: "123456", + TxHash: []byte("blockchain_one"), + PieceJob: &service.PieceJob{ + BucketName: "bucket1", + ObjectName: "object1", + TxHash: []byte("blockchain_one"), + ObjectId: objectID, + PayloadSize: payloadSize, + TargetIdx: nil, + RedundancyType: redundancyType, + }, + ErrMessage: &service.ErrMessage{ + ErrCode: 0, + ErrMsg: "Success", + }, + } +} + +func dispatchPieceMap() map[string][][]byte { + ecList1 := [][]byte{[]byte("1"), []byte("2"), []byte("3"), []byte("4"), []byte("5"), []byte("6")} + ecList2 := [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e"), []byte("f")} + pMap := make(map[string][][]byte) + pMap["123456_s0"] = ecList1 + pMap["123456_s1"] = ecList2 + return pMap +} + +func dispatchSegmentMap() map[string][][]byte { + segmentList1 := [][]byte{[]byte("10")} + segmentList2 := [][]byte{[]byte("20")} + segmentList3 := [][]byte{[]byte("30")} + sMap := make(map[string][][]byte) + sMap["789_s0"] = segmentList1 + sMap["789_s1"] = segmentList2 + sMap["789_s2"] = segmentList3 + return sMap +} + +func dispatchInlineMap() map[string][][]byte { + inlineList := [][]byte{[]byte("+")} + iMap := make(map[string][][]byte) + iMap["543_s0"] = inlineList + return iMap +} + +func makeStreamMock() *StreamMock { + return &StreamMock{ + ctx: context.Background(), + recvToServer: make(chan *service.SyncerServiceUploadECPieceRequest, 10), + } +} + +type StreamMock struct { + grpc.ClientStream + ctx context.Context + recvToServer chan *service.SyncerServiceUploadECPieceRequest +} + +func (m *StreamMock) Send(resp *service.SyncerServiceUploadECPieceRequest) error { + m.recvToServer <- resp + return nil +} + +func (m *StreamMock) CloseAndRecv() (*service.SyncerServiceUploadECPieceResponse, error) { + return &service.SyncerServiceUploadECPieceResponse{ + TraceId: "test_traceID", + SecondarySpInfo: &service.StorageProviderSealInfo{ + StorageProviderId: "sp1", + PieceIdx: 1, + PieceChecksum: [][]byte{[]byte("1"), []byte("2"), []byte("3"), []byte("4"), []byte("5"), []byte("6")}, + IntegrityHash: []byte("a"), + Signature: nil, + }, + ErrMessage: &service.ErrMessage{ + ErrCode: service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED, + ErrMsg: "Success", + }, + }, nil +} diff --git a/service/stonenode/server.go b/service/stonenode/server.go index 7a28c0809..106b302a2 100644 --- a/service/stonenode/server.go +++ b/service/stonenode/server.go @@ -2,7 +2,6 @@ package stonenode import ( "context" - "errors" "fmt" "sync/atomic" "time" @@ -19,11 +18,14 @@ const ( // StoneNodeService manages stone execution units type StoneNodeService struct { - cfg *StoneNodeConfig - name string - syncer *client.SyncerClient - stoneHub *client.StoneHubClient - store *client.StoreClient + cfg *StoneNodeConfig + name string + //syncer *client.SyncerClient + //stoneHub *client.StoneHubClient + //store *client.StoreClient + syncer client.SyncerAPI + stoneHub client.StoneHubAPI + store client.PieceStoreAPI stoneLimit int64 running atomic.Bool @@ -47,7 +49,7 @@ func NewStoneNodeService(config *StoneNodeConfig) (*StoneNodeService, error) { // InitClient inits store client and rpc client func (node *StoneNodeService) InitClient() error { if node.running.Load() == true { - return errors.New("stone node resource is running") + return merrors.ErrStoneNodeStarted } store, err := client.NewStoreClient(node.cfg.PieceConfig) if err != nil { diff --git a/service/stonenode/sync.go b/service/stonenode/sync.go index 5eb50f9a8..fe7b25472 100644 --- a/service/stonenode/sync.go +++ b/service/stonenode/sync.go @@ -4,6 +4,8 @@ import ( "bytes" "context" "errors" + "fmt" + "sort" "sync" "sync/atomic" @@ -47,6 +49,7 @@ func (node *StoneNodeService) syncPieceToSecondarySP(ctx context.Context, allocR } // loadSegmentsData load segment data from primary storage provider. +// returned map key is segmentKey, value is corresponding ec data from ec1 to ec6, or segment data func (node *StoneNodeService) loadSegmentsData(ctx context.Context, allocResp *service.StoneHubServiceAllocStoneJobResponse) ( map[string][][]byte, error) { type segment struct { @@ -155,60 +158,133 @@ func (node *StoneNodeService) generatePieceData(redundancyType ptypes.Redundancy } // dispatchSecondarySP dispatch piece data to secondary storage provider. +// returned map key is spID, value map key is piece key or segment key, value map's value is piece data func (node *StoneNodeService) dispatchSecondarySP(pieceDataBySegment map[string][][]byte, redundancyType ptypes.RedundancyType, secondarySPs []string, targetIdx []uint32) (map[string]map[string][]byte, error) { - var pieceDataBySecondary map[string]map[string][]byte + pieceDataBySecondary := make(map[string]map[string][]byte) // pieceDataBySegment key is segment key, value is ec data from ec1 to ec6 - for pieceKey, pieceData := range pieceDataBySegment { + var err error + switch redundancyType { + case ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED: + pieceDataBySecondary, err = fillECData(pieceDataBySegment, secondarySPs, targetIdx) + if err != nil { + return map[string]map[string][]byte{}, err + } + return pieceDataBySecondary, nil + case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: + pieceDataBySecondary, err = fillReplicaOrInlineData(pieceDataBySegment, secondarySPs, targetIdx) + if err != nil { + return map[string]map[string][]byte{}, err + } + return pieceDataBySecondary, nil + default: + return map[string]map[string][]byte{}, merrors.ErrRedundancyType + } +} + +func fillECData(pieceDataBySegment map[string][][]byte, secondarySPs []string, targetIdx []uint32) ( + map[string]map[string][]byte, error) { + ecPieceDataMap := make(map[string]map[string][]byte) + + // iterate map in order + keys := sortedKeys(pieceDataBySegment) + for _, pieceKey := range keys { + pieceData := pieceDataBySegment[pieceKey] for idx, data := range pieceData { if idx >= len(secondarySPs) { - return pieceDataBySecondary, merrors.ErrSecondarySPNumber + return map[string]map[string][]byte{}, merrors.ErrSecondarySPNumber } + // initialize data map sp := secondarySPs[idx] + if len(targetIdx) == 0 { + if _, ok := ecPieceDataMap[sp]; !ok { + ecPieceDataMap[sp] = make(map[string][]byte) + } + } else { + for _, j := range targetIdx { + if int(j-1) == idx { + if _, ok := ecPieceDataMap[sp]; !ok { + ecPieceDataMap[sp] = make(map[string][]byte) + } + } + } + } - var err error + var key string // if targetIdx is not equal to zero, retry to get data which idx is equal to targetIdx if len(targetIdx) != 0 { for _, j := range targetIdx { - if int(j) == idx { - pieceDataBySecondary, err = fillECData(sp, pieceKey, data, redundancyType, idx) - if err != nil { - return nil, err - } + if int(j-1) == idx { + key = piecestore.EncodeECPieceKeyBySegmentKey(pieceKey, idx) + ecPieceDataMap[sp][key] = data } } } else { - pieceDataBySecondary, err = fillECData(sp, pieceKey, data, redundancyType, idx) - if err != nil { - return nil, err - } + key = piecestore.EncodeECPieceKeyBySegmentKey(pieceKey, idx) + ecPieceDataMap[sp][key] = data } } } - return pieceDataBySecondary, nil + fmt.Println(ecPieceDataMap) + return ecPieceDataMap, nil } -// fillECData -func fillECData(spID string, pieceKey string, data []byte, redundancyType ptypes.RedundancyType, index int) ( +func fillReplicaOrInlineData(pieceDataBySegment map[string][][]byte, secondarySPs []string, targetIdx []uint32) ( map[string]map[string][]byte, error) { - var pieceDataBySecondary map[string]map[string][]byte - switch redundancyType { - case ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED: - if _, ok := pieceDataBySecondary[spID]; !ok { - pieceDataBySecondary[spID] = make(map[string][]byte) + replicaOrInlineDataMap := make(map[string]map[string][]byte) + if len(pieceDataBySegment) >= len(secondarySPs) { + return map[string]map[string][]byte{}, merrors.ErrSecondarySPNumber + } + + // iterate map in order + keys := sortedKeys(pieceDataBySegment) + for i := 0; i < len(keys); i++ { + pieceKey := keys[i] + pieceData := pieceDataBySegment[pieceKey] + if len(pieceData) != 1 { + return nil, merrors.ErrInvalidSegmentData } - key := piecestore.EncodeECPieceKeyBySegmentKey(pieceKey, index) - pieceDataBySecondary[spID][key] = data - case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: - if _, ok := pieceDataBySecondary[spID]; !ok { - pieceDataBySecondary[spID] = make(map[string][]byte) + + sp := secondarySPs[i] + if len(targetIdx) == 0 { + if _, ok := replicaOrInlineDataMap[sp]; !ok { + replicaOrInlineDataMap[sp] = make(map[string][]byte) + } + } else { + for _, index := range targetIdx { + if int(index) == i { + if _, ok := replicaOrInlineDataMap[sp]; !ok { + replicaOrInlineDataMap[sp] = make(map[string][]byte) + } + } + } } - pieceDataBySecondary[spID][pieceKey] = data - default: - return nil, merrors.ErrRedundancyType + + var key string + if len(targetIdx) != 0 { + for _, index := range targetIdx { + if int(index) == i { + key = pieceKey + replicaOrInlineDataMap[sp][key] = pieceData[0] + } + } + } else { + key = pieceKey + replicaOrInlineDataMap[sp][key] = pieceData[0] + } + } + fmt.Println(replicaOrInlineDataMap) + return replicaOrInlineDataMap, nil +} + +func sortedKeys(dataMap map[string][][]byte) []string { + keys := make([]string, 0, len(dataMap)) + for k := range dataMap { + keys = append(keys, k) } - return pieceDataBySecondary, nil + sort.Strings(keys) + return keys } // doSyncToSecondarySP send piece data to the secondary. @@ -270,7 +346,7 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *ser integrityHash := hash.GenerateIntegrityHash(pieceHash, secondary) if syncResp.GetSecondarySpInfo() == nil || syncResp.GetSecondarySpInfo().GetIntegrityHash() == nil || - bytes.Equal(integrityHash, syncResp.GetSecondarySpInfo().GetIntegrityHash()) { + !bytes.Equal(integrityHash, syncResp.GetSecondarySpInfo().GetIntegrityHash()) { log.CtxErrorw(ctx, "secondary integrity hash check error") errMsg.ErrCode = service.ErrCode_ERR_CODE_ERROR errMsg.ErrMsg = merrors.ErrIntegrityHash.Error() diff --git a/service/stonenode/sync_test.go b/service/stonenode/sync_test.go new file mode 100644 index 000000000..c3989c7cd --- /dev/null +++ b/service/stonenode/sync_test.go @@ -0,0 +1,374 @@ +package stonenode + +import ( + "context" + "errors" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + + merrors "github.com/bnb-chain/inscription-storage-provider/model/errors" + ptypes "github.com/bnb-chain/inscription-storage-provider/pkg/types/v1" + "github.com/bnb-chain/inscription-storage-provider/service/client/mock" + service "github.com/bnb-chain/inscription-storage-provider/service/types/v1" +) + +func TestInitClientFailed(t *testing.T) { + node := &StoneNodeService{ + name: ServiceNameStoneNode, + stoneLimit: 0, + } + node.running.Store(true) + err := node.InitClient() + assert.Equal(t, merrors.ErrStoneNodeStarted, err) +} + +func Test_loadSegmentsDataSuccess(t *testing.T) { + cases := []struct { + name string + req1 uint64 + req2 uint64 + req3 ptypes.RedundancyType + wantedResult1 string + wantedResult2 int + wantedErr error + }{ + { + name: "ec type: payload size greater than 16MB", + req1: 20230109001, + req2: 20 * 1024 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + wantedResult1: "20230109001", + wantedResult2: 2, + wantedErr: nil, + }, + { + name: "ec type: payload size less than 16MB and greater than 1MB", + req1: 20230109002, + req2: 15 * 1024 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + wantedResult1: "20230109002", + wantedResult2: 1, + wantedErr: nil, + }, + { + name: "replica type: payload size greater than 16MB", + req1: 20230109003, + req2: 20 * 1024 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + wantedResult1: "20230109003", + wantedResult2: 2, + wantedErr: nil, + }, + { + name: "replica type: payload size less than 16MB and greater than 1MB", + req1: 20230109004, + req2: 15 * 1024 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + wantedResult1: "20230109004", + wantedResult2: 1, + wantedErr: nil, + }, + { + name: "inline type: payload size less than 1MB", + req1: 20230109005, + req2: 1000 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE, + wantedResult1: "20230109005", + wantedResult2: 1, + wantedErr: nil, + }, + } + + node := setup(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ps := mock.NewMockPieceStoreAPI(ctrl) + node.store = ps + ps.EXPECT().GetPiece(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, key string, offset, limit int64) ([]byte, error) { + return []byte("1"), nil + }).AnyTimes() + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + allocResp := mockAllocResp(tt.req1, tt.req2, tt.req3) + result, err := node.loadSegmentsData(context.TODO(), allocResp) + assert.Equal(t, nil, err) + for k, _ := range result { + assert.Contains(t, k, tt.wantedResult1) + } + assert.Equal(t, tt.wantedResult2, len(result)) + }) + } +} + +func Test_loadSegmentsDataPieceStoreError(t *testing.T) { + node := setup(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ps := mock.NewMockPieceStoreAPI(ctrl) + node.store = ps + ps.EXPECT().GetPiece(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, key string, offset, limit int64) ([]byte, error) { + return nil, errors.New("piece store s3 network error") + }).AnyTimes() + + result, err := node.loadSegmentsData(context.TODO(), mockAllocResp(20230109001, 20*1024*1024, + ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)) + assert.Equal(t, errors.New("piece store s3 network error"), err) + assert.Equal(t, 0, len(result)) +} + +func Test_loadSegmentsDataUnknownRedundancyError(t *testing.T) { + node := setup(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ps := mock.NewMockPieceStoreAPI(ctrl) + node.store = ps + ps.EXPECT().GetPiece(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, key string, offset, limit int64) ([]byte, error) { + return []byte("1"), nil + }).AnyTimes() + + result, err := node.loadSegmentsData(context.TODO(), mockAllocResp(20230109006, 20*1024*1024, + ptypes.RedundancyType(-1))) + assert.Equal(t, merrors.ErrRedundancyType, err) + assert.Equal(t, 0, len(result)) +} + +func Test_generatePieceData(t *testing.T) { + cases := []struct { + name string + req1 ptypes.RedundancyType + req2 []byte + wantedResult int + wantedErr error + }{ + { + name: "ec type", + req1: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req2: []byte("1"), + wantedResult: 6, + wantedErr: nil, + }, + { + name: "replica type", + req1: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req2: []byte("1"), + wantedResult: 1, + wantedErr: nil, + }, + { + name: "inline type", + req1: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req2: []byte("1"), + wantedResult: 1, + wantedErr: nil, + }, + { + name: "unknown redundancy type", + req1: ptypes.RedundancyType(-1), + req2: []byte("1"), + wantedResult: 0, + wantedErr: merrors.ErrRedundancyType, + }, + } + + node := setup(t) + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + result, err := node.generatePieceData(tt.req1, tt.req2) + assert.Equal(t, err, tt.wantedErr) + assert.Equal(t, len(result), tt.wantedResult) + }) + } +} + +func Test_dispatchSecondarySP(t *testing.T) { + spList := []string{"sp1", "sp2", "sp3", "sp4", "sp5", "sp6"} + cases := []struct { + name string + req1 map[string][][]byte + req2 ptypes.RedundancyType + req3 []string + req4 []uint32 + wantedResult1 int + wantedErr error + }{ + { + name: "ec type dispatch", + req1: dispatchPieceMap(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req3: spList, + req4: []uint32{}, + wantedResult1: 6, + wantedErr: nil, + }, + { + name: "replica type dispatch", + req1: dispatchSegmentMap(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req3: spList, + req4: []uint32{}, + wantedResult1: 3, + wantedErr: nil, + }, + { + name: "inline type dispatch", + req1: dispatchInlineMap(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE, + req3: spList, + req4: []uint32{}, + wantedResult1: 1, + wantedErr: nil, + }, + { + name: "ec type data retransmission", + req1: dispatchPieceMap(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req3: spList, + req4: []uint32{2, 3}, + wantedResult1: 2, + wantedErr: nil, + }, + { + name: "replica type data retransmission", + req1: dispatchSegmentMap(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req3: spList, + req4: []uint32{1, 2}, + wantedResult1: 2, + wantedErr: nil, + }, + { + name: "unknown redundancy type", + req1: dispatchPieceMap(), + req2: ptypes.RedundancyType(-1), + req3: spList, + req4: []uint32{}, + wantedResult1: 0, + wantedErr: merrors.ErrRedundancyType, + }, + { + name: "wrong secondary sp number", + req1: dispatchPieceMap(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req3: []string{}, + req4: []uint32{}, + wantedResult1: 0, + wantedErr: merrors.ErrSecondarySPNumber, + }, + { + name: "wrong replica/inline segment data length", + req1: dispatchPieceMap(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req3: spList, + req4: []uint32{}, + wantedResult1: 0, + wantedErr: merrors.ErrInvalidSegmentData, + }, + } + + node := setup(t) + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + result, err := node.dispatchSecondarySP(tt.req1, tt.req2, tt.req3, tt.req4) + assert.Equal(t, err, tt.wantedErr) + assert.Equal(t, len(result), tt.wantedResult1) + }) + } +} + +// TODO:need improved +func Test_doSyncToSecondarySP(t *testing.T) { + data := map[string]map[string][]byte{ + "sp1": { + "123456_s0_p0": []byte("test1"), + "123456_s1_p0": []byte("test2"), + "123456_s2_p0": []byte("test3"), + "123456_s3_p0": []byte("test4"), + "123456_s4_p0": []byte("test5"), + "123456_s5_p0": []byte("test6"), + }, + } + cases := []struct { + name string + req1 *service.StoneHubServiceAllocStoneJobResponse + req2 map[string]map[string][]byte + }{ + { + name: "1", + req1: nil, + req2: data, + }, + } + + node := setup(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // stoneHub service stub + stoneHub := mock.NewMockStoneHubAPI(ctrl) + node.stoneHub = stoneHub + stoneHub.EXPECT().DoneSecondaryPieceJob(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) ( + *service.StoneHubServiceDoneSecondaryPieceJobResponse, error) { + return nil, nil + }) + + // syncer service stub + streamClient := makeStreamMock() + syncer := mock.NewMockSyncerAPI(ctrl) + node.syncer = syncer + syncer.EXPECT().UploadECPiece(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, opts ...grpc.CallOption) (service.SyncerService_UploadECPieceClient, error) { + return streamClient, nil + }).AnyTimes() + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + allocResp := mockAllocResp(123456, 20*1024*1024, ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED) + err := node.doSyncToSecondarySP(context.TODO(), allocResp, tt.req2) + assert.Equal(t, nil, err) + }) + } +} + +func TestUploadECPieceSuccess(t *testing.T) { + node := setup(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + streamClient := makeStreamMock() + syncer := mock.NewMockSyncerAPI(ctrl) + node.syncer = syncer + syncer.EXPECT().UploadECPiece(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, opts ...grpc.CallOption) (service.SyncerService_UploadECPieceClient, error) { + return streamClient, nil + }).AnyTimes() + + sInfo := &service.SyncerInfo{ + ObjectId: 123456, + TxHash: []byte("i"), + StorageProviderId: "sp1", + RedundancyType: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + } + data := map[string][]byte{ + "123456_s0_p0": []byte("test1"), + "123456_s1_p0": []byte("test2"), + "123456_s2_p0": []byte("test3"), + "123456_s3_p0": []byte("test4"), + "123456_s4_p0": []byte("test5"), + "123456_s5_p0": []byte("test6"), + } + resp, err := node.UploadECPiece(context.TODO(), 3, sInfo, data, "test_traceID") + assert.Equal(t, err, nil) + assert.Equal(t, resp.GetTraceId(), "test_traceID") + assert.Equal(t, resp.GetSecondarySpInfo().GetPieceIdx(), uint32(1)) +} diff --git a/service/syncer/server.go b/service/syncer/server.go index c39cf7e52..d65973fd2 100644 --- a/service/syncer/server.go +++ b/service/syncer/server.go @@ -21,7 +21,7 @@ const ( type Syncer struct { cfg *SyncerConfig name string - store *client.StoreClient + store client.PieceStoreAPI } // NewSyncerService creates a syncer service to upload piece to piece store diff --git a/service/syncer/syncer_service.go b/service/syncer/syncer_service.go index fbcf67fb4..6e9c67e09 100644 --- a/service/syncer/syncer_service.go +++ b/service/syncer/syncer_service.go @@ -10,11 +10,6 @@ import ( "github.com/bnb-chain/inscription-storage-provider/util/log" ) -// UploadECPieceAPI used to mock -type UploadECPieceAPI interface { - UploadECPiece(stream service.SyncerService_UploadECPieceServer) error -} - // UploadECPiece uploads piece data encoded using the ec algorithm to secondary storage provider func (s *Syncer) UploadECPiece(stream service.SyncerService_UploadECPieceServer) (err error) { var req *service.SyncerServiceUploadECPieceRequest