Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stone_node): supports sending data to different storage provider #82

Merged
merged 8 commits into from
Feb 9, 2023
1 change: 1 addition & 0 deletions model/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var (
ErrInvalidSegmentData = errors.New("invalid segment data, length is not equal to 1")
ErrInvalidECData = errors.New("invalid ec data, length is not equal to 6")
ErrEmptyTargetIdx = errors.New("target index array is empty")
ErrSyncerNumber = errors.New("syncer number is less than piece count")
)

// syncer service errors
Expand Down
2 changes: 1 addition & 1 deletion service/challenge/challenge_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func DefaultStorageProviderID() string {
}

var DefaultChallengeConfig = &ChallengeConfig{
Address: "127.0.0.1:5423",
Address: "127.0.0.1:9633",
StorageProvider: DefaultStorageProviderID(),
MetaDBType: model.LevelDB,
MetaLevelDBConfig: metalevel.DefaultMetaLevelDBConfig,
Expand Down
2 changes: 1 addition & 1 deletion service/downloader/downloader_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ type DownloaderConfig struct {
}

var DefaultDownloaderConfig = &DownloaderConfig{
Address: "127.0.0.1:5523",
Address: "127.0.0.1:9233",
PieceStoreConfig: storage.DefaultPieceStoreConfig,
}
6 changes: 3 additions & 3 deletions service/gateway/gateway_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ type GatewayConfig struct {
}

var DefaultGatewayConfig = &GatewayConfig{
Address: "127.0.0.1:5310",
Address: "127.0.0.1:9033",
Domain: "bfs.nodereal.com",
UploaderServiceAddress: "127.0.0.1:5311",
DownloaderServiceAddress: "127.0.0.1:5523",
UploaderServiceAddress: "127.0.0.1:9133",
DownloaderServiceAddress: "127.0.0.1:9233",
ChainConfig: defaultChainClientConfig,
}
2 changes: 1 addition & 1 deletion service/stonehub/stone_hub_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func DefaultStorageProviderID() string {

var DefaultStoneHubConfig = &StoneHubConfig{
StorageProvider: DefaultStorageProviderID(),
Address: "127.0.0.1:5323",
Address: "127.0.0.1:9333",
JobDBType: model.MemoryDB,
JobDB: jobsql.DefaultJobSqlDBConfig,
MetaDBType: model.LevelDB,
Expand Down
3 changes: 1 addition & 2 deletions service/stonenode/encode_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ func (node *StoneNodeService) encodeSegmentsData(ctx context.Context, allocResp
interruptCh = make(chan struct{})
pieces = make(map[int][][]byte)
objectID = allocResp.GetPieceJob().GetObjectId()
payloadSize = allocResp.GetPieceJob().GetPayloadSize()
redundancyType = allocResp.GetPieceJob().GetRedundancyType()
segmentCount = util.ComputeSegmentCount(payloadSize)
segmentCount = util.ComputeSegmentCount(allocResp.GetPieceJob().GetPayloadSize())
)

loadFunc := func(ctx context.Context, seg *segment) error {
Expand Down
2 changes: 1 addition & 1 deletion service/stonenode/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func setup(t *testing.T) *StoneNodeService {
cfg: &StoneNodeConfig{
Address: "test1",
StoneHubServiceAddress: "test2",
SyncerServiceAddress: "test3",
SyncerServiceAddress: []string{"test3"},
StorageProvider: "test",
StoneJobLimit: 0,
},
Expand Down
20 changes: 12 additions & 8 deletions service/stonenode/server.go → service/stonenode/stone_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
type StoneNodeService struct {
cfg *StoneNodeConfig
name string
syncer client.SyncerAPI
syncer []client.SyncerAPI
stoneHub client.StoneHubAPI
store client.PieceStoreAPI
stoneLimit int64
Expand Down Expand Up @@ -58,14 +58,16 @@ func (node *StoneNodeService) initClient() error {
log.Errorw("stone node inits stone hub client failed", "error", err)
return err
}
syncer, err := client.NewSyncerClient(node.cfg.SyncerServiceAddress)
if err != nil {
log.Errorw("stone node inits syncer client failed", "error", err)
return err
for _, value := range node.cfg.SyncerServiceAddress {
syncer, err := client.NewSyncerClient(value)
if err != nil {
log.Errorw("stone node inits syncer client failed", "error", err)
return err
}
node.syncer = append(node.syncer, syncer)
}
node.store = store
node.stoneHub = stoneHub
node.syncer = syncer
return nil
}

Expand Down Expand Up @@ -124,8 +126,10 @@ func (node *StoneNodeService) Stop(ctx context.Context) error {
if err := node.stoneHub.Close(); err != nil {
errs = append(errs, err)
}
if err := node.syncer.Close(); err != nil {
errs = append(errs, err)
for _, syncer := range node.syncer {
if err := syncer.Close(); err != nil {
errs = append(errs, err)
}
}
if errs != nil {
return fmt.Errorf("%v", errs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import "github.com/bnb-chain/greenfield-storage-provider/store/piecestore/storag
type StoneNodeConfig struct {
Address string
StoneHubServiceAddress string
SyncerServiceAddress string
SyncerServiceAddress []string
StorageProvider string
PieceConfig *storage.PieceStoreConfig
StoneJobLimit int64
}

var DefaultStoneNodeConfig = &StoneNodeConfig{
Address: "127.0.0.1:5325",
StoneHubServiceAddress: "127.0.0.1:5323",
SyncerServiceAddress: "127.0.0.1:5324",
Address: "127.0.0.1:9433",
StoneHubServiceAddress: "127.0.0.1:9333",
SyncerServiceAddress: []string{"127.0.0.1:9533", "127.0.0.1:9543", "127.0.0.1:9553", "127.0.0.1:9563", "127.0.0.1:9573", "127.0.0.1:9583"},
StorageProvider: "bnb-sp",
PieceConfig: storage.DefaultPieceStoreConfig,
StoneJobLimit: 64,
Expand Down
9 changes: 6 additions & 3 deletions service/stonenode/sync_piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *sty
PieceIndex: uint32(index),
PieceCount: uint32(len(pieceData)),
RedundancyType: redundancyType,
}, pieceData, resp.GetTraceId())
}, pieceData, index, resp.GetTraceId())
// TBD:: retry alloc secondary sp and rat again.
if err != nil {
log.CtxErrorw(ctx, "sync to secondary piece job failed", "error", err)
Expand Down Expand Up @@ -96,8 +96,11 @@ func verifyIntegrityHash(pieceData [][]byte, spInfo *stypes.StorageProviderSealI

// syncPiece send rpc request to secondary storage provider to sync the piece data
func (node *StoneNodeService) syncPiece(ctx context.Context, syncerInfo *stypes.SyncerInfo,
pieceData [][]byte, traceID string) (*stypes.SyncerServiceSyncPieceResponse, error) {
stream, err := node.syncer.SyncPiece(ctx)
pieceData [][]byte, index int, traceID string) (*stypes.SyncerServiceSyncPieceResponse, error) {
if index > len(node.syncer) {
return nil, merrors.ErrSyncerNumber
}
stream, err := node.syncer[index].SyncPiece(ctx)
if err != nil {
log.Errorw("sync secondary piece job error", "err", err)
return nil, err
Expand Down
45 changes: 40 additions & 5 deletions service/stonenode/sync_piece_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,44 @@ func Test_doSyncToSecondarySP(t *testing.T) {

// syncer service stub
streamClient := makeStreamMock()
syncer := mock.NewMockSyncerAPI(ctrl)
node.syncer = syncer
syncer.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
syncer1 := mock.NewMockSyncerAPI(ctrl)
node.syncer = append(node.syncer, syncer1)
syncer1.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
return streamClient, nil
}).AnyTimes()

syncer2 := mock.NewMockSyncerAPI(ctrl)
node.syncer = append(node.syncer, syncer2)
syncer2.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
return streamClient, nil
}).AnyTimes()

syncer3 := mock.NewMockSyncerAPI(ctrl)
node.syncer = append(node.syncer, syncer3)
syncer3.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
return streamClient, nil
}).AnyTimes()

syncer4 := mock.NewMockSyncerAPI(ctrl)
node.syncer = append(node.syncer, syncer4)
syncer4.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
return streamClient, nil
}).AnyTimes()

syncer5 := mock.NewMockSyncerAPI(ctrl)
node.syncer = append(node.syncer, syncer5)
syncer5.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
return streamClient, nil
}).AnyTimes()

syncer6 := mock.NewMockSyncerAPI(ctrl)
node.syncer = append(node.syncer, syncer6)
syncer6.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
return streamClient, nil
}).AnyTimes()
Expand All @@ -66,7 +101,7 @@ func TestSyncPieceSuccess(t *testing.T) {

streamClient := makeStreamMock()
syncer := mock.NewMockSyncerAPI(ctrl)
node.syncer = syncer
node.syncer = append(node.syncer, syncer)
syncer.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
return streamClient, nil
Expand All @@ -85,7 +120,7 @@ func TestSyncPieceSuccess(t *testing.T) {
[]byte("test5"),
[]byte("test6"),
}
resp, err := node.syncPiece(context.TODO(), sInfo, data, "test_traceID")
resp, err := node.syncPiece(context.TODO(), sInfo, data, 0, "test_traceID")
assert.Equal(t, err, nil)
assert.Equal(t, resp.GetTraceId(), "test_traceID")
assert.Equal(t, resp.GetSecondarySpInfo().GetPieceIdx(), uint32(1))
Expand Down
2 changes: 1 addition & 1 deletion service/syncer/syncer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type SyncerConfig struct {
}

var DefaultSyncerConfig = &SyncerConfig{
Address: "127.0.0.1:5324",
Address: "127.0.0.1:9533",
StorageProvider: "bnb-sp",
MetaDBType: model.LevelDB,
MetaLevelDBConfig: metalevel.DefaultMetaLevelDBConfig,
Expand Down
11 changes: 5 additions & 6 deletions util/maps/map.go → util/maps/maps.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ func sortSlice[T constraints.Ordered](s []T) {
})
}

// ValueToSlice convert values of a map to a slice
// ValueToSlice convert values of a map to a slice in order
func ValueToSlice[M ~map[K]V, K constraints.Ordered, V any](m M) []V {
keys := SortKeys(m)
valueSlice := make([]V, 0)
for _, key := range keys {
value := m[key]
valueSlice = append(valueSlice, value)
s := make([]V, len(m))
for index, key := range keys {
s[index] = m[key]
}
return valueSlice
return s
}
3 changes: 2 additions & 1 deletion util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
"strings"
"unicode"

"github.com/naoina/toml"

"github.com/bnb-chain/greenfield-storage-provider/model"
merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
"github.com/naoina/toml"
)

// TomlSettings - These settings ensure that TOML keys use the same names as Go struct fields.
Expand Down