diff --git a/config/config.toml b/config/config.toml index 33ffd5769..00a5ee7f8 100644 --- a/config/config.toml +++ b/config/config.toml @@ -34,6 +34,7 @@ Service = [ FileHandles = 1000 ReadOnly = false + [DownloaderCfg] Address = "127.0.0.1:9233" [DownloaderCfg.PieceStoreConfig] @@ -62,11 +63,12 @@ Service = [ FileHandles = 1000 ReadOnly = false + [StoneNodeCfg] StorageProvider = "bnb-sp" Address = "127.0.0.1:9433" StoneHubServiceAddress = "127.0.0.1:9333" - SyncerServiceAddress = "127.0.0.1:9533" + SyncerServiceAddress = ["127.0.0.1:9533", "127.0.0.1:9543", "127.0.0.1:9553", "127.0.0.1:9563", "127.0.0.1:9573", "127.0.0.1:9583"] StoneJobLimit = 64 [StoneNodeCfg.PieceConfig] Shards = 0 diff --git a/model/errors/errors.go b/model/errors/errors.go index f5528c409..4555293af 100644 --- a/model/errors/errors.go +++ b/model/errors/errors.go @@ -66,6 +66,7 @@ var ( ErrInvalidSegmentData = errors.New("invalid segment data, length is not equal to 1") ErrInvalidECData = errors.New("invalid ec data, length is not equal to 6") ErrEmptyTargetIdx = errors.New("target index array is empty") + ErrSyncerNumber = errors.New("syncer number is less than piece count") ) // syncer service errors diff --git a/service/challenge/challenge_config.go b/service/challenge/challenge_config.go index 2bd0d2bb5..7d4044f7d 100644 --- a/service/challenge/challenge_config.go +++ b/service/challenge/challenge_config.go @@ -29,7 +29,7 @@ func DefaultStorageProviderID() string { } var DefaultChallengeConfig = &ChallengeConfig{ - Address: "127.0.0.1:5423", + Address: "127.0.0.1:9633", StorageProvider: DefaultStorageProviderID(), MetaDBType: model.LevelDB, MetaLevelDBConfig: metalevel.DefaultMetaLevelDBConfig, diff --git a/service/downloader/downloader_config.go b/service/downloader/downloader_config.go index 8d94581f0..f998806eb 100644 --- a/service/downloader/downloader_config.go +++ b/service/downloader/downloader_config.go @@ -8,6 +8,6 @@ type DownloaderConfig struct { } var DefaultDownloaderConfig = &DownloaderConfig{ - Address: "127.0.0.1:5523", + Address: "127.0.0.1:9233", PieceStoreConfig: storage.DefaultPieceStoreConfig, } diff --git a/service/gateway/gateway_config.go b/service/gateway/gateway_config.go index c5af7582c..039dcbccc 100644 --- a/service/gateway/gateway_config.go +++ b/service/gateway/gateway_config.go @@ -9,9 +9,9 @@ type GatewayConfig struct { } var DefaultGatewayConfig = &GatewayConfig{ - Address: "127.0.0.1:5310", + Address: "127.0.0.1:9033", Domain: "bfs.nodereal.com", - UploaderServiceAddress: "127.0.0.1:5311", - DownloaderServiceAddress: "127.0.0.1:5523", + UploaderServiceAddress: "127.0.0.1:9133", + DownloaderServiceAddress: "127.0.0.1:9233", ChainConfig: defaultChainClientConfig, } diff --git a/service/stonehub/stone_hub_config.go b/service/stonehub/stone_hub_config.go index 71a969cc2..6442b1387 100644 --- a/service/stonehub/stone_hub_config.go +++ b/service/stonehub/stone_hub_config.go @@ -31,7 +31,7 @@ func DefaultStorageProviderID() string { var DefaultStoneHubConfig = &StoneHubConfig{ StorageProvider: DefaultStorageProviderID(), - Address: "127.0.0.1:5323", + Address: "127.0.0.1:9333", JobDBType: model.MemoryDB, JobDB: jobsql.DefaultJobSqlDBConfig, MetaDBType: model.LevelDB, diff --git a/service/stonenode/encode_segment.go b/service/stonenode/encode_segment.go index 46c7728c8..706912195 100644 --- a/service/stonenode/encode_segment.go +++ b/service/stonenode/encode_segment.go @@ -33,9 +33,8 @@ func (node *StoneNodeService) encodeSegmentsData(ctx context.Context, allocResp interruptCh = make(chan struct{}) pieces = make(map[int][][]byte) objectID = allocResp.GetPieceJob().GetObjectId() - payloadSize = allocResp.GetPieceJob().GetPayloadSize() redundancyType = allocResp.GetPieceJob().GetRedundancyType() - segmentCount = util.ComputeSegmentCount(payloadSize) + segmentCount = util.ComputeSegmentCount(allocResp.GetPieceJob().GetPayloadSize()) ) loadFunc := func(ctx context.Context, seg *segment) error { diff --git a/service/stonenode/helper_test.go b/service/stonenode/helper_test.go index 9707139eb..413bdf7e1 100644 --- a/service/stonenode/helper_test.go +++ b/service/stonenode/helper_test.go @@ -16,7 +16,7 @@ func setup(t *testing.T) *StoneNodeService { cfg: &StoneNodeConfig{ Address: "test1", StoneHubServiceAddress: "test2", - SyncerServiceAddress: "test3", + SyncerServiceAddress: []string{"test3"}, StorageProvider: "test", StoneJobLimit: 0, }, diff --git a/service/stonenode/server.go b/service/stonenode/stone_node.go similarity index 89% rename from service/stonenode/server.go rename to service/stonenode/stone_node.go index 4acdba041..a27bb25cd 100644 --- a/service/stonenode/server.go +++ b/service/stonenode/stone_node.go @@ -20,7 +20,7 @@ const ( type StoneNodeService struct { cfg *StoneNodeConfig name string - syncer client.SyncerAPI + syncer []client.SyncerAPI stoneHub client.StoneHubAPI store client.PieceStoreAPI stoneLimit int64 @@ -58,14 +58,16 @@ func (node *StoneNodeService) initClient() error { log.Errorw("stone node inits stone hub client failed", "error", err) return err } - syncer, err := client.NewSyncerClient(node.cfg.SyncerServiceAddress) - if err != nil { - log.Errorw("stone node inits syncer client failed", "error", err) - return err + for _, value := range node.cfg.SyncerServiceAddress { + syncer, err := client.NewSyncerClient(value) + if err != nil { + log.Errorw("stone node inits syncer client failed", "error", err) + return err + } + node.syncer = append(node.syncer, syncer) } node.store = store node.stoneHub = stoneHub - node.syncer = syncer return nil } @@ -124,8 +126,10 @@ func (node *StoneNodeService) Stop(ctx context.Context) error { if err := node.stoneHub.Close(); err != nil { errs = append(errs, err) } - if err := node.syncer.Close(); err != nil { - errs = append(errs, err) + for _, syncer := range node.syncer { + if err := syncer.Close(); err != nil { + errs = append(errs, err) + } } if errs != nil { return fmt.Errorf("%v", errs) diff --git a/service/stonenode/config.go b/service/stonenode/stone_node_config.go similarity index 64% rename from service/stonenode/config.go rename to service/stonenode/stone_node_config.go index 51f1d1464..d62a661d4 100644 --- a/service/stonenode/config.go +++ b/service/stonenode/stone_node_config.go @@ -5,16 +5,16 @@ import "github.com/bnb-chain/greenfield-storage-provider/store/piecestore/storag type StoneNodeConfig struct { Address string StoneHubServiceAddress string - SyncerServiceAddress string + SyncerServiceAddress []string StorageProvider string PieceConfig *storage.PieceStoreConfig StoneJobLimit int64 } var DefaultStoneNodeConfig = &StoneNodeConfig{ - Address: "127.0.0.1:5325", - StoneHubServiceAddress: "127.0.0.1:5323", - SyncerServiceAddress: "127.0.0.1:5324", + Address: "127.0.0.1:9433", + StoneHubServiceAddress: "127.0.0.1:9333", + SyncerServiceAddress: []string{"127.0.0.1:9533", "127.0.0.1:9543", "127.0.0.1:9553", "127.0.0.1:9563", "127.0.0.1:9573", "127.0.0.1:9583"}, StorageProvider: "bnb-sp", PieceConfig: storage.DefaultPieceStoreConfig, StoneJobLimit: 64, diff --git a/service/stonenode/sync_piece.go b/service/stonenode/sync_piece.go index cf0f19d4c..a33dbf6f3 100644 --- a/service/stonenode/sync_piece.go +++ b/service/stonenode/sync_piece.go @@ -52,7 +52,7 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *sty PieceIndex: uint32(index), PieceCount: uint32(len(pieceData)), RedundancyType: redundancyType, - }, pieceData, resp.GetTraceId()) + }, pieceData, index, resp.GetTraceId()) // TBD:: retry alloc secondary sp and rat again. if err != nil { log.CtxErrorw(ctx, "sync to secondary piece job failed", "error", err) @@ -96,8 +96,11 @@ func verifyIntegrityHash(pieceData [][]byte, spInfo *stypes.StorageProviderSealI // syncPiece send rpc request to secondary storage provider to sync the piece data func (node *StoneNodeService) syncPiece(ctx context.Context, syncerInfo *stypes.SyncerInfo, - pieceData [][]byte, traceID string) (*stypes.SyncerServiceSyncPieceResponse, error) { - stream, err := node.syncer.SyncPiece(ctx) + pieceData [][]byte, index int, traceID string) (*stypes.SyncerServiceSyncPieceResponse, error) { + if index > len(node.syncer) { + return nil, merrors.ErrSyncerNumber + } + stream, err := node.syncer[index].SyncPiece(ctx) if err != nil { log.Errorw("sync secondary piece job error", "err", err) return nil, err diff --git a/service/stonenode/sync_piece_test.go b/service/stonenode/sync_piece_test.go index 0a8beacaf..86a19c2ae 100644 --- a/service/stonenode/sync_piece_test.go +++ b/service/stonenode/sync_piece_test.go @@ -43,9 +43,44 @@ func Test_doSyncToSecondarySP(t *testing.T) { // syncer service stub streamClient := makeStreamMock() - syncer := mock.NewMockSyncerAPI(ctrl) - node.syncer = syncer - syncer.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn( + syncer1 := mock.NewMockSyncerAPI(ctrl) + node.syncer = append(node.syncer, syncer1) + syncer1.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) { + return streamClient, nil + }).AnyTimes() + + syncer2 := mock.NewMockSyncerAPI(ctrl) + node.syncer = append(node.syncer, syncer2) + syncer2.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) { + return streamClient, nil + }).AnyTimes() + + syncer3 := mock.NewMockSyncerAPI(ctrl) + node.syncer = append(node.syncer, syncer3) + syncer3.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) { + return streamClient, nil + }).AnyTimes() + + syncer4 := mock.NewMockSyncerAPI(ctrl) + node.syncer = append(node.syncer, syncer4) + syncer4.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) { + return streamClient, nil + }).AnyTimes() + + syncer5 := mock.NewMockSyncerAPI(ctrl) + node.syncer = append(node.syncer, syncer5) + syncer5.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) { + return streamClient, nil + }).AnyTimes() + + syncer6 := mock.NewMockSyncerAPI(ctrl) + node.syncer = append(node.syncer, syncer6) + syncer6.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) { return streamClient, nil }).AnyTimes() @@ -66,7 +101,7 @@ func TestSyncPieceSuccess(t *testing.T) { streamClient := makeStreamMock() syncer := mock.NewMockSyncerAPI(ctrl) - node.syncer = syncer + node.syncer = append(node.syncer, syncer) syncer.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) { return streamClient, nil @@ -85,7 +120,7 @@ func TestSyncPieceSuccess(t *testing.T) { []byte("test5"), []byte("test6"), } - resp, err := node.syncPiece(context.TODO(), sInfo, data, "test_traceID") + resp, err := node.syncPiece(context.TODO(), sInfo, data, 0, "test_traceID") assert.Equal(t, err, nil) assert.Equal(t, resp.GetTraceId(), "test_traceID") assert.Equal(t, resp.GetSecondarySpInfo().GetPieceIdx(), uint32(1)) diff --git a/service/syncer/syncer_config.go b/service/syncer/syncer_config.go index 0dc1186d3..75a538fd3 100644 --- a/service/syncer/syncer_config.go +++ b/service/syncer/syncer_config.go @@ -18,7 +18,7 @@ type SyncerConfig struct { } var DefaultSyncerConfig = &SyncerConfig{ - Address: "127.0.0.1:5324", + Address: "127.0.0.1:9533", StorageProvider: "bnb-sp", MetaDBType: model.LevelDB, MetaLevelDBConfig: metalevel.DefaultMetaLevelDBConfig, diff --git a/util/maps/map.go b/util/maps/maps.go similarity index 72% rename from util/maps/map.go rename to util/maps/maps.go index 53e63b9a2..ffff37ff7 100644 --- a/util/maps/map.go +++ b/util/maps/maps.go @@ -22,13 +22,12 @@ func sortSlice[T constraints.Ordered](s []T) { }) } -// ValueToSlice convert values of a map to a slice +// ValueToSlice convert values of a map to a slice in order func ValueToSlice[M ~map[K]V, K constraints.Ordered, V any](m M) []V { keys := SortKeys(m) - valueSlice := make([]V, 0) - for _, key := range keys { - value := m[key] - valueSlice = append(valueSlice, value) + s := make([]V, len(m)) + for i, k := range keys { + s[i] = m[k] } - return valueSlice + return s } diff --git a/util/utils.go b/util/utils.go index bf581555d..86a4307e9 100644 --- a/util/utils.go +++ b/util/utils.go @@ -9,10 +9,11 @@ import ( "strings" "unicode" + "github.com/naoina/toml" + "github.com/bnb-chain/greenfield-storage-provider/model" merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" - "github.com/naoina/toml" ) // TomlSettings - These settings ensure that TOML keys use the same names as Go struct fields.