From fbf1e59e3b7cf69d2dec0dc70cee9cfcce0c549f Mon Sep 17 00:00:00 2001 From: DylanYong Date: Tue, 10 Jan 2023 15:50:26 +0800 Subject: [PATCH 1/3] add uint test for stone node and syncer module --- service/client/mock/piece_store_mock.go | 64 +++++ service/client/mock/stone_hub_mock.go | 171 +++++++++++++ service/client/mock/syncer_mock.go | 71 +++++ service/client/piece_store_client.go | 9 + service/client/stone_hub_client.go | 36 ++- service/client/syncer_client.go | 14 +- service/stonenode/helper_test.go | 69 +++++ service/stonenode/map.json | 50 ++++ service/stonenode/server.go | 16 +- service/stonenode/sync.go | 104 ++++++-- service/stonenode/sync_test.go | 327 ++++++++++++++++++++++++ service/syncer/server.go | 2 +- service/syncer/syncer_service.go | 5 - 13 files changed, 897 insertions(+), 41 deletions(-) create mode 100644 service/client/mock/piece_store_mock.go create mode 100644 service/client/mock/stone_hub_mock.go create mode 100644 service/client/mock/syncer_mock.go create mode 100644 service/stonenode/helper_test.go create mode 100644 service/stonenode/map.json create mode 100644 service/stonenode/sync_test.go diff --git a/service/client/mock/piece_store_mock.go b/service/client/mock/piece_store_mock.go new file mode 100644 index 000000000..5394ca66c --- /dev/null +++ b/service/client/mock/piece_store_mock.go @@ -0,0 +1,64 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./piece_store_client.go + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockPieceStoreAPI is a mock of PieceStoreAPI interface. +type MockPieceStoreAPI struct { + ctrl *gomock.Controller + recorder *MockPieceStoreAPIMockRecorder +} + +// MockPieceStoreAPIMockRecorder is the mock recorder for MockPieceStoreAPI. +type MockPieceStoreAPIMockRecorder struct { + mock *MockPieceStoreAPI +} + +// NewMockPieceStoreAPI creates a new mock instance. +func NewMockPieceStoreAPI(ctrl *gomock.Controller) *MockPieceStoreAPI { + mock := &MockPieceStoreAPI{ctrl: ctrl} + mock.recorder = &MockPieceStoreAPIMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPieceStoreAPI) EXPECT() *MockPieceStoreAPIMockRecorder { + return m.recorder +} + +// GetPiece mocks base method. +func (m *MockPieceStoreAPI) GetPiece(ctx context.Context, key string, offset, limit int64) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPiece", ctx, key, offset, limit) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetPiece indicates an expected call of GetPiece. +func (mr *MockPieceStoreAPIMockRecorder) GetPiece(ctx, key, offset, limit interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPiece", reflect.TypeOf((*MockPieceStoreAPI)(nil).GetPiece), ctx, key, offset, limit) +} + +// PutPiece mocks base method. +func (m *MockPieceStoreAPI) PutPiece(key string, value []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutPiece", key, value) + ret0, _ := ret[0].(error) + return ret0 +} + +// PutPiece indicates an expected call of PutPiece. +func (mr *MockPieceStoreAPIMockRecorder) PutPiece(key, value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutPiece", reflect.TypeOf((*MockPieceStoreAPI)(nil).PutPiece), key, value) +} diff --git a/service/client/mock/stone_hub_mock.go b/service/client/mock/stone_hub_mock.go new file mode 100644 index 000000000..0898fc801 --- /dev/null +++ b/service/client/mock/stone_hub_mock.go @@ -0,0 +1,171 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./stone_hub_client.go + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + v1 "github.com/bnb-chain/inscription-storage-provider/service/types/v1" + gomock "github.com/golang/mock/gomock" + grpc "google.golang.org/grpc" +) + +// MockStoneHubAPI is a mock of StoneHubAPI interface. +type MockStoneHubAPI struct { + ctrl *gomock.Controller + recorder *MockStoneHubAPIMockRecorder +} + +// MockStoneHubAPIMockRecorder is the mock recorder for MockStoneHubAPI. +type MockStoneHubAPIMockRecorder struct { + mock *MockStoneHubAPI +} + +// NewMockStoneHubAPI creates a new mock instance. +func NewMockStoneHubAPI(ctrl *gomock.Controller) *MockStoneHubAPI { + mock := &MockStoneHubAPI{ctrl: ctrl} + mock.recorder = &MockStoneHubAPIMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStoneHubAPI) EXPECT() *MockStoneHubAPIMockRecorder { + return m.recorder +} + +// AllocStoneJob mocks base method. +func (m *MockStoneHubAPI) AllocStoneJob(ctx context.Context, opts ...grpc.CallOption) (*v1.StoneHubServiceAllocStoneJobResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "AllocStoneJob", varargs...) + ret0, _ := ret[0].(*v1.StoneHubServiceAllocStoneJobResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AllocStoneJob indicates an expected call of AllocStoneJob. +func (mr *MockStoneHubAPIMockRecorder) AllocStoneJob(ctx interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllocStoneJob", reflect.TypeOf((*MockStoneHubAPI)(nil).AllocStoneJob), varargs...) +} + +// BeginUploadPayload mocks base method. +func (m *MockStoneHubAPI) BeginUploadPayload(ctx context.Context, in *v1.StoneHubServiceBeginUploadPayloadRequest, opts ...grpc.CallOption) (*v1.StoneHubServiceBeginUploadPayloadResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "BeginUploadPayload", varargs...) + ret0, _ := ret[0].(*v1.StoneHubServiceBeginUploadPayloadResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BeginUploadPayload indicates an expected call of BeginUploadPayload. +func (mr *MockStoneHubAPIMockRecorder) BeginUploadPayload(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BeginUploadPayload", reflect.TypeOf((*MockStoneHubAPI)(nil).BeginUploadPayload), varargs...) +} + +// Close mocks base method. +func (m *MockStoneHubAPI) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockStoneHubAPIMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStoneHubAPI)(nil).Close)) +} + +// CreateObject mocks base method. +func (m *MockStoneHubAPI) CreateObject(ctx context.Context, in *v1.StoneHubServiceCreateObjectRequest, opts ...grpc.CallOption) (*v1.StoneHubServiceCreateObjectResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CreateObject", varargs...) + ret0, _ := ret[0].(*v1.StoneHubServiceCreateObjectResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateObject indicates an expected call of CreateObject. +func (mr *MockStoneHubAPIMockRecorder) CreateObject(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateObject", reflect.TypeOf((*MockStoneHubAPI)(nil).CreateObject), varargs...) +} + +// DonePrimaryPieceJob mocks base method. +func (m *MockStoneHubAPI) DonePrimaryPieceJob(ctx context.Context, in *v1.StoneHubServiceDonePrimaryPieceJobRequest, opts ...grpc.CallOption) (*v1.StoneHubServiceDonePrimaryPieceJobResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DonePrimaryPieceJob", varargs...) + ret0, _ := ret[0].(*v1.StoneHubServiceDonePrimaryPieceJobResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DonePrimaryPieceJob indicates an expected call of DonePrimaryPieceJob. +func (mr *MockStoneHubAPIMockRecorder) DonePrimaryPieceJob(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DonePrimaryPieceJob", reflect.TypeOf((*MockStoneHubAPI)(nil).DonePrimaryPieceJob), varargs...) +} + +// DoneSecondaryPieceJob mocks base method. +func (m *MockStoneHubAPI) DoneSecondaryPieceJob(ctx context.Context, in *v1.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) (*v1.StoneHubServiceDoneSecondaryPieceJobResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DoneSecondaryPieceJob", varargs...) + ret0, _ := ret[0].(*v1.StoneHubServiceDoneSecondaryPieceJobResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DoneSecondaryPieceJob indicates an expected call of DoneSecondaryPieceJob. +func (mr *MockStoneHubAPIMockRecorder) DoneSecondaryPieceJob(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoneSecondaryPieceJob", reflect.TypeOf((*MockStoneHubAPI)(nil).DoneSecondaryPieceJob), varargs...) +} + +// SetObjectCreateInfo mocks base method. +func (m *MockStoneHubAPI) SetObjectCreateInfo(ctx context.Context, in *v1.StoneHubServiceSetObjectCreateInfoRequest, opts ...grpc.CallOption) (*v1.StoneHubServiceSetSetObjectCreateInfoResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SetObjectCreateInfo", varargs...) + ret0, _ := ret[0].(*v1.StoneHubServiceSetSetObjectCreateInfoResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SetObjectCreateInfo indicates an expected call of SetObjectCreateInfo. +func (mr *MockStoneHubAPIMockRecorder) SetObjectCreateInfo(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetObjectCreateInfo", reflect.TypeOf((*MockStoneHubAPI)(nil).SetObjectCreateInfo), varargs...) +} diff --git a/service/client/mock/syncer_mock.go b/service/client/mock/syncer_mock.go new file mode 100644 index 000000000..1385708d5 --- /dev/null +++ b/service/client/mock/syncer_mock.go @@ -0,0 +1,71 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./syncer_client.go + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + v1 "github.com/bnb-chain/inscription-storage-provider/service/types/v1" + gomock "github.com/golang/mock/gomock" + grpc "google.golang.org/grpc" +) + +// MockSyncerAPI is a mock of SyncerAPI interface. +type MockSyncerAPI struct { + ctrl *gomock.Controller + recorder *MockSyncerAPIMockRecorder +} + +// MockSyncerAPIMockRecorder is the mock recorder for MockSyncerAPI. +type MockSyncerAPIMockRecorder struct { + mock *MockSyncerAPI +} + +// NewMockSyncerAPI creates a new mock instance. +func NewMockSyncerAPI(ctrl *gomock.Controller) *MockSyncerAPI { + mock := &MockSyncerAPI{ctrl: ctrl} + mock.recorder = &MockSyncerAPIMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSyncerAPI) EXPECT() *MockSyncerAPIMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockSyncerAPI) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockSyncerAPIMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSyncerAPI)(nil).Close)) +} + +// UploadECPiece mocks base method. +func (m *MockSyncerAPI) UploadECPiece(ctx context.Context, opts ...grpc.CallOption) (v1.SyncerService_UploadECPieceClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "UploadECPiece", varargs...) + ret0, _ := ret[0].(v1.SyncerService_UploadECPieceClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UploadECPiece indicates an expected call of UploadECPiece. +func (mr *MockSyncerAPIMockRecorder) UploadECPiece(ctx interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UploadECPiece", reflect.TypeOf((*MockSyncerAPI)(nil).UploadECPiece), varargs...) +} diff --git a/service/client/piece_store_client.go b/service/client/piece_store_client.go index b13a5591b..13b7f1503 100644 --- a/service/client/piece_store_client.go +++ b/service/client/piece_store_client.go @@ -10,6 +10,15 @@ import ( "github.com/bnb-chain/inscription-storage-provider/util/log" ) +// PieceStoreAPI provides an interface to enable mocking the +// StoreClient's API operation. This makes unit test to test your code easier. +// +//go:generate mockgen -source=./piece_store_client.go -destination=./mock/piece_store_mock.go -package=mock +type PieceStoreAPI interface { + GetPiece(ctx context.Context, key string, offset, limit int64) ([]byte, error) + PutPiece(key string, value []byte) error +} + type StoreClient struct { ps *piece.PieceStore } diff --git a/service/client/stone_hub_client.go b/service/client/stone_hub_client.go index 4168362cc..92cab49fa 100644 --- a/service/client/stone_hub_client.go +++ b/service/client/stone_hub_client.go @@ -15,10 +15,24 @@ import ( "github.com/bnb-chain/inscription-storage-provider/util/log" ) -var ClientRpcTimeout = time.Second * 5 +var ClientRPCTimeout = time.Second * 5 var _ io.Closer = &StoneHubClient{} +// StoneHubAPI provides an interface to enable mocking the +// StoneHubClient's API operation. This makes unit test to test your code easier. +// +//go:generate mockgen -source=./stone_hub_client.go -destination=./mock/stone_hub_mock.go -package=mock +type StoneHubAPI interface { + CreateObject(ctx context.Context, in *service.StoneHubServiceCreateObjectRequest, opts ...grpc.CallOption) (*service.StoneHubServiceCreateObjectResponse, error) + SetObjectCreateInfo(ctx context.Context, in *service.StoneHubServiceSetObjectCreateInfoRequest, opts ...grpc.CallOption) (*service.StoneHubServiceSetSetObjectCreateInfoResponse, error) + BeginUploadPayload(ctx context.Context, in *service.StoneHubServiceBeginUploadPayloadRequest, opts ...grpc.CallOption) (*service.StoneHubServiceBeginUploadPayloadResponse, error) + DonePrimaryPieceJob(ctx context.Context, in *service.StoneHubServiceDonePrimaryPieceJobRequest, opts ...grpc.CallOption) (*service.StoneHubServiceDonePrimaryPieceJobResponse, error) + AllocStoneJob(ctx context.Context, opts ...grpc.CallOption) (*service.StoneHubServiceAllocStoneJobResponse, error) + DoneSecondaryPieceJob(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) (*service.StoneHubServiceDoneSecondaryPieceJobResponse, error) + Close() error +} + type StoneHubClient struct { address string stoneHub service.StoneHubServiceClient @@ -26,7 +40,7 @@ type StoneHubClient struct { } func NewStoneHubClient(address string) (*StoneHubClient, error) { - ctx, _ := context.WithTimeout(context.Background(), ClientRpcTimeout) + ctx, _ := context.WithTimeout(context.Background(), ClientRPCTimeout) conn, err := grpc.DialContext(ctx, address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Errorw("invoke stoneHub service grpc.DialContext failed", "error", err) @@ -40,7 +54,8 @@ func NewStoneHubClient(address string) (*StoneHubClient, error) { return client, nil } -func (client *StoneHubClient) CreateObject(ctx context.Context, in *service.StoneHubServiceCreateObjectRequest, opts ...grpc.CallOption) (*service.StoneHubServiceCreateObjectResponse, error) { +func (client *StoneHubClient) CreateObject(ctx context.Context, in *service.StoneHubServiceCreateObjectRequest, + opts ...grpc.CallOption) (*service.StoneHubServiceCreateObjectResponse, error) { resp, err := client.stoneHub.CreateObject(ctx, in, opts...) ctx = log.Context(ctx, resp) if err != nil { @@ -54,7 +69,8 @@ func (client *StoneHubClient) CreateObject(ctx context.Context, in *service.Ston return resp, nil } -func (client *StoneHubClient) SetObjectCreateInfo(ctx context.Context, in *service.StoneHubServiceSetObjectCreateInfoRequest, opts ...grpc.CallOption) (*service.StoneHubServiceSetSetObjectCreateInfoResponse, error) { +func (client *StoneHubClient) SetObjectCreateInfo(ctx context.Context, in *service.StoneHubServiceSetObjectCreateInfoRequest, + opts ...grpc.CallOption) (*service.StoneHubServiceSetSetObjectCreateInfoResponse, error) { resp, err := client.stoneHub.SetObjectCreateInfo(ctx, in, opts...) ctx = log.Context(ctx, resp) if err != nil { @@ -68,7 +84,8 @@ func (client *StoneHubClient) SetObjectCreateInfo(ctx context.Context, in *servi return resp, nil } -func (client *StoneHubClient) BeginUploadPayload(ctx context.Context, in *service.StoneHubServiceBeginUploadPayloadRequest, opts ...grpc.CallOption) (*service.StoneHubServiceBeginUploadPayloadResponse, error) { +func (client *StoneHubClient) BeginUploadPayload(ctx context.Context, in *service.StoneHubServiceBeginUploadPayloadRequest, + opts ...grpc.CallOption) (*service.StoneHubServiceBeginUploadPayloadResponse, error) { resp, err := client.stoneHub.BeginUploadPayload(ctx, in, opts...) ctx = log.Context(ctx, resp) if err != nil { @@ -82,7 +99,8 @@ func (client *StoneHubClient) BeginUploadPayload(ctx context.Context, in *servic return resp, nil } -func (client *StoneHubClient) DonePrimaryPieceJob(ctx context.Context, in *service.StoneHubServiceDonePrimaryPieceJobRequest, opts ...grpc.CallOption) (*service.StoneHubServiceDonePrimaryPieceJobResponse, error) { +func (client *StoneHubClient) DonePrimaryPieceJob(ctx context.Context, in *service.StoneHubServiceDonePrimaryPieceJobRequest, + opts ...grpc.CallOption) (*service.StoneHubServiceDonePrimaryPieceJobResponse, error) { resp, err := client.stoneHub.DonePrimaryPieceJob(ctx, in, opts...) ctx = log.Context(ctx, resp) if err != nil { @@ -96,7 +114,8 @@ func (client *StoneHubClient) DonePrimaryPieceJob(ctx context.Context, in *servi return resp, nil } -func (client *StoneHubClient) AllocStoneJob(ctx context.Context, opts ...grpc.CallOption) (*service.StoneHubServiceAllocStoneJobResponse, error) { +func (client *StoneHubClient) AllocStoneJob(ctx context.Context, opts ...grpc.CallOption) ( + *service.StoneHubServiceAllocStoneJobResponse, error) { req := &service.StoneHubServiceAllocStoneJobRequest{TraceId: util.GenerateRequestID()} resp, err := client.stoneHub.AllocStoneJob(ctx, req, opts...) ctx = log.Context(ctx, resp) @@ -115,7 +134,8 @@ func (client *StoneHubClient) AllocStoneJob(ctx context.Context, opts ...grpc.Ca return resp, nil } -func (client *StoneHubClient) DoneSecondaryPieceJob(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) (*service.StoneHubServiceDoneSecondaryPieceJobResponse, error) { +func (client *StoneHubClient) DoneSecondaryPieceJob(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest, + opts ...grpc.CallOption) (*service.StoneHubServiceDoneSecondaryPieceJobResponse, error) { resp, err := client.stoneHub.DoneSecondaryPieceJob(ctx, in, opts...) ctx = log.Context(ctx, resp) if err != nil { diff --git a/service/client/syncer_client.go b/service/client/syncer_client.go index 42499b148..d70cf1e60 100644 --- a/service/client/syncer_client.go +++ b/service/client/syncer_client.go @@ -14,6 +14,15 @@ import ( var _ io.Closer = &SyncerClient{} +// SyncerAPI provides an interface to enable mocking the +// SyncerClient's API operation. This makes unit test to test your code easier. +// +//go:generate mockgen -source=./syncer_client.go -destination=./mock/syncer_mock.go -package=mock +type SyncerAPI interface { + UploadECPiece(ctx context.Context, opts ...grpc.CallOption) (service.SyncerService_UploadECPieceClient, error) + Close() error +} + type SyncerClient struct { address string syncer service.SyncerServiceClient @@ -21,7 +30,7 @@ type SyncerClient struct { } func NewSyncerClient(address string) (*SyncerClient, error) { - ctx, _ := context.WithTimeout(context.Background(), ClientRpcTimeout) + ctx, _ := context.WithTimeout(context.Background(), ClientRPCTimeout) conn, err := grpc.DialContext(ctx, address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Errorw("invoke stoneHub service grpc.DialContext failed", "error", err) @@ -36,7 +45,8 @@ func NewSyncerClient(address string) (*SyncerClient, error) { } // UploadECPiece return SyncerService_UploadECPieceClient, need to be closed by caller -func (client *SyncerClient) UploadECPiece(ctx context.Context, opts ...grpc.CallOption) (service.SyncerService_UploadECPieceClient, error) { +func (client *SyncerClient) UploadECPiece(ctx context.Context, opts ...grpc.CallOption) ( + service.SyncerService_UploadECPieceClient, error) { return client.syncer.UploadECPiece(ctx, opts...) } diff --git a/service/stonenode/helper_test.go b/service/stonenode/helper_test.go new file mode 100644 index 000000000..0fecfe8e6 --- /dev/null +++ b/service/stonenode/helper_test.go @@ -0,0 +1,69 @@ +package stonenode + +import ( + "testing" + + ptypes "github.com/bnb-chain/inscription-storage-provider/pkg/types/v1" + service "github.com/bnb-chain/inscription-storage-provider/service/types/v1" +) + +func setup(t *testing.T) *StoneNodeService { + return &StoneNodeService{ + cfg: &StoneNodeConfig{ + Address: "test1", + StoneHubServiceAddress: "test2", + SyncerServiceAddress: "test3", + StorageProvider: "test", + StoneJobLimit: 0, + }, + name: ServiceNameStoneNode, + stoneLimit: 0, + } +} + +func mockAllocResp(objectID uint64, payloadSize uint64, redundancyType ptypes.RedundancyType) *service.StoneHubServiceAllocStoneJobResponse { + return &service.StoneHubServiceAllocStoneJobResponse{ + TraceId: "123456", + TxHash: []byte("blockchain_one"), + PieceJob: &service.PieceJob{ + BucketName: "bucket1", + ObjectName: "object1", + TxHash: []byte("blockchain_one"), + ObjectId: objectID, + PayloadSize: payloadSize, + TargetIdx: nil, + RedundancyType: redundancyType, + }, + ErrMessage: &service.ErrMessage{ + ErrCode: 0, + ErrMsg: "Success", + }, + } +} + +func dispatchPieceMap() map[string][][]byte { + ecList1 := [][]byte{[]byte("1"), []byte("2"), []byte("3"), []byte("4"), []byte("5"), []byte("6")} + ecList2 := [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e"), []byte("f")} + pMap := make(map[string][][]byte) + pMap["123456_s0"] = ecList1 + pMap["123456_s1"] = ecList2 + return pMap +} + +func dispatchSegmentMap() map[string][][]byte { + segmentList1 := [][]byte{[]byte("10")} + segmentList2 := [][]byte{[]byte("20")} + segmentList3 := [][]byte{[]byte("30")} + sMap := make(map[string][][]byte) + sMap["789_s0"] = segmentList1 + sMap["789_s1"] = segmentList2 + sMap["789_s2"] = segmentList3 + return sMap +} + +func dispatchInlineMap() map[string][][]byte { + inlineList := [][]byte{[]byte("+")} + iMap := make(map[string][][]byte) + iMap["543_s0"] = inlineList + return iMap +} diff --git a/service/stonenode/map.json b/service/stonenode/map.json new file mode 100644 index 000000000..6cf524de0 --- /dev/null +++ b/service/stonenode/map.json @@ -0,0 +1,50 @@ +{ + "sp1": { + "123456_s0_p0": "test1", + "123456_s1_p0": "test2", + "123456_s2_p0": "test3", + "123456_s3_p0": "test4", + "123456_s4_p0": "test5", + "123456_s5_p0": "test6" + }, + "sp2": { + "123456_s0_p1": "test7", + "123456_s1_p1": "test8", + "123456_s2_p1": "test9", + "123456_s3_p1": "test10", + "123456_s4_p1": "test11", + "123456_s5_p1": "test12" + }, + "sp3": { + "123456_s0_p2": "test13", + "123456_s1_p2": "test14", + "123456_s2_p2": "test15", + "123456_s3_p2": "test16", + "123456_s4_p2": "test17", + "123456_s5_p2": "test18" + }, + "sp4": { + "123456_s0_p3": "test19", + "123456_s1_p3": "test20", + "123456_s2_p3": "test21", + "123456_s3_p3": "test22", + "123456_s4_p3": "test23", + "123456_s5_p3": "test24" + }, + "sp5": { + "123456_s0_p4": "test25", + "123456_s1_p4": "test26", + "123456_s2_p4": "test27", + "123456_s3_p4": "test28", + "123456_s4_p4": "test29", + "123456_s5_p4": "test30" + }, + "sp6": { + "123456_s0_p5": "test31", + "123456_s1_p5": "test32", + "123456_s2_p5": "test33", + "123456_s3_p5": "test34", + "123456_s4_p5": "test35", + "123456_s5_p5": "test36" + } +} diff --git a/service/stonenode/server.go b/service/stonenode/server.go index 7a28c0809..106b302a2 100644 --- a/service/stonenode/server.go +++ b/service/stonenode/server.go @@ -2,7 +2,6 @@ package stonenode import ( "context" - "errors" "fmt" "sync/atomic" "time" @@ -19,11 +18,14 @@ const ( // StoneNodeService manages stone execution units type StoneNodeService struct { - cfg *StoneNodeConfig - name string - syncer *client.SyncerClient - stoneHub *client.StoneHubClient - store *client.StoreClient + cfg *StoneNodeConfig + name string + //syncer *client.SyncerClient + //stoneHub *client.StoneHubClient + //store *client.StoreClient + syncer client.SyncerAPI + stoneHub client.StoneHubAPI + store client.PieceStoreAPI stoneLimit int64 running atomic.Bool @@ -47,7 +49,7 @@ func NewStoneNodeService(config *StoneNodeConfig) (*StoneNodeService, error) { // InitClient inits store client and rpc client func (node *StoneNodeService) InitClient() error { if node.running.Load() == true { - return errors.New("stone node resource is running") + return merrors.ErrStoneNodeStarted } store, err := client.NewStoreClient(node.cfg.PieceConfig) if err != nil { diff --git a/service/stonenode/sync.go b/service/stonenode/sync.go index 5eb50f9a8..59f1f1488 100644 --- a/service/stonenode/sync.go +++ b/service/stonenode/sync.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "fmt" "sync" "sync/atomic" @@ -47,6 +48,7 @@ func (node *StoneNodeService) syncPieceToSecondarySP(ctx context.Context, allocR } // loadSegmentsData load segment data from primary storage provider. +// returned map key is segmentKey, value is corresponding ec data from ec1 to ec6, or segment data func (node *StoneNodeService) loadSegmentsData(ctx context.Context, allocResp *service.StoneHubServiceAllocStoneJobResponse) ( map[string][][]byte, error) { type segment struct { @@ -155,9 +157,19 @@ func (node *StoneNodeService) generatePieceData(redundancyType ptypes.Redundancy } // dispatchSecondarySP dispatch piece data to secondary storage provider. +// returned map key is spID, value map key is piece key or segment key, value map's value is piece data func (node *StoneNodeService) dispatchSecondarySP(pieceDataBySegment map[string][][]byte, redundancyType ptypes.RedundancyType, secondarySPs []string, targetIdx []uint32) (map[string]map[string][]byte, error) { - var pieceDataBySecondary map[string]map[string][]byte + pieceDataBySecondary := make(map[string]map[string][]byte) + + switch redundancyType { + case ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED: + + case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: + + default: + return nil, merrors.ErrRedundancyType + } // pieceDataBySegment key is segment key, value is ec data from ec1 to ec6 for pieceKey, pieceData := range pieceDataBySegment { @@ -166,49 +178,105 @@ func (node *StoneNodeService) dispatchSecondarySP(pieceDataBySegment map[string] return pieceDataBySecondary, merrors.ErrSecondarySPNumber } sp := secondarySPs[idx] + if _, ok := pieceDataBySecondary[sp]; !ok { + pieceDataBySecondary[sp] = make(map[string][]byte) + } - var err error + var ( + err error + key string + ) // if targetIdx is not equal to zero, retry to get data which idx is equal to targetIdx if len(targetIdx) != 0 { for _, j := range targetIdx { if int(j) == idx { - pieceDataBySecondary, err = fillECData(sp, pieceKey, data, redundancyType, idx) + key, err = joinECKey(pieceKey, redundancyType, idx) if err != nil { return nil, err } } } } else { - pieceDataBySecondary, err = fillECData(sp, pieceKey, data, redundancyType, idx) + key, err = joinECKey(pieceKey, redundancyType, idx) if err != nil { return nil, err } } + pieceDataBySecondary[sp][key] = data } } + fmt.Println(pieceDataBySecondary) return pieceDataBySecondary, nil } -// fillECData -func fillECData(spID string, pieceKey string, data []byte, redundancyType ptypes.RedundancyType, index int) ( +func fillECData(pieceDataBySegment map[string][][]byte, secondarySPs []string, targetIdx []uint32) ( + map[string]map[string][]byte, error) { + ecPieceDataMap := make(map[string]map[string][]byte) + + for pieceKey, pieceData := range pieceDataBySegment { + for idx, data := range pieceData { + if idx >= len(secondarySPs) { + return ecPieceDataMap, merrors.ErrSecondarySPNumber + } + sp := secondarySPs[idx] + if _, ok := ecPieceDataMap[sp]; !ok { + ecPieceDataMap[sp] = make(map[string][]byte) + } + + var ( + err error + key string + ) + // if targetIdx is not equal to zero, retry to get data which idx is equal to targetIdx + if len(targetIdx) != 0 { + for _, j := range targetIdx { + if int(j) == idx { + key = piecestore.EncodeECPieceKeyBySegmentKey(pieceKey, idx) + if err != nil { + return nil, err + } + } + } + } else { + key = piecestore.EncodeECPieceKeyBySegmentKey(pieceKey, idx) + if err != nil { + return nil, err + } + } + ecPieceDataMap[sp][key] = data + } + } + return ecPieceDataMap, nil +} + +func fillReplicaOrInlineData(pieceDataBySegment map[string][][]byte, secondarySPs []string, targetIdx []uint32) ( map[string]map[string][]byte, error) { - var pieceDataBySecondary map[string]map[string][]byte + replicaOrInlineDataMap := make(map[string]map[string][]byte) + length := len(pieceDataBySegment) + + for pieceKey, pieceData := range pieceDataBySegment { + for idx, data := range pieceData { + if idx >= len(secondarySPs) { + return replicaOrInlineDataMap, merrors.ErrSecondarySPNumber + } + sp := secondarySPs[idx] + if _, ok := replicaOrInlineDataMap[sp]; !ok { + replicaOrInlineDataMap[sp] = make(map[string][]byte) + } + } + } +} + +// joinECKey +func joinECKey(pieceKey string, redundancyType ptypes.RedundancyType, index int) (string, error) { switch redundancyType { case ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED: - if _, ok := pieceDataBySecondary[spID]; !ok { - pieceDataBySecondary[spID] = make(map[string][]byte) - } - key := piecestore.EncodeECPieceKeyBySegmentKey(pieceKey, index) - pieceDataBySecondary[spID][key] = data + return piecestore.EncodeECPieceKeyBySegmentKey(pieceKey, index), nil case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: - if _, ok := pieceDataBySecondary[spID]; !ok { - pieceDataBySecondary[spID] = make(map[string][]byte) - } - pieceDataBySecondary[spID][pieceKey] = data + return pieceKey, nil default: - return nil, merrors.ErrRedundancyType + return "", merrors.ErrRedundancyType } - return pieceDataBySecondary, nil } // doSyncToSecondarySP send piece data to the secondary. diff --git a/service/stonenode/sync_test.go b/service/stonenode/sync_test.go new file mode 100644 index 000000000..c07b06af2 --- /dev/null +++ b/service/stonenode/sync_test.go @@ -0,0 +1,327 @@ +package stonenode + +import ( + "context" + "errors" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + + merrors "github.com/bnb-chain/inscription-storage-provider/model/errors" + ptypes "github.com/bnb-chain/inscription-storage-provider/pkg/types/v1" + "github.com/bnb-chain/inscription-storage-provider/service/client/mock" + service "github.com/bnb-chain/inscription-storage-provider/service/types/v1" +) + +func TestInitClientFailed(t *testing.T) { + node := &StoneNodeService{ + name: ServiceNameStoneNode, + stoneLimit: 0, + } + node.running.Store(true) + err := node.InitClient() + assert.Equal(t, merrors.ErrStoneNodeStarted, err) +} + +func Test_loadSegmentsDataSuccess(t *testing.T) { + cases := []struct { + name string + req1 uint64 + req2 uint64 + req3 ptypes.RedundancyType + wantedResult1 string + wantedResult2 int + wantedErr error + }{ + { + name: "ec type: payload size greater than 16MB", + req1: 20230109001, + req2: 20 * 1024 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + wantedResult1: "20230109001", + wantedResult2: 2, + wantedErr: nil, + }, + { + name: "ec type: payload size less than 16MB and greater than 1MB", + req1: 20230109002, + req2: 15 * 1024 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + wantedResult1: "20230109002", + wantedResult2: 1, + wantedErr: nil, + }, + { + name: "replica type: payload size greater than 16MB", + req1: 20230109003, + req2: 20 * 1024 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + wantedResult1: "20230109003", + wantedResult2: 2, + wantedErr: nil, + }, + { + name: "replica type: payload size less than 16MB and greater than 1MB", + req1: 20230109004, + req2: 15 * 1024 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + wantedResult1: "20230109004", + wantedResult2: 1, + wantedErr: nil, + }, + { + name: "inline type: payload size less than 1MB", + req1: 20230109005, + req2: 1000 * 1024, + req3: ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE, + wantedResult1: "20230109005", + wantedResult2: 1, + wantedErr: nil, + }, + } + + node := setup(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ps := mock.NewMockPieceStoreAPI(ctrl) + node.store = ps + ps.EXPECT().GetPiece(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, key string, offset, limit int64) ([]byte, error) { + return []byte("1"), nil + }).AnyTimes() + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + allocResp := mockAllocResp(tt.req1, tt.req2, tt.req3) + result, err := node.loadSegmentsData(context.TODO(), allocResp) + assert.Equal(t, nil, err) + for k, _ := range result { + assert.Contains(t, k, tt.wantedResult1) + } + assert.Equal(t, tt.wantedResult2, len(result)) + }) + } +} + +func Test_loadSegmentsDataPieceStoreError(t *testing.T) { + node := setup(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ps := mock.NewMockPieceStoreAPI(ctrl) + node.store = ps + ps.EXPECT().GetPiece(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, key string, offset, limit int64) ([]byte, error) { + return nil, errors.New("piece store s3 network error") + }).AnyTimes() + + result, err := node.loadSegmentsData(context.TODO(), mockAllocResp(20230109001, 20*1024*1024, + ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)) + assert.Equal(t, errors.New("piece store s3 network error"), err) + assert.Equal(t, 0, len(result)) +} + +func Test_loadSegmentsDataUnknownRedundancyError(t *testing.T) { + node := setup(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ps := mock.NewMockPieceStoreAPI(ctrl) + node.store = ps + ps.EXPECT().GetPiece(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, key string, offset, limit int64) ([]byte, error) { + return []byte("1"), nil + }).AnyTimes() + + result, err := node.loadSegmentsData(context.TODO(), mockAllocResp(20230109006, 20*1024*1024, + ptypes.RedundancyType(-1))) + assert.Equal(t, merrors.ErrRedundancyType, err) + assert.Equal(t, 0, len(result)) +} + +func Test_generatePieceData(t *testing.T) { + cases := []struct { + name string + req1 ptypes.RedundancyType + req2 []byte + wantedResult int + wantedErr error + }{ + { + name: "ec type", + req1: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req2: []byte("1"), + wantedResult: 6, + wantedErr: nil, + }, + { + name: "replica type", + req1: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req2: []byte("1"), + wantedResult: 1, + wantedErr: nil, + }, + { + name: "inline type", + req1: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req2: []byte("1"), + wantedResult: 1, + wantedErr: nil, + }, + { + name: "unknown redundancy type", + req1: ptypes.RedundancyType(-1), + req2: []byte("1"), + wantedResult: 0, + wantedErr: merrors.ErrRedundancyType, + }, + } + + node := setup(t) + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + result, err := node.generatePieceData(tt.req1, tt.req2) + assert.Equal(t, err, tt.wantedErr) + assert.Equal(t, len(result), tt.wantedResult) + }) + } +} + +func Test_dispatchSecondarySP(t *testing.T) { + spList := []string{"sp1", "sp2", "sp3", "sp4", "sp5", "sp6"} + cases := []struct { + name string + req1 map[string][][]byte + req2 ptypes.RedundancyType + req3 []string + req4 []uint32 + wantedResult1 int + wantedErr error + }{ + { + name: "ec type dispatch", + req1: dispatchPieceMap(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req3: spList, + req4: []uint32{}, + wantedResult1: 6, + wantedErr: nil, + }, + { + name: "replica type dispatch", + req1: dispatchSegmentMap(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req3: spList, + req4: []uint32{}, + wantedResult1: 2, + wantedErr: nil, + }, + { + name: "inline type dispatch", + req1: dispatchInlineMap(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE, + req3: spList, + req4: []uint32{}, + wantedResult1: 1, + wantedErr: nil, + }, + { + name: "4", + req1: dispatchPieceMap(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req3: spList, + req4: []uint32{2, 3}, + wantedResult1: 2, + wantedErr: nil, + }, + { + name: "unknown redundancy type", + req1: dispatchPieceMap(), + req2: ptypes.RedundancyType(-1), + req3: spList, + req4: []uint32{}, + wantedResult1: 0, + wantedErr: merrors.ErrRedundancyType, + }, + { + name: "wrong secondary sp number", + req1: dispatchPieceMap(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req3: []string{}, + req4: []uint32{}, + wantedResult1: 0, + wantedErr: merrors.ErrSecondarySPNumber, + }, + } + + node := setup(t) + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + result, err := node.dispatchSecondarySP(tt.req1, tt.req2, tt.req3, tt.req4) + assert.Equal(t, err, tt.wantedErr) + assert.Equal(t, len(result), tt.wantedResult1) + }) + } +} + +func TestUploadECPiece(t *testing.T) { + node := setup(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + syncer := mock.NewMockSyncerAPI(ctrl) + node.syncer = syncer + syncer.EXPECT().UploadECPiece(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, opts ...grpc.CallOption) (service.SyncerService_UploadECPieceClient, error) { + return nil, nil + }).AnyTimes() + +} + +func makeStreamMock() *StreamMock { + return &StreamMock{ + ctx: context.Background(), + recvToServer: make(chan *service.SyncerServiceUploadECPieceRequest, 10), + sentFromServer: make(chan *service.SyncerServiceUploadECPieceResponse, 10), + } +} + +type StreamMock struct { + grpc.ServerStream + ctx context.Context + recvToServer chan *service.SyncerServiceUploadECPieceRequest + sentFromServer chan *service.SyncerServiceUploadECPieceResponse +} + +func (m *StreamMock) Context() context.Context { + return m.ctx +} + +func (m *StreamMock) Send(resp *service.SyncerServiceUploadECPieceResponse) error { + m.sentFromServer <- resp + return nil +} + +func (m *StreamMock) Recv() (*service.SyncerServiceUploadECPieceRequest, error) { + req, more := <-m.recvToServer + if !more { + return nil, errors.New("empty") + } + return req, nil +} + +func (m *StreamMock) SendFromClient(req *service.SyncerServiceUploadECPieceRequest) error { + m.recvToServer <- req + return nil +} + +func (m *StreamMock) RecvToClient() (*service.SyncerServiceUploadECPieceResponse, error) { + response, more := <-m.sentFromServer + if !more { + return nil, errors.New("empty") + } + return response, nil +} diff --git a/service/syncer/server.go b/service/syncer/server.go index c39cf7e52..d65973fd2 100644 --- a/service/syncer/server.go +++ b/service/syncer/server.go @@ -21,7 +21,7 @@ const ( type Syncer struct { cfg *SyncerConfig name string - store *client.StoreClient + store client.PieceStoreAPI } // NewSyncerService creates a syncer service to upload piece to piece store diff --git a/service/syncer/syncer_service.go b/service/syncer/syncer_service.go index fbcf67fb4..6e9c67e09 100644 --- a/service/syncer/syncer_service.go +++ b/service/syncer/syncer_service.go @@ -10,11 +10,6 @@ import ( "github.com/bnb-chain/inscription-storage-provider/util/log" ) -// UploadECPieceAPI used to mock -type UploadECPieceAPI interface { - UploadECPiece(stream service.SyncerService_UploadECPieceServer) error -} - // UploadECPiece uploads piece data encoded using the ec algorithm to secondary storage provider func (s *Syncer) UploadECPiece(stream service.SyncerService_UploadECPieceServer) (err error) { var req *service.SyncerServiceUploadECPieceRequest From 654267d72dbab099541d0811cd9d9766aab2cef6 Mon Sep 17 00:00:00 2001 From: DylanYong Date: Tue, 10 Jan 2023 19:15:24 +0800 Subject: [PATCH 2/3] fix --- model/errors/errors.go | 13 ++- proto/service/types/v1/stone_hub.proto | 2 +- service/stonenode/helper_test.go | 38 ++++++ service/stonenode/map.json | 50 -------- service/stonenode/sync.go | 154 +++++++++++++------------ service/stonenode/sync_test.go | 131 ++++++++++++++------- 6 files changed, 216 insertions(+), 172 deletions(-) delete mode 100644 service/stonenode/map.json diff --git a/model/errors/errors.go b/model/errors/errors.go index 36b1439f9..270616654 100644 --- a/model/errors/errors.go +++ b/model/errors/errors.go @@ -39,12 +39,13 @@ var ( // stone node service errors var ( - ErrStoneNodeStarted = errors.New("stone node resource is running") - ErrStoneNodeStopped = errors.New("stone node service has stopped") - ErrIntegrityHash = errors.New("secondary integrity hash check error") - ErrRedundancyType = errors.New("unknown redundancy type") - ErrEmptyJob = errors.New("job is empty") - ErrSecondarySPNumber = errors.New("secondary sp is not enough") + ErrStoneNodeStarted = errors.New("stone node resource is running") + ErrStoneNodeStopped = errors.New("stone node service has stopped") + ErrIntegrityHash = errors.New("secondary integrity hash check error") + ErrRedundancyType = errors.New("unknown redundancy type") + ErrEmptyJob = errors.New("job is empty") + ErrSecondarySPNumber = errors.New("secondary sp is not enough") + ErrInvalidSegmentData = errors.New("invalid segment data, length is not equal to 1") ) func MakeErrMsgResponse(err error) *service.ErrMessage { diff --git a/proto/service/types/v1/stone_hub.proto b/proto/service/types/v1/stone_hub.proto index e030163f7..ded946e9f 100644 --- a/proto/service/types/v1/stone_hub.proto +++ b/proto/service/types/v1/stone_hub.proto @@ -43,7 +43,7 @@ message PieceJob { bytes tx_hash = 3; uint64 object_id = 4; uint64 payload_size = 5; - repeated uint32 target_idx = 6; // ec number: 1, 2, 3... + repeated uint32 target_idx = 6; // ec number: 1, 2, 3..., start at 1; segment number: 0, 1, 2..., start at 0 pkg.types.v1.RedundancyType redundancy_type = 7; StorageProviderSealInfo storage_provider_seal_info = 8; } diff --git a/service/stonenode/helper_test.go b/service/stonenode/helper_test.go index 0fecfe8e6..1692d48f9 100644 --- a/service/stonenode/helper_test.go +++ b/service/stonenode/helper_test.go @@ -1,8 +1,11 @@ package stonenode import ( + "context" "testing" + "google.golang.org/grpc" + ptypes "github.com/bnb-chain/inscription-storage-provider/pkg/types/v1" service "github.com/bnb-chain/inscription-storage-provider/service/types/v1" ) @@ -67,3 +70,38 @@ func dispatchInlineMap() map[string][][]byte { iMap["543_s0"] = inlineList return iMap } + +func makeStreamMock() *StreamMock { + return &StreamMock{ + ctx: context.Background(), + recvToServer: make(chan *service.SyncerServiceUploadECPieceRequest, 10), + } +} + +type StreamMock struct { + grpc.ClientStream + ctx context.Context + recvToServer chan *service.SyncerServiceUploadECPieceRequest +} + +func (m *StreamMock) Send(resp *service.SyncerServiceUploadECPieceRequest) error { + m.recvToServer <- resp + return nil +} + +func (m *StreamMock) CloseAndRecv() (*service.SyncerServiceUploadECPieceResponse, error) { + return &service.SyncerServiceUploadECPieceResponse{ + TraceId: "test_traceID", + SecondarySpInfo: &service.StorageProviderSealInfo{ + StorageProviderId: "sp1", + PieceIdx: 1, + PieceChecksum: [][]byte{[]byte("1"), []byte("2"), []byte("3"), []byte("4"), []byte("5"), []byte("6")}, + IntegrityHash: []byte("a"), + Signature: nil, + }, + ErrMessage: &service.ErrMessage{ + ErrCode: service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED, + ErrMsg: "Success", + }, + }, nil +} diff --git a/service/stonenode/map.json b/service/stonenode/map.json deleted file mode 100644 index 6cf524de0..000000000 --- a/service/stonenode/map.json +++ /dev/null @@ -1,50 +0,0 @@ -{ - "sp1": { - "123456_s0_p0": "test1", - "123456_s1_p0": "test2", - "123456_s2_p0": "test3", - "123456_s3_p0": "test4", - "123456_s4_p0": "test5", - "123456_s5_p0": "test6" - }, - "sp2": { - "123456_s0_p1": "test7", - "123456_s1_p1": "test8", - "123456_s2_p1": "test9", - "123456_s3_p1": "test10", - "123456_s4_p1": "test11", - "123456_s5_p1": "test12" - }, - "sp3": { - "123456_s0_p2": "test13", - "123456_s1_p2": "test14", - "123456_s2_p2": "test15", - "123456_s3_p2": "test16", - "123456_s4_p2": "test17", - "123456_s5_p2": "test18" - }, - "sp4": { - "123456_s0_p3": "test19", - "123456_s1_p3": "test20", - "123456_s2_p3": "test21", - "123456_s3_p3": "test22", - "123456_s4_p3": "test23", - "123456_s5_p3": "test24" - }, - "sp5": { - "123456_s0_p4": "test25", - "123456_s1_p4": "test26", - "123456_s2_p4": "test27", - "123456_s3_p4": "test28", - "123456_s4_p4": "test29", - "123456_s5_p4": "test30" - }, - "sp6": { - "123456_s0_p5": "test31", - "123456_s1_p5": "test32", - "123456_s2_p5": "test33", - "123456_s3_p5": "test34", - "123456_s4_p5": "test35", - "123456_s5_p5": "test36" - } -} diff --git a/service/stonenode/sync.go b/service/stonenode/sync.go index 59f1f1488..fe7b25472 100644 --- a/service/stonenode/sync.go +++ b/service/stonenode/sync.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "sort" "sync" "sync/atomic" @@ -162,121 +163,128 @@ func (node *StoneNodeService) dispatchSecondarySP(pieceDataBySegment map[string] secondarySPs []string, targetIdx []uint32) (map[string]map[string][]byte, error) { pieceDataBySecondary := make(map[string]map[string][]byte) + // pieceDataBySegment key is segment key, value is ec data from ec1 to ec6 + var err error switch redundancyType { case ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED: - + pieceDataBySecondary, err = fillECData(pieceDataBySegment, secondarySPs, targetIdx) + if err != nil { + return map[string]map[string][]byte{}, err + } + return pieceDataBySecondary, nil case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: - - default: - return nil, merrors.ErrRedundancyType - } - - // pieceDataBySegment key is segment key, value is ec data from ec1 to ec6 - for pieceKey, pieceData := range pieceDataBySegment { - for idx, data := range pieceData { - if idx >= len(secondarySPs) { - return pieceDataBySecondary, merrors.ErrSecondarySPNumber - } - sp := secondarySPs[idx] - if _, ok := pieceDataBySecondary[sp]; !ok { - pieceDataBySecondary[sp] = make(map[string][]byte) - } - - var ( - err error - key string - ) - // if targetIdx is not equal to zero, retry to get data which idx is equal to targetIdx - if len(targetIdx) != 0 { - for _, j := range targetIdx { - if int(j) == idx { - key, err = joinECKey(pieceKey, redundancyType, idx) - if err != nil { - return nil, err - } - } - } - } else { - key, err = joinECKey(pieceKey, redundancyType, idx) - if err != nil { - return nil, err - } - } - pieceDataBySecondary[sp][key] = data + pieceDataBySecondary, err = fillReplicaOrInlineData(pieceDataBySegment, secondarySPs, targetIdx) + if err != nil { + return map[string]map[string][]byte{}, err } + return pieceDataBySecondary, nil + default: + return map[string]map[string][]byte{}, merrors.ErrRedundancyType } - fmt.Println(pieceDataBySecondary) - return pieceDataBySecondary, nil } func fillECData(pieceDataBySegment map[string][][]byte, secondarySPs []string, targetIdx []uint32) ( map[string]map[string][]byte, error) { ecPieceDataMap := make(map[string]map[string][]byte) - for pieceKey, pieceData := range pieceDataBySegment { + // iterate map in order + keys := sortedKeys(pieceDataBySegment) + for _, pieceKey := range keys { + pieceData := pieceDataBySegment[pieceKey] for idx, data := range pieceData { if idx >= len(secondarySPs) { - return ecPieceDataMap, merrors.ErrSecondarySPNumber + return map[string]map[string][]byte{}, merrors.ErrSecondarySPNumber } + // initialize data map sp := secondarySPs[idx] - if _, ok := ecPieceDataMap[sp]; !ok { - ecPieceDataMap[sp] = make(map[string][]byte) + if len(targetIdx) == 0 { + if _, ok := ecPieceDataMap[sp]; !ok { + ecPieceDataMap[sp] = make(map[string][]byte) + } + } else { + for _, j := range targetIdx { + if int(j-1) == idx { + if _, ok := ecPieceDataMap[sp]; !ok { + ecPieceDataMap[sp] = make(map[string][]byte) + } + } + } } - var ( - err error - key string - ) + var key string // if targetIdx is not equal to zero, retry to get data which idx is equal to targetIdx if len(targetIdx) != 0 { for _, j := range targetIdx { - if int(j) == idx { + if int(j-1) == idx { key = piecestore.EncodeECPieceKeyBySegmentKey(pieceKey, idx) - if err != nil { - return nil, err - } + ecPieceDataMap[sp][key] = data } } } else { key = piecestore.EncodeECPieceKeyBySegmentKey(pieceKey, idx) - if err != nil { - return nil, err - } + ecPieceDataMap[sp][key] = data } - ecPieceDataMap[sp][key] = data } } + fmt.Println(ecPieceDataMap) return ecPieceDataMap, nil } func fillReplicaOrInlineData(pieceDataBySegment map[string][][]byte, secondarySPs []string, targetIdx []uint32) ( map[string]map[string][]byte, error) { replicaOrInlineDataMap := make(map[string]map[string][]byte) - length := len(pieceDataBySegment) + if len(pieceDataBySegment) >= len(secondarySPs) { + return map[string]map[string][]byte{}, merrors.ErrSecondarySPNumber + } - for pieceKey, pieceData := range pieceDataBySegment { - for idx, data := range pieceData { - if idx >= len(secondarySPs) { - return replicaOrInlineDataMap, merrors.ErrSecondarySPNumber - } - sp := secondarySPs[idx] + // iterate map in order + keys := sortedKeys(pieceDataBySegment) + for i := 0; i < len(keys); i++ { + pieceKey := keys[i] + pieceData := pieceDataBySegment[pieceKey] + if len(pieceData) != 1 { + return nil, merrors.ErrInvalidSegmentData + } + + sp := secondarySPs[i] + if len(targetIdx) == 0 { if _, ok := replicaOrInlineDataMap[sp]; !ok { replicaOrInlineDataMap[sp] = make(map[string][]byte) } + } else { + for _, index := range targetIdx { + if int(index) == i { + if _, ok := replicaOrInlineDataMap[sp]; !ok { + replicaOrInlineDataMap[sp] = make(map[string][]byte) + } + } + } + } + + var key string + if len(targetIdx) != 0 { + for _, index := range targetIdx { + if int(index) == i { + key = pieceKey + replicaOrInlineDataMap[sp][key] = pieceData[0] + } + } + } else { + key = pieceKey + replicaOrInlineDataMap[sp][key] = pieceData[0] } } + fmt.Println(replicaOrInlineDataMap) + return replicaOrInlineDataMap, nil } -// joinECKey -func joinECKey(pieceKey string, redundancyType ptypes.RedundancyType, index int) (string, error) { - switch redundancyType { - case ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED: - return piecestore.EncodeECPieceKeyBySegmentKey(pieceKey, index), nil - case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: - return pieceKey, nil - default: - return "", merrors.ErrRedundancyType +func sortedKeys(dataMap map[string][][]byte) []string { + keys := make([]string, 0, len(dataMap)) + for k := range dataMap { + keys = append(keys, k) } + sort.Strings(keys) + return keys } // doSyncToSecondarySP send piece data to the secondary. @@ -338,7 +346,7 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *ser integrityHash := hash.GenerateIntegrityHash(pieceHash, secondary) if syncResp.GetSecondarySpInfo() == nil || syncResp.GetSecondarySpInfo().GetIntegrityHash() == nil || - bytes.Equal(integrityHash, syncResp.GetSecondarySpInfo().GetIntegrityHash()) { + !bytes.Equal(integrityHash, syncResp.GetSecondarySpInfo().GetIntegrityHash()) { log.CtxErrorw(ctx, "secondary integrity hash check error") errMsg.ErrCode = service.ErrCode_ERR_CODE_ERROR errMsg.ErrMsg = merrors.ErrIntegrityHash.Error() diff --git a/service/stonenode/sync_test.go b/service/stonenode/sync_test.go index c07b06af2..f52297f9b 100644 --- a/service/stonenode/sync_test.go +++ b/service/stonenode/sync_test.go @@ -216,7 +216,7 @@ func Test_dispatchSecondarySP(t *testing.T) { req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, req3: spList, req4: []uint32{}, - wantedResult1: 2, + wantedResult1: 3, wantedErr: nil, }, { @@ -229,7 +229,7 @@ func Test_dispatchSecondarySP(t *testing.T) { wantedErr: nil, }, { - name: "4", + name: "ec type data retransmission", req1: dispatchPieceMap(), req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, req3: spList, @@ -237,6 +237,15 @@ func Test_dispatchSecondarySP(t *testing.T) { wantedResult1: 2, wantedErr: nil, }, + { + name: "replica type data retransmission", + req1: dispatchSegmentMap(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req3: spList, + req4: []uint32{1, 2}, + wantedResult1: 2, + wantedErr: nil, + }, { name: "unknown redundancy type", req1: dispatchPieceMap(), @@ -255,6 +264,15 @@ func Test_dispatchSecondarySP(t *testing.T) { wantedResult1: 0, wantedErr: merrors.ErrSecondarySPNumber, }, + { + name: "wrong replica/inline segment data length", + req1: dispatchPieceMap(), + req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req3: spList, + req4: []uint32{}, + wantedResult1: 0, + wantedErr: merrors.ErrInvalidSegmentData, + }, } node := setup(t) @@ -267,61 +285,90 @@ func Test_dispatchSecondarySP(t *testing.T) { } } -func TestUploadECPiece(t *testing.T) { +// TODO:need improved +func Test_doSyncToSecondarySP(t *testing.T) { + data := map[string]map[string][]byte{ + "sp1": { + "123456_s0_p0": []byte("test1"), + "123456_s1_p0": []byte("test2"), + "123456_s2_p0": []byte("test3"), + "123456_s3_p0": []byte("test4"), + "123456_s4_p0": []byte("test5"), + "123456_s5_p0": []byte("test6"), + }, + } + cases := []struct { + name string + req1 *service.StoneHubServiceAllocStoneJobResponse + req2 map[string]map[string][]byte + }{ + { + name: "1", + req1: nil, + req2: data, + }, + } + node := setup(t) ctrl := gomock.NewController(t) defer ctrl.Finish() + // stoneHub service stub + //stoneHub := mock.NewMockStoneHubAPI(ctrl) + //node.stoneHub = stoneHub + //stoneHub.EXPECT().DoneSecondaryPieceJob(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + // func(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) ( + // *service.StoneHubServiceDoneSecondaryPieceJobResponse, error) { + // return nil, nil + // }) + + // syncer service stub + streamClient := makeStreamMock() syncer := mock.NewMockSyncerAPI(ctrl) node.syncer = syncer syncer.EXPECT().UploadECPiece(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, opts ...grpc.CallOption) (service.SyncerService_UploadECPieceClient, error) { - return nil, nil + return streamClient, nil }).AnyTimes() -} - -func makeStreamMock() *StreamMock { - return &StreamMock{ - ctx: context.Background(), - recvToServer: make(chan *service.SyncerServiceUploadECPieceRequest, 10), - sentFromServer: make(chan *service.SyncerServiceUploadECPieceResponse, 10), + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + allocResp := mockAllocResp(123456, 20*1024*1024, ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED) + err := node.doSyncToSecondarySP(context.TODO(), allocResp, tt.req2) + assert.Equal(t, nil, err) + }) } } -type StreamMock struct { - grpc.ServerStream - ctx context.Context - recvToServer chan *service.SyncerServiceUploadECPieceRequest - sentFromServer chan *service.SyncerServiceUploadECPieceResponse -} - -func (m *StreamMock) Context() context.Context { - return m.ctx -} +func TestUploadECPieceSuccess(t *testing.T) { + node := setup(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() -func (m *StreamMock) Send(resp *service.SyncerServiceUploadECPieceResponse) error { - m.sentFromServer <- resp - return nil -} + streamClient := makeStreamMock() + syncer := mock.NewMockSyncerAPI(ctrl) + node.syncer = syncer + syncer.EXPECT().UploadECPiece(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, opts ...grpc.CallOption) (service.SyncerService_UploadECPieceClient, error) { + return streamClient, nil + }).AnyTimes() -func (m *StreamMock) Recv() (*service.SyncerServiceUploadECPieceRequest, error) { - req, more := <-m.recvToServer - if !more { - return nil, errors.New("empty") + sInfo := &service.SyncerInfo{ + ObjectId: 123456, + TxHash: []byte("i"), + StorageProviderId: "sp1", + RedundancyType: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, } - return req, nil -} - -func (m *StreamMock) SendFromClient(req *service.SyncerServiceUploadECPieceRequest) error { - m.recvToServer <- req - return nil -} - -func (m *StreamMock) RecvToClient() (*service.SyncerServiceUploadECPieceResponse, error) { - response, more := <-m.sentFromServer - if !more { - return nil, errors.New("empty") + data := map[string][]byte{ + "123456_s0_p0": []byte("test1"), + "123456_s1_p0": []byte("test2"), + "123456_s2_p0": []byte("test3"), + "123456_s3_p0": []byte("test4"), + "123456_s4_p0": []byte("test5"), + "123456_s5_p0": []byte("test6"), } - return response, nil + resp, err := node.UploadECPiece(context.TODO(), 3, sInfo, data, "test_traceID") + assert.Equal(t, err, nil) + assert.Equal(t, resp.GetTraceId(), "test_traceID") + assert.Equal(t, resp.GetSecondarySpInfo().GetPieceIdx(), uint32(1)) } From 6cf722224b780d411a7f158fd3a7ec0d48364fc0 Mon Sep 17 00:00:00 2001 From: DylanYong Date: Tue, 10 Jan 2023 19:24:40 +0800 Subject: [PATCH 3/3] fix --- service/stonenode/sync_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/service/stonenode/sync_test.go b/service/stonenode/sync_test.go index f52297f9b..c3989c7cd 100644 --- a/service/stonenode/sync_test.go +++ b/service/stonenode/sync_test.go @@ -314,13 +314,13 @@ func Test_doSyncToSecondarySP(t *testing.T) { defer ctrl.Finish() // stoneHub service stub - //stoneHub := mock.NewMockStoneHubAPI(ctrl) - //node.stoneHub = stoneHub - //stoneHub.EXPECT().DoneSecondaryPieceJob(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - // func(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) ( - // *service.StoneHubServiceDoneSecondaryPieceJobResponse, error) { - // return nil, nil - // }) + stoneHub := mock.NewMockStoneHubAPI(ctrl) + node.stoneHub = stoneHub + stoneHub.EXPECT().DoneSecondaryPieceJob(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) ( + *service.StoneHubServiceDoneSecondaryPieceJobResponse, error) { + return nil, nil + }) // syncer service stub streamClient := makeStreamMock()