diff --git a/cmd/storage_provider/main.go b/cmd/storage_provider/main.go
index ce36c27be..de081fe3d 100644
--- a/cmd/storage_provider/main.go
+++ b/cmd/storage_provider/main.go
@@ -84,6 +84,9 @@ func initService(serviceName string, cfg *config.StorageProviderConfig) (server
 		if err != nil {
 			return nil, err
 		}
+	default:
+		log.Errorw("unknown service", "service", serviceName)
+		return nil, fmt.Errorf("unknown service: %s", serviceName)
 	}
 	return server, nil
 }
diff --git a/config/config.toml b/config/config.toml
index b3ff8d2ac..b34d72523 100644
--- a/config/config.toml
+++ b/config/config.toml
@@ -91,7 +91,7 @@ StoneJobLimit = 64
 
 [SyncerCfg]
 StorageProvider = "bnb-sp"
-Address = "127.0.0.1:11600"
+Address = "127.0.0.1:9533"
 MetaDBType = "leveldb"
 [SyncerCfg.PieceConfig.Store]
 Storage = "file"
diff --git a/model/errors/errors.go b/model/errors/errors.go
index f77867fd5..ddac16ac8 100644
--- a/model/errors/errors.go
+++ b/model/errors/errors.go
@@ -54,7 +54,7 @@ var (
 	ErrStoneNodeStopped   = errors.New("stone node service has stopped")
 	ErrIntegrityHash      = errors.New("secondary integrity hash check error")
 	ErrRedundancyType     = errors.New("unknown redundancy type")
-	ErrEmptyJob           = errors.New("job is empty")
+	ErrEmptyJob           = errors.New("alloc stone job is empty")
 	ErrSecondarySPNumber  = errors.New("secondary sp is not enough")
 	ErrInvalidSegmentData = errors.New("invalid segment data, length is not equal to 1")
 	ErrInvalidECData      = errors.New("invalid ec data, length is not equal to 6")
@@ -63,6 +63,8 @@ var (
 )
 
 // syncer service errors
 var (
+	ErrSyncerStarted      = errors.New("syncer service is running")
+	ErrSyncerStopped      = errors.New("syncer service has already stopped")
 	ErrReceivedPieceCount = errors.New("syncer service received piece count is wrong")
 )
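The reworded ErrEmptyJob and the new ErrSyncerStarted/ErrSyncerStopped values follow Go's sentinel-error convention, so callers can branch on error identity rather than on message text. A minimal sketch of that pattern under assumed names (only ErrEmptyJob's message comes from this diff):

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel error, mirroring the model/errors style above.
var ErrEmptyJob = errors.New("alloc stone job is empty")

// allocJob wraps the sentinel with %w so the cause stays inspectable.
func allocJob() error { return fmt.Errorf("alloc attempt 1: %w", ErrEmptyJob) }

func main() {
	// errors.Is walks the wrap chain, so the match survives wrapping.
	if err := allocJob(); errors.Is(err, ErrEmptyJob) {
		fmt.Println("no pending job, retry on the next tick")
	}
}
```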
diff --git a/pkg/job/object_context.go b/pkg/job/object_context.go
index 0cb35c632..016f6d1ba 100644
--- a/pkg/job/object_context.go
+++ b/pkg/job/object_context.go
@@ -3,21 +3,21 @@ package job
 import (
 	"sync"
 
-	types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+	ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
 	"github.com/bnb-chain/greenfield-storage-provider/store/jobdb"
 	"github.com/bnb-chain/greenfield-storage-provider/store/metadb"
 )
 
 // ObjectInfoContext maintains the object info, goroutine safe.
 type ObjectInfoContext struct {
-	object *types.ObjectInfo
+	object *ptypesv1pb.ObjectInfo
 	jobDB  jobdb.JobDB
 	metaDB metadb.MetaDB
 	mu     sync.RWMutex
 }
 
 // NewObjectInfoContext return the instance of ObjectInfoContext.
-func NewObjectInfoContext(object *types.ObjectInfo, jobDB jobdb.JobDB, metaDB metadb.MetaDB) *ObjectInfoContext {
+func NewObjectInfoContext(object *ptypesv1pb.ObjectInfo, jobDB jobdb.JobDB, metaDB metadb.MetaDB) *ObjectInfoContext {
 	return &ObjectInfoContext{
 		object: object,
 		jobDB:  jobDB,
@@ -26,7 +26,7 @@ func NewObjectInfoContext(object *types.ObjectInfo, jobDB jobdb.JobDB, metaDB me
 }
 
 // GetObjectInfo return the object info.
-func (ctx *ObjectInfoContext) GetObjectInfo() *types.ObjectInfo {
+func (ctx *ObjectInfoContext) GetObjectInfo() *ptypesv1pb.ObjectInfo {
 	ctx.mu.RLock()
 	defer ctx.mu.RUnlock()
 	return ctx.object.SafeCopy()
@@ -47,7 +47,7 @@ func (ctx *ObjectInfoContext) GetObjectSize() uint64 {
 }
 
 // GetObjectRedundancyType return the object redundancy type.
-func (ctx *ObjectInfoContext) GetObjectRedundancyType() types.RedundancyType {
+func (ctx *ObjectInfoContext) GetObjectRedundancyType() ptypesv1pb.RedundancyType {
 	ctx.mu.RLock()
 	defer ctx.mu.RUnlock()
 	return ctx.object.GetRedundancyType()
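ObjectInfoContext pairs an RWMutex with SafeCopy so readers get a snapshot instead of a pointer into shared state. A stripped-down sketch of that lock-plus-copy pattern (the Info type is illustrative, not the repo's ObjectInfo):

```go
package main

import (
	"fmt"
	"sync"
)

type Info struct{ Size uint64 } // stand-in for ptypesv1pb.ObjectInfo

type InfoContext struct {
	mu   sync.RWMutex
	info Info
}

// Get copies under a read lock; callers may mutate the result freely
// without racing against writers holding the write lock.
func (c *InfoContext) Get() Info {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.info
}

func main() {
	c := &InfoContext{info: Info{Size: 42}}
	snapshot := c.Get()
	snapshot.Size = 0          // does not affect c.info
	fmt.Println(c.Get().Size) // 42
}
```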
diff --git a/pkg/job/upload_job_test.go b/pkg/job/upload_job_test.go
index 4b74d5ce2..61d4d0fb3 100644
--- a/pkg/job/upload_job_test.go
+++ b/pkg/job/upload_job_test.go
@@ -7,19 +7,19 @@ import (
 	"github.com/stretchr/testify/assert"
 
 	merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
-	types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
-	service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+	ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+	stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
 	"github.com/bnb-chain/greenfield-storage-provider/store/jobdb/jobmemory"
 	"github.com/bnb-chain/greenfield-storage-provider/util/hash"
 )
 
-func InitEnv(rType types.RedundancyType) (*UploadPayloadJob, *types.ObjectInfo) {
+func InitEnv(rType ptypesv1pb.RedundancyType) (*UploadPayloadJob, *ptypesv1pb.ObjectInfo) {
 	objectSize := 50 * 1024 * 1024
 	switch rType {
-	case types.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
+	case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
 		objectSize = 1 * 1024 * 1024
 	}
-	object := &types.ObjectInfo{
+	object := &ptypesv1pb.ObjectInfo{
 		Size:           uint64(objectSize),
 		ObjectId:       1,
 		RedundancyType: rType,
@@ -29,27 +29,27 @@ func InitEnv(rType types.RedundancyType) (*UploadPayloadJob, *types.ObjectInfo)
 }
 
 func TestInitUploadPayloadJob(t *testing.T) {
-	job, _ := InitEnv(types.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
+	job, _ := InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
 	assert.Equal(t, len(job.primaryJob.pieceJobs), 4)
 	assert.Equal(t, len(job.secondaryJob.pieceJobs), 6)
-	job, _ = InitEnv(types.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE)
+	job, _ = InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE)
 	assert.Equal(t, len(job.primaryJob.pieceJobs), 1)
 	assert.Equal(t, len(job.secondaryJob.pieceJobs), 1)
-	job, _ = InitEnv(types.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE)
+	job, _ = InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE)
 	assert.Equal(t, len(job.primaryJob.pieceJobs), 4)
 	assert.Equal(t, len(job.secondaryJob.pieceJobs), 4)
 }
 
 func TestDoneReplicatePieceJob(t *testing.T) {
-	job, object := InitEnv(types.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE)
-	pieceJob := &service.PieceJob{
+	job, object := InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE)
+	pieceJob := &stypesv1pb.PieceJob{
 		ObjectId:       object.GetObjectId(),
 		PayloadSize:    object.GetSize(),
 		RedundancyType: object.GetRedundancyType(),
 	}
-	pieceJob.StorageProviderSealInfo = &service.StorageProviderSealInfo{
+	pieceJob.StorageProviderSealInfo = &stypesv1pb.StorageProviderSealInfo{
 		StorageProviderId: "test-storage-provider",
 		PieceChecksum:     [][]byte{hash.GenerateChecksum([]byte(time.Now().String()))},
 		IntegrityHash:     hash.GenerateChecksum([]byte(time.Now().String())),
@@ -83,8 +83,8 @@ func TestDoneReplicatePieceJob(t *testing.T) {
 }
 
 func TestDoneInlinePieceJob(t *testing.T) {
-	job, object := InitEnv(types.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE)
-	pieceJob := &service.PieceJob{
+	job, object := InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE)
+	pieceJob := &stypesv1pb.PieceJob{
 		ObjectId:       object.GetObjectId(),
 		PayloadSize:    object.GetSize(),
 		RedundancyType: object.GetRedundancyType(),
@@ -92,7 +92,7 @@ func TestDoneInlinePieceJob(t *testing.T) {
 	intergrity := hash.GenerateChecksum([]byte(time.Now().String()))
 	pieceCheckSum := hash.GenerateChecksum([]byte(time.Now().String()))
 	signature := hash.GenerateChecksum([]byte(time.Now().String()))
-	pieceJob.StorageProviderSealInfo = &service.StorageProviderSealInfo{
+	pieceJob.StorageProviderSealInfo = &stypesv1pb.StorageProviderSealInfo{
 		StorageProviderId: "test-storage-provider",
 		PieceChecksum:     [][]byte{pieceCheckSum},
 		IntegrityHash:     intergrity,
@@ -110,13 +110,13 @@ func TestDoneInlinePieceJob(t *testing.T) {
 }
 
 func TestDoneECPieceJob(t *testing.T) {
-	job, object := InitEnv(types.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
-	pieceJob := &service.PieceJob{
+	job, object := InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
+	pieceJob := &stypesv1pb.PieceJob{
 		ObjectId:       object.GetObjectId(),
 		PayloadSize:    object.GetSize(),
 		RedundancyType: object.GetRedundancyType(),
 	}
-	pieceJob.StorageProviderSealInfo = &service.StorageProviderSealInfo{
+	pieceJob.StorageProviderSealInfo = &stypesv1pb.StorageProviderSealInfo{
 		StorageProviderId: "test-storage-provider",
 		PieceChecksum: [][]byte{hash.GenerateChecksum([]byte(time.Now().String())),
 			hash.GenerateChecksum([]byte(time.Now().String())),
@@ -146,14 +146,14 @@ func TestDoneECPieceJob(t *testing.T) {
 }
 
 func TestSegmentPieceError(t *testing.T) {
-	job, object := InitEnv(types.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
-	pieceJob := &service.PieceJob{
+	job, object := InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
+	pieceJob := &stypesv1pb.PieceJob{
 		ObjectId:       object.GetObjectId(),
 		PayloadSize:    object.GetSize(),
 		RedundancyType: object.GetRedundancyType(),
 	}
 	badCheckSum := hash.GenerateChecksum([]byte(time.Now().String()))[0:10]
-	pieceJob.StorageProviderSealInfo = &service.StorageProviderSealInfo{
+	pieceJob.StorageProviderSealInfo = &stypesv1pb.StorageProviderSealInfo{
 		StorageProviderId: "test-storage-provider",
 	}
 	pieceJob.StorageProviderSealInfo.PieceIdx = 0
@@ -166,15 +166,15 @@ func TestSegmentPieceError(t *testing.T) {
 }
 
 func TestECPieceError(t *testing.T) {
-	job, object := InitEnv(types.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
-	pieceJob := &service.PieceJob{
+	job, object := InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
+	pieceJob := &stypesv1pb.PieceJob{
 		ObjectId:       object.GetObjectId(),
 		PayloadSize:    object.GetSize(),
 		RedundancyType: object.GetRedundancyType(),
 	}
 	badCheckSum := hash.GenerateChecksum([]byte(time.Now().String()))[0:10]
 	checkSum := hash.GenerateChecksum([]byte(time.Now().String()))
-	pieceJob.StorageProviderSealInfo = &service.StorageProviderSealInfo{
+	pieceJob.StorageProviderSealInfo = &stypesv1pb.StorageProviderSealInfo{
 		StorageProviderId: "test-storage-provider",
 	}
 	pieceJob.StorageProviderSealInfo.PieceIdx = 0
diff --git a/pkg/job/upload_payload_job.go b/pkg/job/upload_payload_job.go
index 18f37c9f2..79b7460c6 100644
--- a/pkg/job/upload_payload_job.go
+++ b/pkg/job/upload_payload_job.go
@@ -2,8 +2,8 @@ package job
 
 import (
 	merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
-	types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
-	service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+	ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+	stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
 )
 
 // UploadPayloadJob maintains the object info and piece job meta
@@ -22,15 +22,15 @@ func NewUploadPayloadJob(objectCtx *ObjectInfoContext) (job *UploadPayloadJob, e
 		return nil, err
 	}
 	switch objectCtx.GetObjectRedundancyType() {
-	case types.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED:
+	case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED:
 		if job.secondaryJob, err = NewECUploadSpJob(objectCtx, true); err != nil {
 			return nil, err
 		}
-	case types.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
+	case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
 		if job.secondaryJob, err = NewSegmentUploadSpJob(objectCtx, true); err != nil {
 			return nil, err
 		}
-	case types.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE:
+	case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE:
 		if job.secondaryJob, err = NewSegmentUploadSpJob(objectCtx, true); err != nil {
 			return nil, err
 		}
@@ -61,23 +61,23 @@ func (job *UploadPayloadJob) SecondarySPCompleted() bool {
 }
 
 // DonePrimarySPJob complete one primary piece job and update DB.
-func (job *UploadPayloadJob) DonePrimarySPJob(pieceJob *service.PieceJob) error {
+func (job *UploadPayloadJob) DonePrimarySPJob(pieceJob *stypesv1pb.PieceJob) error {
 	return job.primaryJob.Done(pieceJob)
 }
 
 // DoneSecondarySPJob complete one secondary piece job and update DB.
-func (job *UploadPayloadJob) DoneSecondarySPJob(pieceJob *service.PieceJob) error {
+func (job *UploadPayloadJob) DoneSecondarySPJob(pieceJob *stypesv1pb.PieceJob) error {
 	return job.secondaryJob.Done(pieceJob)
 }
 
 // PopPendingPrimarySPJob return the uncompleted primary piece job.
-func (job *UploadPayloadJob) PopPendingPrimarySPJob() *service.PieceJob {
+func (job *UploadPayloadJob) PopPendingPrimarySPJob() *stypesv1pb.PieceJob {
 	pieces := job.primaryJob.PopPendingJob()
 	if len(pieces) == 0 {
 		return nil
 	}
 	obj := job.objectCtx.GetObjectInfo()
-	pieceJob := &service.PieceJob{
+	pieceJob := &stypesv1pb.PieceJob{
 		ObjectId:    obj.ObjectId,
 		PayloadSize: obj.Size,
 		TargetIdx:   pieces,
@@ -87,13 +87,13 @@ func (job *UploadPayloadJob) PopPendingPrimarySPJob() *service.PieceJob {
 }
 
 // PopPendingSecondarySPJob return the uncompleted secondary piece job.
-func (job *UploadPayloadJob) PopPendingSecondarySPJob() *service.PieceJob {
+func (job *UploadPayloadJob) PopPendingSecondarySPJob() *stypesv1pb.PieceJob {
 	pieces := job.secondaryJob.PopPendingJob()
 	if len(pieces) == 0 {
 		return nil
 	}
 	obj := job.objectCtx.GetObjectInfo()
-	pieceJob := &service.PieceJob{
+	pieceJob := &stypesv1pb.PieceJob{
 		ObjectId:    obj.ObjectId,
 		PayloadSize: obj.Size,
 		TargetIdx:   pieces,
@@ -103,11 +103,11 @@ func (job *UploadPayloadJob) PopPendingSecondarySPJob() *service.PieceJob {
 }
 
 // PrimarySPSealInfo return the primary storage provider seal info.
-func (job *UploadPayloadJob) PrimarySPSealInfo() ([]*types.StorageProviderInfo, error) {
+func (job *UploadPayloadJob) PrimarySPSealInfo() ([]*ptypesv1pb.StorageProviderInfo, error) {
 	return job.primaryJob.SealInfo()
 }
 
 // SecondarySPSealInfo return the secondary storage provider seal info.
-func (job *UploadPayloadJob) SecondarySPSealInfo() ([]*types.StorageProviderInfo, error) {
+func (job *UploadPayloadJob) SecondarySPSealInfo() ([]*ptypesv1pb.StorageProviderInfo, error) {
 	return job.secondaryJob.SealInfo()
 }
diff --git a/pkg/job/upload_piece_job.go b/pkg/job/upload_piece_job.go
index 9ec71f394..af4efc9a5 100644
--- a/pkg/job/upload_piece_job.go
+++ b/pkg/job/upload_piece_job.go
@@ -5,8 +5,8 @@ import (
 
 	"github.com/bnb-chain/greenfield-storage-provider/model"
 	merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
-	types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
-	service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+	ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+	stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
 	"github.com/bnb-chain/greenfield-storage-provider/store/jobdb"
 	"github.com/bnb-chain/greenfield-storage-provider/util"
 	"github.com/bnb-chain/greenfield-storage-provider/util/hash"
@@ -18,7 +18,7 @@ type UploadSpJob struct {
 	objectCtx  *ObjectInfoContext
 	pieceJobs  []*jobdb.PieceJob
 	pieceType  model.PieceType
-	redundancy types.RedundancyType
+	redundancy ptypesv1pb.RedundancyType
 	secondary  bool
 	complete   int
 	mu         sync.RWMutex
@@ -32,7 +32,7 @@ func NewSegmentUploadSpJob(objectCtx *ObjectInfoContext, secondary bool) (*Uploa
 		redundancy: objectCtx.GetObjectRedundancyType(),
 	}
 	pieceCount := util.ComputeSegmentCount(objectCtx.GetObjectSize())
-	if job.secondary && job.redundancy == types.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE {
+	if job.secondary && job.redundancy == ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE {
 		pieceCount = model.EC_M + model.EC_K
 	}
 	for idx := 0; idx < int(pieceCount); idx++ {
@@ -110,7 +110,7 @@ func (job *UploadSpJob) PopPendingJob() (pieceIdx []uint32) {
 }
 
 // Done completed one piece job and store the state to DB.
-func (job *UploadSpJob) Done(pieceJob *service.PieceJob) error {
+func (job *UploadSpJob) Done(pieceJob *stypesv1pb.PieceJob) error {
 	job.mu.Lock()
 	defer job.mu.Unlock()
 	// 1. check job weather has completed
@@ -142,7 +142,7 @@ func (job *UploadSpJob) Done(pieceJob *service.PieceJob) error {
 }
 
 // donePrimary update primary piece job state, include memory and db.
-func (job *UploadSpJob) doneSegment(segmentPiece *jobdb.PieceJob, pieceJob *service.PieceJob) error {
+func (job *UploadSpJob) doneSegment(segmentPiece *jobdb.PieceJob, pieceJob *stypesv1pb.PieceJob) error {
 	pieceCount := uint32(len(pieceJob.GetStorageProviderSealInfo().GetPieceChecksum()))
 	segmentCount := util.ComputeSegmentCount(job.objectCtx.GetObjectSize())
 	// primary segment
@@ -210,7 +210,7 @@ func (job *UploadSpJob) doneSegment(segmentPiece *jobdb.PieceJob, pieceJob *serv
 }
 
 // doneSecondary update primary piece job state, include memory and db.
-func (job *UploadSpJob) doneEC(ecPiece *jobdb.PieceJob, pieceJob *service.PieceJob) error {
+func (job *UploadSpJob) doneEC(ecPiece *jobdb.PieceJob, pieceJob *stypesv1pb.PieceJob) error {
 	pieceCount := uint32(len(pieceJob.GetStorageProviderSealInfo().GetPieceChecksum()))
 	segmentCount := util.ComputeSegmentCount(job.objectCtx.GetObjectSize())
 	if pieceCount != segmentCount {
@@ -251,13 +251,13 @@ func (job *UploadSpJob) doneEC(ecPiece *jobdb.PieceJob, pieceJob *service.PieceJ
 }
 
 // SealInfo return the info for seal.
-func (job *UploadSpJob) SealInfo() ([]*types.StorageProviderInfo, error) {
+func (job *UploadSpJob) SealInfo() ([]*ptypesv1pb.StorageProviderInfo, error) {
 	job.mu.RLock()
 	defer job.mu.RUnlock()
 	if job.complete != len(job.pieceJobs) {
 		return nil, merrors.ErrSpJobNotCompleted
 	}
-	var sealInfo []*types.StorageProviderInfo
+	var sealInfo []*ptypesv1pb.StorageProviderInfo
 	if job.secondary {
 		sealInfo = job.sealSecondary()
 	} else {
@@ -267,23 +267,23 @@ func (job *UploadSpJob) SealInfo() ([]*types.StorageProviderInfo, error) {
 }
 
 // PrimarySealInfo compute the primary integrity hash.
-func (job *UploadSpJob) sealPrimary() *types.StorageProviderInfo {
+func (job *UploadSpJob) sealPrimary() *ptypesv1pb.StorageProviderInfo {
 	var checksumList [][]byte
 	for _, pieceJob := range job.pieceJobs {
 		checksumList = append(checksumList, pieceJob.Checksum[0])
 	}
 	// TODO:: sign the primary integrity hash in stone hub level.
-	return &types.StorageProviderInfo{
+	return &ptypesv1pb.StorageProviderInfo{
 		SpId:     job.pieceJobs[0].StorageProvider,
 		Checksum: hash.GenerateIntegrityHash(checksumList),
 	}
 }
 
 // SecondarySealInfo return secondary info for seal, the stone node service report.
-func (job *UploadSpJob) sealSecondary() []*types.StorageProviderInfo {
-	var sealInfo []*types.StorageProviderInfo
+func (job *UploadSpJob) sealSecondary() []*ptypesv1pb.StorageProviderInfo {
+	var sealInfo []*ptypesv1pb.StorageProviderInfo
 	for _, pieceJob := range job.pieceJobs {
-		sealInfo = append(sealInfo, &types.StorageProviderInfo{
+		sealInfo = append(sealInfo, &ptypesv1pb.StorageProviderInfo{
 			SpId:     pieceJob.StorageProvider,
 			Idx:      pieceJob.PieceId,
 			Checksum: pieceJob.IntegrityHash,
diff --git a/pkg/stone/stone_job_context.go b/pkg/stone/stone_job_context.go
index a2f37b859..edbddd0fc 100644
--- a/pkg/stone/stone_job_context.go
+++ b/pkg/stone/stone_job_context.go
@@ -5,14 +5,14 @@ import (
 	"sync"
 	"time"
 
-	types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+	ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
 	"github.com/bnb-chain/greenfield-storage-provider/store/jobdb"
 	"github.com/bnb-chain/greenfield-storage-provider/store/metadb"
 )
 
 // JobContextWrapper maintain job context, goroutine safe
 type JobContextWrapper struct {
-	jobCtx *types.JobContext
+	jobCtx *ptypesv1pb.JobContext
 	jobErr error
 	jobDB  jobdb.JobDB
 	metaDB metadb.MetaDB
@@ -20,7 +20,7 @@ type JobContextWrapper struct {
 }
 
 // NewJobContextWrapper return the instance of JobContextWrapper
-func NewJobContextWrapper(jobCtx *types.JobContext, jobDB jobdb.JobDB, metaDB metadb.MetaDB) *JobContextWrapper {
+func NewJobContextWrapper(jobCtx *ptypesv1pb.JobContext, jobDB jobdb.JobDB, metaDB metadb.MetaDB) *JobContextWrapper {
 	return &JobContextWrapper{
 		jobCtx: jobCtx,
 		jobDB:  jobDB,
@@ -32,7 +32,7 @@ func NewJobContextWrapper(jobCtx *types.JobContext, jobDB jobdb.JobDB, metaDB me
 func (wrapper *JobContextWrapper) GetJobState() (string, error) {
 	wrapper.mu.RLock()
 	defer wrapper.mu.RUnlock()
-	state, ok := types.JobState_name[int32(wrapper.jobCtx.GetJobState())]
+	state, ok := ptypesv1pb.JobState_name[int32(wrapper.jobCtx.GetJobState())]
 	if !ok {
 		return "", errors.New("job state error")
 	}
@@ -43,7 +43,7 @@ func (wrapper *JobContextWrapper) GetJobState() (string, error) {
 func (wrapper *JobContextWrapper) SetJobState(state string) error {
 	wrapper.mu.Lock()
 	defer wrapper.mu.Unlock()
-	wrapper.jobCtx.JobState = types.JobState(types.JobState_value[state])
+	wrapper.jobCtx.JobState = ptypesv1pb.JobState(ptypesv1pb.JobState_value[state])
 	return wrapper.jobDB.SetUploadPayloadJobState(wrapper.jobCtx.JobId, state, time.Now().Unix())
 }
 
@@ -64,9 +64,9 @@ func (wrapper *JobContextWrapper) SetJobErr(err error) error {
 	} else {
 		wrapper.jobCtx.JobErr = wrapper.jobCtx.JobErr + err.Error()
 	}
-	wrapper.jobCtx.JobState = types.JobState_JOB_STATE_ERROR
+	wrapper.jobCtx.JobState = ptypesv1pb.JobState_JOB_STATE_ERROR
 	return wrapper.jobDB.SetUploadPayloadJobJobError(wrapper.jobCtx.JobId,
-		types.JOB_STATE_ERROR, wrapper.jobCtx.JobErr, time.Now().Unix())
+		ptypesv1pb.JOB_STATE_ERROR, wrapper.jobCtx.JobErr, time.Now().Unix())
 }
 
 // ModifyTime return the last modify timestamp
@@ -77,7 +77,7 @@ func (wrapper *JobContextWrapper) ModifyTime() int64 {
 }
 
 // JobContext return the copy of job context
-func (wrapper *JobContextWrapper) JobContext() *types.JobContext {
+func (wrapper *JobContextWrapper) JobContext() *ptypesv1pb.JobContext {
 	wrapper.mu.RLock()
 	defer wrapper.mu.RUnlock()
 	return wrapper.jobCtx.SafeCopy()
diff --git a/pkg/stone/upload_payload_callback.go b/pkg/stone/upload_payload_callback.go
index ebdb8507d..aee6ff7cb 100644
--- a/pkg/stone/upload_payload_callback.go
+++ b/pkg/stone/upload_payload_callback.go
@@ -6,8 +6,8 @@ import (
 	"github.com/looplab/fsm"
 
 	merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
-	types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
-	service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+	ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+	stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
 	"github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
 
@@ -49,7 +49,7 @@ func AfterUploadPrimaryPieceDone(ctx context.Context, event *fsm.Event) {
 		err = merrors.ErrPieceJobMissing
 		return
 	}
-	pieceInfo, ok := event.Args[0].(*service.PieceJob)
+	pieceInfo, ok := event.Args[0].(*stypesv1pb.PieceJob)
 	if !ok {
 		err = merrors.ErrPieceJobMissing
 		return
@@ -62,7 +62,7 @@ func AfterUploadPrimaryPieceDone(ctx context.Context, event *fsm.Event) {
 // and update the job state to the DB
 func EnterUploadPrimaryDone(ctx context.Context, event *fsm.Event) {
 	stone := ctx.Value(CtxStoneKey).(*UploadPayloadStone)
-	if err := stone.jobCtx.SetJobState(types.JOB_STATE_UPLOAD_PRIMARY_DONE); err != nil {
+	if err := stone.jobCtx.SetJobState(ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DONE); err != nil {
 		stone.jobCtx.SetJobErr(err)
 		log.CtxErrorw(ctx, "update primary done job state error", "error", err)
 		return
@@ -101,7 +101,7 @@ func AfterUploadSecondaryPieceDone(ctx context.Context, event *fsm.Event) {
 		interruptErr = merrors.ErrPieceJobMissing
 		return
 	}
-	pieceInfo, ok := event.Args[0].(*service.PieceJob)
+	pieceInfo, ok := event.Args[0].(*stypesv1pb.PieceJob)
 	if !ok {
 		interruptErr = merrors.ErrPieceJobMissing
 		return
@@ -116,7 +116,7 @@ func AfterUploadSecondaryPieceDone(ctx context.Context, event *fsm.Event) {
 // and update the job state to the DB
 func EnterUploadSecondaryDone(ctx context.Context, event *fsm.Event) {
 	stone := ctx.Value(CtxStoneKey).(*UploadPayloadStone)
-	if err := stone.jobCtx.SetJobState(types.JOB_STATE_UPLOAD_SECONDARY_DONE); err != nil {
+	if err := stone.jobCtx.SetJobState(ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_DONE); err != nil {
 		stone.jobCtx.SetJobErr(err)
 		log.CtxErrorw(ctx, "update primary done job state error", "error", err)
 		return
@@ -126,9 +126,9 @@ func EnterUploadSecondaryDone(ctx context.Context, event *fsm.Event) {
 
 // SealObjectJob defines the job to transfer StoneHub
 type SealObjectJob struct {
-	ObjectInfo        *types.ObjectInfo
-	PrimarySealInfo   []*types.StorageProviderInfo
-	SecondarySealInfo []*types.StorageProviderInfo
+	ObjectInfo        *ptypesv1pb.ObjectInfo
+	PrimarySealInfo   []*ptypesv1pb.StorageProviderInfo
+	SecondarySealInfo []*ptypesv1pb.StorageProviderInfo
 }
 
 // EnterSealObjectInit is called when enter JOB_STATE_SEAL_OBJECT_INIT,
@@ -136,8 +136,8 @@ type SealObjectJob struct {
 func EnterSealObjectInit(ctx context.Context, event *fsm.Event) {
 	stone := ctx.Value(CtxStoneKey).(*UploadPayloadStone)
 	var (
-		primarySealInfo   []*types.StorageProviderInfo
-		secondarySealInfo []*types.StorageProviderInfo
+		primarySealInfo   []*ptypesv1pb.StorageProviderInfo
+		secondarySealInfo []*ptypesv1pb.StorageProviderInfo
 		err               error
 	)
 	defer func() {
@@ -172,7 +172,7 @@ func EnterSealObjectDoing(ctx context.Context, event *fsm.Event) {
 // and update the job state to the DB
 func EnterSealObjectDone(ctx context.Context, event *fsm.Event) {
 	stone := ctx.Value(CtxStoneKey).(*UploadPayloadStone)
-	if err := stone.jobCtx.SetJobState(types.JOB_STATE_SEAL_OBJECT_DONE); err != nil {
+	if err := stone.jobCtx.SetJobState(ptypesv1pb.JOB_STATE_SEAL_OBJECT_DONE); err != nil {
 		stone.jobCtx.SetJobErr(err)
 		log.CtxErrorw(ctx, "update seal object done job state error", "error", err)
 		return
diff --git a/pkg/stone/upload_payload_stone.go b/pkg/stone/upload_payload_stone.go
index 0c647fd97..04be22a6f 100644
--- a/pkg/stone/upload_payload_stone.go
+++ b/pkg/stone/upload_payload_stone.go
@@ -7,8 +7,8 @@ import (
 	"github.com/looplab/fsm"
 
 	"github.com/bnb-chain/greenfield-storage-provider/pkg/job"
-	types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
-	service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+	ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+	stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
 	"github.com/bnb-chain/greenfield-storage-provider/store/jobdb"
 	"github.com/bnb-chain/greenfield-storage-provider/store/metadb"
 	"github.com/bnb-chain/greenfield-storage-provider/util"
@@ -38,7 +38,7 @@ type UploadPayloadStone struct {
 
 // NewUploadPayloadStone return the instance of UploadPayloadStone
 func NewUploadPayloadStone(ctx context.Context,
-	jobContext *types.JobContext, object *types.ObjectInfo,
+	jobContext *ptypesv1pb.JobContext, object *ptypesv1pb.ObjectInfo,
 	jobDB jobdb.JobDB, metaDB metadb.MetaDB,
 	jobCh chan StoneJob, gcCh chan uint64) (*UploadPayloadStone, error) {
 	jobCtx := NewJobContextWrapper(jobContext, jobDB, metaDB)
@@ -75,15 +75,15 @@ func repairState(jobCtx *JobContextWrapper, job *job.UploadPayloadJob) (string,
 	if err != nil {
 		return state, err
 	}
-	if state == types.JOB_STATE_SEAL_OBJECT_DONE {
+	if state == ptypesv1pb.JOB_STATE_SEAL_OBJECT_DONE {
 		return state, errors.New("upload payload job has been successfully completed")
 	}
-	state = types.JOB_STATE_CREATE_OBJECT_DONE
+	state = ptypesv1pb.JOB_STATE_CREATE_OBJECT_DONE
 	if job.PrimarySPCompleted() {
-		state = types.JOB_STATE_UPLOAD_PRIMARY_DONE
+		state = ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DONE
 	}
 	if job.SecondarySPCompleted() {
-		state = types.JOB_STATE_UPLOAD_SECONDARY_DONE
+		state = ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_DONE
 	}
 	if err := jobCtx.SetJobErr(nil); err != nil {
 		return state, err
@@ -111,29 +111,29 @@ func (stone *UploadPayloadStone) selfActionEvent(ctx context.Context, args ...in
 	for {
 		current = stone.jobFsm.Current()
 		switch current {
-		case types.JOB_STATE_CREATE_OBJECT_DONE:
+		case ptypesv1pb.JOB_STATE_CREATE_OBJECT_DONE:
 			event = UploadPayloadInitEvent
-		case types.JOB_STATE_UPLOAD_PRIMARY_INIT:
+		case ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_INIT:
 			event = UploadPrimaryDoingEvent
-		case types.JOB_STATE_UPLOAD_PRIMARY_DOING:
+		case ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DOING:
 			if stone.job.PrimarySPCompleted() {
 				event = UploadPrimaryDoneEvent
 			} else {
 				return nil
 			}
-		case types.JOB_STATE_UPLOAD_PRIMARY_DONE:
+		case ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DONE:
 			event = UploadSecondaryInitEvent
-		case types.JOB_STATE_UPLOAD_SECONDARY_INIT:
+		case ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_INIT:
 			event = UploadSecondaryDoingEvent
-		case types.JOB_STATE_UPLOAD_SECONDARY_DOING:
+		case ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_DOING:
 			if stone.job.SecondarySPCompleted() {
 				event = UploadSecondaryDoneEvent
 			} else {
 				return nil
 			}
-		case types.JOB_STATE_UPLOAD_SECONDARY_DONE:
+		case ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_DONE:
 			event = SealObjectInitEvent
-		case types.JOB_STATE_SEAL_OBJECT_INIT:
+		case ptypesv1pb.JOB_STATE_SEAL_OBJECT_INIT:
 			event = SealObjectDoingEvent
 		default:
 			return nil
@@ -148,7 +148,7 @@ func (stone *UploadPayloadStone) selfActionEvent(ctx context.Context, args ...in
 
 // ActionEvent receive the event and propelled fsm execution
 func (stone *UploadPayloadStone) ActionEvent(ctx context.Context, event string, args ...interface{}) error {
-	if stone.jobCtx.JobErr() != nil || stone.jobFsm.Current() == types.JOB_STATE_ERROR {
+	if stone.jobCtx.JobErr() != nil || stone.jobFsm.Current() == ptypesv1pb.JOB_STATE_ERROR {
 		// log error
 		return stone.jobCtx.JobErr()
 	}
@@ -169,7 +169,7 @@ func (stone *UploadPayloadStone) ActionEvent(ctx context.Context, event string,
 
 // InterruptStone interrupt the fsm and stop the stone
 func (stone *UploadPayloadStone) InterruptStone(ctx context.Context, err error) error {
-	if stone.jobCtx.JobErr() != nil || stone.jobFsm.Current() == types.JOB_STATE_ERROR {
+	if stone.jobCtx.JobErr() != nil || stone.jobFsm.Current() == ptypesv1pb.JOB_STATE_ERROR {
 		log.CtxWarnw(ctx, "interrupt stone fsm params error")
 		return stone.jobCtx.JobErr()
 	}
@@ -185,12 +185,12 @@ func (stone *UploadPayloadStone) PrimarySPJobDone() bool {
 }
 
 // PopPendingPrimarySPJob return the uncompleted upload primary storage provider job
-func (stone *UploadPayloadStone) PopPendingPrimarySPJob() *service.PieceJob {
+func (stone *UploadPayloadStone) PopPendingPrimarySPJob() *stypesv1pb.PieceJob {
 	return stone.job.PopPendingPrimarySPJob()
 }
 
 // PopPendingSecondarySPJob return the uncompleted upload secondary storage provider job
-func (stone *UploadPayloadStone) PopPendingSecondarySPJob() *service.PieceJob {
+func (stone *UploadPayloadStone) PopPendingSecondarySPJob() *stypesv1pb.PieceJob {
 	return stone.job.PopPendingSecondarySPJob()
 }
 
@@ -210,11 +210,11 @@ func (stone *UploadPayloadStone) GetStoneState() (string, error) {
 }
 
 // GetJobContext return the job context
-func (stone *UploadPayloadStone) GetJobContext() *types.JobContext {
+func (stone *UploadPayloadStone) GetJobContext() *ptypesv1pb.JobContext {
 	return stone.jobCtx.JobContext()
 }
 
 // GetObjectInfo return the object info
-func (stone *UploadPayloadStone) GetObjectInfo() *types.ObjectInfo {
+func (stone *UploadPayloadStone) GetObjectInfo() *ptypesv1pb.ObjectInfo {
 	return stone.objCtx.GetObjectInfo()
 }
diff --git a/pkg/stone/upload_payload_stone_test.go b/pkg/stone/upload_payload_stone_test.go
index 47c15e9eb..9645c5827 100644
--- a/pkg/stone/upload_payload_stone_test.go
+++ b/pkg/stone/upload_payload_stone_test.go
@@ -9,8 +9,8 @@ import (
 	"github.com/stretchr/testify/assert"
 
 	merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
-	types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
-	service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+	ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+	stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
 	"github.com/bnb-chain/greenfield-storage-provider/store/jobdb/jobmemory"
 	"github.com/bnb-chain/greenfield-storage-provider/util/hash"
 )
@@ -24,7 +24,7 @@ var (
 )
 
 func InitENV() (*UploadPayloadStone, error) {
-	object := &types.ObjectInfo{
+	object := &ptypesv1pb.ObjectInfo{
 		Owner:      "test_owner",
 		BucketName: "test_bucket",
 		ObjectName: "test_object",
@@ -32,8 +32,8 @@ func InitENV() (*UploadPayloadStone, error) {
 		TxHash:   txHash,
 		Height:   height,
 		ObjectId: objectID,
-		RedundancyType: types.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED,
-		PrimarySp: &types.StorageProviderInfo{
+		RedundancyType: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED,
+		PrimarySp: &ptypesv1pb.StorageProviderInfo{
 			SpId: "bnb-test-sp",
 		},
 	}
@@ -59,7 +59,7 @@ func InitENV() (*UploadPayloadStone, error) {
 func TestFsmPrimaryDoing(t *testing.T) {
 	stone, err := InitENV()
 	assert.Equal(t, nil, err)
-	assert.Equal(t, types.JOB_STATE_UPLOAD_PRIMARY_DOING, stone.jobFsm.Current())
+	assert.Equal(t, ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DOING, stone.jobFsm.Current())
 	primaryPieceJob := stone.job.PopPendingPrimarySPJob()
 	assert.Equal(t, 4, len(primaryPieceJob.TargetIdx))
 	secondaryPieceJob := stone.job.PopPendingSecondarySPJob()
@@ -69,8 +69,8 @@ func TestFsmPrimaryDoing(t *testing.T) {
 func TestFsmPrimaryDoingError(t *testing.T) {
 	stone, err := InitENV()
 	assert.Equal(t, nil, err)
-	pieceJob := &service.PieceJob{
-		StorageProviderSealInfo: &service.StorageProviderSealInfo{
+	pieceJob := &stypesv1pb.PieceJob{
+		StorageProviderSealInfo: &stypesv1pb.StorageProviderSealInfo{
 			StorageProviderId: "bnb-test-sp",
 			PieceIdx:          0,
 		},
@@ -96,8 +96,8 @@ func TestFsmPrimaryDoingAndSecondaryDoingError(t *testing.T) {
 	stone, err := InitENV()
 	assert.Equal(t, nil, err)
 	checkSum := hash.GenerateChecksum([]byte(time.Now().String()))
-	primaryPieceJob := &service.PieceJob{
-		StorageProviderSealInfo: &service.StorageProviderSealInfo{
+	primaryPieceJob := &stypesv1pb.PieceJob{
+		StorageProviderSealInfo: &stypesv1pb.StorageProviderSealInfo{
 			StorageProviderId: "bnb-test-sp",
 		},
 	}
@@ -127,8 +127,8 @@ func TestFsmPrimaryDoingAndSecondaryDoingError(t *testing.T) {
 	pendingPrimaryPieceJob = stone.job.PopPendingPrimarySPJob()
 	assert.Equal(t, true, stone.job.PrimarySPCompleted())
 
-	secondaryPieceJob := &service.PieceJob{
-		StorageProviderSealInfo: &service.StorageProviderSealInfo{
+	secondaryPieceJob := &stypesv1pb.PieceJob{
+		StorageProviderSealInfo: &stypesv1pb.StorageProviderSealInfo{
 			StorageProviderId: "bnb-test-sp",
 		},
 	}
diff --git a/pkg/stone/upload_stone_event.go b/pkg/stone/upload_stone_event.go
index a4a40cd03..63536a76f 100644
--- a/pkg/stone/upload_stone_event.go
+++ b/pkg/stone/upload_stone_event.go
@@ -3,7 +3,7 @@ package stone
 import (
 	"github.com/looplab/fsm"
 
-	types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+	ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
 )
 
 /*
@@ -38,51 +38,51 @@ var (
 
 // UploadPayloadFsmEvent define FSM event transitions
 var UploadPayloadFsmEvent = fsm.Events{
-	{Name: UploadPayloadInitEvent, Src: []string{types.JOB_STATE_CREATE_OBJECT_DONE}, Dst: types.JOB_STATE_UPLOAD_PRIMARY_INIT},
-	{Name: UploadPrimaryDoingEvent, Src: []string{types.JOB_STATE_UPLOAD_PRIMARY_INIT}, Dst: types.JOB_STATE_UPLOAD_PRIMARY_DOING},
-	{Name: UploadPrimaryPieceDoneEvent, Src: []string{types.JOB_STATE_UPLOAD_PRIMARY_DOING}, Dst: types.JOB_STATE_UPLOAD_PRIMARY_DOING},
-	{Name: UploadPrimaryDoneEvent, Src: []string{types.JOB_STATE_UPLOAD_PRIMARY_DOING}, Dst: types.JOB_STATE_UPLOAD_PRIMARY_DONE},
-	{Name: UploadSecondaryInitEvent, Src: []string{types.JOB_STATE_UPLOAD_PRIMARY_DONE}, Dst: types.JOB_STATE_UPLOAD_SECONDARY_INIT},
-	{Name: UploadSecondaryDoingEvent, Src: []string{types.JOB_STATE_UPLOAD_SECONDARY_INIT}, Dst: types.JOB_STATE_UPLOAD_SECONDARY_DOING},
-	{Name: UploadSecondaryPieceDoneEvent, Src: []string{types.JOB_STATE_UPLOAD_SECONDARY_DOING}, Dst: types.JOB_STATE_UPLOAD_SECONDARY_DOING},
-	{Name: UploadSecondaryDoneEvent, Src: []string{types.JOB_STATE_UPLOAD_SECONDARY_DOING}, Dst: types.JOB_STATE_UPLOAD_SECONDARY_DONE},
-	{Name: SealObjectInitEvent, Src: []string{types.JOB_STATE_UPLOAD_SECONDARY_DONE}, Dst: types.JOB_STATE_SEAL_OBJECT_INIT},
-	{Name: SealObjectDoingEvent, Src: []string{types.JOB_STATE_SEAL_OBJECT_INIT}, Dst: types.JOB_STATE_SEAL_OBJECT_TX_DOING},
-	{Name: SealObjectDoneEvent, Src: []string{types.JOB_STATE_SEAL_OBJECT_TX_DOING}, Dst: types.JOB_STATE_SEAL_OBJECT_DONE},
+	{Name: UploadPayloadInitEvent, Src: []string{ptypesv1pb.JOB_STATE_CREATE_OBJECT_DONE}, Dst: ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_INIT},
+	{Name: UploadPrimaryDoingEvent, Src: []string{ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_INIT}, Dst: ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DOING},
+	{Name: UploadPrimaryPieceDoneEvent, Src: []string{ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DOING}, Dst: ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DOING},
+	{Name: UploadPrimaryDoneEvent, Src: []string{ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DOING}, Dst: ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DONE},
+	{Name: UploadSecondaryInitEvent, Src: []string{ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DONE}, Dst: ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_INIT},
+	{Name: UploadSecondaryDoingEvent, Src: []string{ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_INIT}, Dst: ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_DOING},
+	{Name: UploadSecondaryPieceDoneEvent, Src: []string{ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_DOING}, Dst: ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_DOING},
+	{Name: UploadSecondaryDoneEvent, Src: []string{ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_DOING}, Dst: ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_DONE},
+	{Name: SealObjectInitEvent, Src: []string{ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_DONE}, Dst: ptypesv1pb.JOB_STATE_SEAL_OBJECT_INIT},
+	{Name: SealObjectDoingEvent, Src: []string{ptypesv1pb.JOB_STATE_SEAL_OBJECT_INIT}, Dst: ptypesv1pb.JOB_STATE_SEAL_OBJECT_TX_DOING},
+	{Name: SealObjectDoneEvent, Src: []string{ptypesv1pb.JOB_STATE_SEAL_OBJECT_TX_DOING}, Dst: ptypesv1pb.JOB_STATE_SEAL_OBJECT_DONE},
 	{Name: InterruptEvent, Src: []string{
-		types.JOB_STATE_CREATE_OBJECT_DONE,
-		types.JOB_STATE_UPLOAD_PRIMARY_INIT,
-		types.JOB_STATE_UPLOAD_PRIMARY_DOING,
-		types.JOB_STATE_UPLOAD_PRIMARY_DONE,
-		types.JOB_STATE_UPLOAD_SECONDARY_INIT,
-		types.JOB_STATE_UPLOAD_SECONDARY_DOING,
-		types.JOB_STATE_UPLOAD_SECONDARY_DONE,
-		types.JOB_STATE_SEAL_OBJECT_INIT,
-		types.JOB_STATE_SEAL_OBJECT_TX_DOING,
-		types.JOB_STATE_SEAL_OBJECT_DONE},
-		Dst: types.JOB_STATE_ERROR},
+		ptypesv1pb.JOB_STATE_CREATE_OBJECT_DONE,
+		ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_INIT,
+		ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DOING,
+		ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DONE,
+		ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_INIT,
+		ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_DOING,
+		ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_DONE,
+		ptypesv1pb.JOB_STATE_SEAL_OBJECT_INIT,
+		ptypesv1pb.JOB_STATE_SEAL_OBJECT_TX_DOING,
+		ptypesv1pb.JOB_STATE_SEAL_OBJECT_DONE},
+		Dst: ptypesv1pb.JOB_STATE_ERROR},
 }
 
 // define FSM action, the Action associated with callback
 var (
-	ActionBeforeEvent                        string = "before_"
-	ActionLeaveState                         string = "leave_"
-	ActionEnterState                         string = "enter_"
-	ActionAfterEvent                         string = "after_"
-	ActionEnterStateUploadPrimaryInit        string = ActionEnterState + types.JOB_STATE_UPLOAD_PRIMARY_INIT
-	ActionEnterStateUploadPrimaryDoing       string = ActionEnterState + types.JOB_STATE_UPLOAD_PRIMARY_DOING
-	ActionAfterEventUploadPrimaryPieceDone   string = ActionAfterEvent + UploadPrimaryPieceDoneEvent
-	ActionEnterUploadPrimaryDone             string = ActionEnterState + types.JOB_STATE_UPLOAD_PRIMARY_DONE
-	ActionEnterUploadSecondaryInit           string = ActionEnterState + types.JOB_STATE_UPLOAD_SECONDARY_INIT
-	ActionEnterUploadSecondaryDoing          string = ActionEnterState + types.JOB_STATE_UPLOAD_SECONDARY_DOING
-	ActionAfterEventUploadSecondaryPieceDone string = ActionAfterEvent + UploadSecondaryPieceDoneEvent
-	ActionEnterUploadSecondaryDone           string = ActionEnterState + UploadSecondaryDoneEvent
-	ActionEnterSealObjectInit                string = ActionEnterState + types.JOB_STATE_SEAL_OBJECT_INIT
-	ActionEnterSealObjectDoing               string = ActionEnterState + types.JOB_STATE_SEAL_OBJECT_TX_DOING
-	ActionEnterSealObjectDone                string = ActionEnterState + types.JOB_STATE_SEAL_OBJECT_DONE
-	ActionAfterEventInterrupt                string = ActionAfterEvent + InterruptEvent
-	ActionBeforeEventAll                     string = ActionBeforeEvent + "event"
-	ActionAfterEventAll                      string = ActionAfterEvent + "event"
+	ActionBeforeEvent                        = "before_"
+	ActionLeaveState                         = "leave_"
+	ActionEnterState                         = "enter_"
+	ActionAfterEvent                         = "after_"
+	ActionEnterStateUploadPrimaryInit        = ActionEnterState + ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_INIT
+	ActionEnterStateUploadPrimaryDoing       = ActionEnterState + ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DOING
+	ActionAfterEventUploadPrimaryPieceDone   = ActionAfterEvent + UploadPrimaryPieceDoneEvent
+	ActionEnterUploadPrimaryDone             = ActionEnterState + ptypesv1pb.JOB_STATE_UPLOAD_PRIMARY_DONE
+	ActionEnterUploadSecondaryInit           = ActionEnterState + ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_INIT
+	ActionEnterUploadSecondaryDoing          = ActionEnterState + ptypesv1pb.JOB_STATE_UPLOAD_SECONDARY_DOING
+	ActionAfterEventUploadSecondaryPieceDone = ActionAfterEvent + UploadSecondaryPieceDoneEvent
+	ActionEnterUploadSecondaryDone           = ActionEnterState + UploadSecondaryDoneEvent
+	ActionEnterSealObjectInit                = ActionEnterState + ptypesv1pb.JOB_STATE_SEAL_OBJECT_INIT
+	ActionEnterSealObjectDoing               = ActionEnterState + ptypesv1pb.JOB_STATE_SEAL_OBJECT_TX_DOING
+	ActionEnterSealObjectDone                = ActionEnterState + ptypesv1pb.JOB_STATE_SEAL_OBJECT_DONE
+	ActionAfterEventInterrupt                = ActionAfterEvent + InterruptEvent
+	ActionBeforeEventAll                     = ActionBeforeEvent + "event"
+	ActionAfterEventAll                      = ActionAfterEvent + "event"
 )
 
 // UploadPayLoadFsmCallBack map the action that event occurs or state changes trigger and the callback
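The event table above feeds github.com/looplab/fsm, and the Action names are composed the same way the library derives callback keys (enter_<state>, after_<event>). A two-state sketch of that wiring, assuming the context-aware looplab/fsm v1 API that this file's callbacks already use:

```go
package main

import (
	"context"
	"fmt"

	"github.com/looplab/fsm"
)

func main() {
	f := fsm.NewFSM(
		"upload_payload_init", // initial state (illustrative, not a repo constant)
		fsm.Events{
			{Name: "upload_primary_doing", Src: []string{"upload_payload_init"}, Dst: "upload_primary_doing"},
		},
		fsm.Callbacks{
			// "enter_" + destination state, as ActionEnterState is built above.
			"enter_upload_primary_doing": func(_ context.Context, e *fsm.Event) {
				fmt.Println("entered", e.Dst)
			},
		},
	)
	_ = f.Event(context.Background(), "upload_primary_doing")
	fmt.Println(f.Current()) // upload_primary_doing
}
```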
diff --git a/proto/service/types/v1/challenge.proto b/proto/service/types/v1/challenge.proto
index c91a5c2ad..34da1e3b8 100644
--- a/proto/service/types/v1/challenge.proto
+++ b/proto/service/types/v1/challenge.proto
@@ -25,4 +25,4 @@ message ChallengeServiceChallengePieceResponse {
 
 service ChallengeService {
   rpc ChallengePiece(ChallengeServiceChallengePieceRequest) returns (ChallengeServiceChallengePieceResponse) {};
-}
\ No newline at end of file
+}
diff --git a/proto/service/types/v1/downloader.proto b/proto/service/types/v1/downloader.proto
index 0a3305180..d18714661 100644
--- a/proto/service/types/v1/downloader.proto
+++ b/proto/service/types/v1/downloader.proto
@@ -32,4 +32,4 @@ message DownloaderServiceDownloaderSegmentResponse {
 service DownloaderService {
   rpc DownloaderObject(DownloaderServiceDownloaderObjectRequest) returns (stream DownloaderServiceDownloaderObjectResponse) {};
   rpc DownloaderSegment(DownloaderServiceDownloaderSegmentRequest) returns (DownloaderServiceDownloaderSegmentResponse) {};
-}
\ No newline at end of file
+}
diff --git a/service/challenge/challenge.go b/service/challenge/challenge.go
index 9b9165b61..210e04ae2 100644
--- a/service/challenge/challenge.go
+++ b/service/challenge/challenge.go
@@ -2,7 +2,6 @@ package challenge
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"net"
 
@@ -29,7 +28,7 @@ type Challenge struct {
 func NewChallengeService(config *ChallengeConfig) (challenge *Challenge, err error) {
 	challenge = &Challenge{
 		config: config,
-		name:   "Challenge",
+		name:   model.ChallengeService,
 	}
 	err = challenge.initClient()
 	return
@@ -47,7 +46,7 @@ func (challenge *Challenge) initClient() (err error) {
 			return
 		}
 	default:
-		return errors.New(fmt.Sprintf("meta db not support type %s", challenge.config.MetaType))
+		return fmt.Errorf("meta db not support type %s", challenge.config.MetaType)
 	}
 	challenge.pieceStore, err = client.NewStoreClient(challenge.config.PieceConfig)
 	if err != nil {
diff --git a/service/client/piece_store_client.go b/service/client/piece_store_client.go
index 04aad9847..016b0ecd2 100644
--- a/service/client/piece_store_client.go
+++ b/service/client/piece_store_client.go
@@ -37,9 +37,6 @@ func (client *StoreClient) GetPiece(ctx context.Context, key string, offset, lim
 		log.Errorw("stone node service invoke PieceStore Get failed", "error", err)
 		return nil, err
 	}
-	//b := make([]byte, 0)
-	//a := bytes.NewReader(b)
-	//io.Copy(rc, a)
 	data, err := io.ReadAll(rc)
 	if err != nil {
 		log.Errorw("stone node service invoke io.ReadAll failed", "error", err)
diff --git a/service/client/stone_hub_client.go b/service/client/stone_hub_client.go
index e41c449c1..55fb227c2 100644
--- a/service/client/stone_hub_client.go
+++ b/service/client/stone_hub_client.go
@@ -9,6 +9,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
 
+	merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
 	stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
 	"github.com/bnb-chain/greenfield-storage-provider/util"
 	"github.com/bnb-chain/greenfield-storage-provider/util/log"
@@ -140,7 +141,8 @@ func (client *StoneHubClient) AllocStoneJob(ctx context.Context, opts ...grpc.Ca
 		return nil, err
 	}
 	if resp.PieceJob == nil {
-		log.CtxDebugw(ctx, "alloc stone job empty.")
+		log.CtxDebugw(ctx, "alloc stone job is empty")
+		return nil, merrors.ErrEmptyJob
 	}
 	if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
 		log.CtxErrorw(ctx, "alloc stone job failed", "error", resp.GetErrMessage().GetErrMsg())
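The challenge.go change above is the idiomatic replacement for errors.New(fmt.Sprintf(...)); fmt.Errorf also allows %w when an underlying cause should remain inspectable. A small sketch (errNoDriver is an assumed cause, not from this diff):

```go
package main

import (
	"errors"
	"fmt"
)

// errNoDriver is a hypothetical underlying cause.
var errNoDriver = errors.New("no such meta db driver")

func openMetaDB(metaType string) error {
	// %s formats the detail; %w additionally records the cause,
	// so errors.Is still matches after wrapping.
	return fmt.Errorf("meta db not support type %s: %w", metaType, errNoDriver)
}

func main() {
	fmt.Println(errors.Is(openMetaDB("rocksdb"), errNoDriver)) // true
}
```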
"github.com/bnb-chain/greenfield-storage-provider/service/client" stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" "github.com/bnb-chain/greenfield-storage-provider/util/log" @@ -25,7 +26,7 @@ type Downloader struct { func NewDownloaderService(cfg *DownloaderConfig) (*Downloader, error) { downloader := &Downloader{ cfg: cfg, - name: "Downloader", + name: model.DownloaderService, } pieceStore, err := client.NewStoreClient(cfg.PieceStoreConfig) if err != nil { diff --git a/service/gateway/gateway.go b/service/gateway/gateway.go index c01908627..91db8153d 100644 --- a/service/gateway/gateway.go +++ b/service/gateway/gateway.go @@ -13,10 +13,6 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/util/log" ) -const ( - ServiceNameGateway string = "GatewayService" -) - // Gateway is the primary entry point of SP. type Gateway struct { config *GatewayConfig @@ -39,7 +35,7 @@ func NewGatewayService(cfg *GatewayConfig) (*Gateway, error) { g = &Gateway{ config: cfg, - name: ServiceNameGateway, + name: model.GatewayService, } if g.uploadProcessor, err = newUploadProcessor(g.config.UploaderConfig); err != nil { log.Warnw("failed to create uploader", "err", err) @@ -60,7 +56,7 @@ func NewGatewayService(cfg *GatewayConfig) (*Gateway, error) { // Name implement the lifecycle interface func (g *Gateway) Name() string { - return model.GatewayService + return g.name } // Start implement the lifecycle interface diff --git a/service/stonenode/helper_test.go b/service/stonenode/helper_test.go index 4b6f9466d..fddb1275f 100644 --- a/service/stonenode/helper_test.go +++ b/service/stonenode/helper_test.go @@ -6,6 +6,7 @@ import ( "google.golang.org/grpc" + "github.com/bnb-chain/greenfield-storage-provider/model" ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" ) @@ -19,7 +20,7 @@ func setup(t *testing.T) *StoneNodeService { StorageProvider: "test", StoneJobLimit: 0, }, - name: ServiceNameStoneNode, + name: model.StoneNodeService, stoneLimit: 0, } } diff --git a/service/stonenode/server.go b/service/stonenode/server.go index aad69f126..e48b1e7ed 100644 --- a/service/stonenode/server.go +++ b/service/stonenode/server.go @@ -6,14 +6,14 @@ import ( "sync/atomic" "time" + "github.com/bnb-chain/greenfield-storage-provider/model" merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" "github.com/bnb-chain/greenfield-storage-provider/service/client" "github.com/bnb-chain/greenfield-storage-provider/util/log" ) const ( - ServiceNameStoneNode string = "StoneNode" - AllocStonePeriod = time.Second * 1 + allocStonePeriod = time.Second * 1 ) // StoneNodeService manages stone execution units @@ -33,7 +33,7 @@ type StoneNodeService struct { func NewStoneNodeService(config *StoneNodeConfig) (*StoneNodeService, error) { node := &StoneNodeService{ cfg: config, - name: ServiceNameStoneNode, + name: model.StoneNodeService, stopCh: make(chan struct{}), stoneLimit: config.StoneJobLimit, } @@ -43,7 +43,7 @@ func NewStoneNodeService(config *StoneNodeConfig) (*StoneNodeService, error) { return node, nil } -// InitClient inits store client and rpc client +// initClient inits store client and rpc client func (node *StoneNodeService) initClient() error { if node.running.Load() == true { return merrors.ErrStoneNodeStarted @@ -81,20 +81,20 @@ func (node *StoneNodeService) Start(startCtx context.Context) error { } go func() { var stoneJobCounter int64 // atomic - allocTicker := 
time.NewTicker(AllocStonePeriod) + allocTicker := time.NewTicker(allocStonePeriod) ctx, cancel := context.WithCancel(startCtx) for { select { case <-allocTicker.C: go func() { if !node.running.Load() { - log.Errorw("stone node service stopped, can not alloc stone.") + log.Errorw("stone node service stopped, can not alloc stone") return } atomic.AddInt64(&stoneJobCounter, 1) defer atomic.AddInt64(&stoneJobCounter, -1) if atomic.LoadInt64(&stoneJobCounter) > node.stoneLimit { - log.Errorw("stone job running number exceeded, skip current alloc stone.") + log.Errorw("stone job running number exceeded, skip current alloc stone") return } // TBD::exceed stoneLimit or alloc empty stone, diff --git a/service/stonenode/sync.go b/service/stonenode/sync.go index 3b75d5274..e5b8df98f 100644 --- a/service/stonenode/sync.go +++ b/service/stonenode/sync.go @@ -25,26 +25,28 @@ func (node *StoneNodeService) syncPieceToSecondarySP(ctx context.Context, allocR // REPLICA_TYPE and INLINE_TYPE need segments count + backup secondarySPs := mock.AllocUploadSecondarySP() - // 1. load all segments data from primary piece store and do ec or not - pieceData, err := node.loadSegmentsData(ctx, allocResp) - if err != nil { - node.reportErrToStoneHub(ctx, allocResp, err) - return err - } - // check redundancyType and targetIdx is valid redundancyType := allocResp.GetPieceJob().GetRedundancyType() if err := checkRedundancyType(redundancyType); err != nil { + log.CtxErrorw(ctx, "invalid redundancy type", "redundancy type", redundancyType) node.reportErrToStoneHub(ctx, allocResp, err) return err } targetIdx := allocResp.GetPieceJob().GetTargetIdx() if len(targetIdx) == 0 { + log.CtxError(ctx, "invalid target idx length") node.reportErrToStoneHub(ctx, allocResp, merrors.ErrEmptyTargetIdx) return merrors.ErrEmptyTargetIdx } - // 2. dispatch the piece data to different secondary + // 1. load all segments data from primary piece store and do ec or not + pieceData, err := node.loadSegmentsData(ctx, allocResp) + if err != nil { + node.reportErrToStoneHub(ctx, allocResp, err) + return err + } + + // 2. dispatch the piece data to different secondary sp secondaryPieceData, err := node.dispatchSecondarySP(pieceData, redundancyType, secondarySPs, targetIdx) if err != nil { log.CtxErrorw(ctx, "dispatch piece data to secondary sp error") @@ -160,21 +162,19 @@ func (node *StoneNodeService) loadSegmentsData(ctx context.Context, allocResp *s return pieces, loadSegmentErr } -// spiltSegmentData spilt segment data into pieces data. +// generatePieceData generates piece data from segment data func (node *StoneNodeService) generatePieceData(redundancyType ptypesv1pb.RedundancyType, segmentData []byte) ( pieceData [][]byte, err error) { switch redundancyType { - case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED: + case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: + pieceData = append(pieceData, segmentData) + default: // ec type pieceData, err = redundancy.EncodeRawSegment(segmentData) if err != nil { - return + return nil, err } - case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: - pieceData = append(pieceData, segmentData) - default: - return nil, merrors.ErrRedundancyType } - return + return pieceData, nil } // dispatchSecondarySP dispatch piece data to secondary storage provider. 
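The Start loop above caps concurrent stone jobs with a plain int64 and sync/atomic rather than a semaphore: each goroutine reserves a slot, then bails out if the count exceeds the limit. A self-contained sketch of the same guard (limit and job bodies are illustrative; which jobs get skipped depends on scheduling):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	const stoneLimit = 2
	var inFlight int64 // mirrors stoneJobCounter: touched only via sync/atomic
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			atomic.AddInt64(&inFlight, 1)        // reserve a slot first
			defer atomic.AddInt64(&inFlight, -1) // always release it
			if atomic.LoadInt64(&inFlight) > stoneLimit {
				fmt.Println("job", id, "skipped: limit reached")
				return
			}
			fmt.Println("job", id, "running")
		}(i)
	}
	wg.Wait()
}
```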
diff --git a/service/stonenode/sync.go b/service/stonenode/sync.go
index 3b75d5274..e5b8df98f 100644
--- a/service/stonenode/sync.go
+++ b/service/stonenode/sync.go
@@ -25,26 +25,28 @@ func (node *StoneNodeService) syncPieceToSecondarySP(ctx context.Context, allocR
 	// REPLICA_TYPE and INLINE_TYPE need segments count + backup
 	secondarySPs := mock.AllocUploadSecondarySP()
 
-	// 1. load all segments data from primary piece store and do ec or not
-	pieceData, err := node.loadSegmentsData(ctx, allocResp)
-	if err != nil {
-		node.reportErrToStoneHub(ctx, allocResp, err)
-		return err
-	}
-
 	// check redundancyType and targetIdx is valid
 	redundancyType := allocResp.GetPieceJob().GetRedundancyType()
 	if err := checkRedundancyType(redundancyType); err != nil {
+		log.CtxErrorw(ctx, "invalid redundancy type", "redundancy type", redundancyType)
 		node.reportErrToStoneHub(ctx, allocResp, err)
 		return err
 	}
 	targetIdx := allocResp.GetPieceJob().GetTargetIdx()
 	if len(targetIdx) == 0 {
+		log.CtxError(ctx, "invalid target idx length")
 		node.reportErrToStoneHub(ctx, allocResp, merrors.ErrEmptyTargetIdx)
 		return merrors.ErrEmptyTargetIdx
 	}
 
-	// 2. dispatch the piece data to different secondary
+	// 1. load all segments data from primary piece store and do ec or not
+	pieceData, err := node.loadSegmentsData(ctx, allocResp)
+	if err != nil {
+		node.reportErrToStoneHub(ctx, allocResp, err)
+		return err
+	}
+
+	// 2. dispatch the piece data to different secondary sp
 	secondaryPieceData, err := node.dispatchSecondarySP(pieceData, redundancyType, secondarySPs, targetIdx)
 	if err != nil {
 		log.CtxErrorw(ctx, "dispatch piece data to secondary sp error")
@@ -160,21 +162,19 @@ func (node *StoneNodeService) loadSegmentsData(ctx context.Context, allocResp *s
 	return pieces, loadSegmentErr
 }
 
-// spiltSegmentData spilt segment data into pieces data.
+// generatePieceData generates piece data from segment data
 func (node *StoneNodeService) generatePieceData(redundancyType ptypesv1pb.RedundancyType, segmentData []byte) (
 	pieceData [][]byte, err error) {
 	switch redundancyType {
-	case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED:
+	case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
+		pieceData = append(pieceData, segmentData)
+	default: // ec type
 		pieceData, err = redundancy.EncodeRawSegment(segmentData)
 		if err != nil {
-			return
+			return nil, err
 		}
-	case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
-		pieceData = append(pieceData, segmentData)
-	default:
-		return nil, merrors.ErrRedundancyType
 	}
-	return
+	return pieceData, nil
 }
 
 // dispatchSecondarySP dispatch piece data to secondary storage provider.
@@ -188,106 +188,80 @@ func (node *StoneNodeService) dispatchSecondarySP(pieceDataBySegment map[string]
 	// if redundancyType is replica or inline, value is [][]byte type, a two-dimensional array
 	// which only contains one []byte data
 	var err error
-	pieceDataBySecondary, err = fillECData(pieceDataBySegment, secondarySPs, targetIdx, redundancyType)
+	switch redundancyType {
+	case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
+		pieceDataBySecondary, err = dispatchReplicaOrInlineData(pieceDataBySegment, secondarySPs, targetIdx)
+	default: // ec type
+		pieceDataBySecondary, err = dispatchECData(pieceDataBySegment, secondarySPs, targetIdx)
+	}
 	if err != nil {
 		log.Errorw("fill piece data by secondary error", "error", err)
 		return nil, err
 	}
-
-	/*
-		switch redundancyType {
-		case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
-			pieceDataBySecondary, err = fillReplicaOrInlineData(pieceDataBySegment, secondarySPs, targetIdx)
-		default: // ec type
-			pieceDataBySecondary, err = fillECData(pieceDataBySegment, secondarySPs, targetIdx, redundancyType)
-		}
-		if err != nil {
-			log.Errorw("fill piece data by secondary error", "error", err)
-			return nil, err
-		}
-	*/
 	return pieceDataBySecondary, nil
 }
 
-func fillECData(pieceDataBySegment map[string][][]byte, secondarySPs []string, targetIdx []uint32,
-	redundancyType ptypesv1pb.RedundancyType) (map[string]map[string][]byte, error) {
-	ecPieceDataMap := make(map[string]map[string][]byte)
-	for pieceKey, pieceData := range pieceDataBySegment {
-		//if len(pieceData) != 6 {
-		//	return map[string]map[string][]byte{}, merrors.ErrInvalidECData
-		//}
-
-		for idx, data := range pieceData {
-			if idx >= len(secondarySPs) {
-				return map[string]map[string][]byte{}, merrors.ErrSecondarySPNumber
-			}
-			// initialize data map
-			sp := secondarySPs[idx]
-			if len(targetIdx) != 0 {
-				for _, j := range targetIdx {
-					if int(j) == idx {
-						if _, ok := ecPieceDataMap[sp]; !ok {
-							ecPieceDataMap[sp] = make(map[string][]byte)
-						}
-					}
-				}
-			}
-
-			if len(targetIdx) != 0 {
-				for _, index := range targetIdx {
-					if int(index) == idx {
-						switch redundancyType {
-						case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
-							ecPieceDataMap[sp][pieceKey] = data
-						default:
-							key := piecestore.EncodeECPieceKeyBySegmentKey(pieceKey, uint32(idx))
-							ecPieceDataMap[sp][key] = data
-						}
-					}
-				}
-			}
-		}
-	}
-	return ecPieceDataMap, nil
-}
-
-func fillReplicaOrInlineData(pieceDataBySegment map[string][][]byte, secondarySPs []string, targetIdx []uint32) (
+// dispatchReplicaOrInlineData dispatches replica or inline data to different SPs; each SP stores all segment data of an object:
+// if an object uses replica type and is split into 10 segments with 6 SPs, each SP stores all 10 segments of data;
+// if an object uses inline type, there is only one segment, so with 6 SPs each SP stores that 1 segment of data
+func dispatchReplicaOrInlineData(pieceDataBySegment map[string][][]byte, secondarySPs []string, targetIdx []uint32) (
 	map[string]map[string][]byte, error) {
 	replicaOrInlineDataMap := make(map[string]map[string][]byte)
-	if len(targetIdx) > len(secondarySPs) {
+	spNumber := len(secondarySPs)
+	if spNumber < 1 || spNumber > 6 {
 		return replicaOrInlineDataMap, merrors.ErrSecondarySPNumber
 	}
-
 	// iterate map in order
 	keys := util.GenericSortedKeys(pieceDataBySegment)
-	for i := 0; i < len(keys); i++ {
-		pieceKey := keys[i]
-		pieceData := pieceDataBySegment[pieceKey]
-		if len(pieceData) != 1 {
-			return nil, merrors.ErrInvalidSegmentData
-		}
-
-		// each segment piece data writes to different sp
+	for i := 0; i < len(secondarySPs); i++ {
 		sp := secondarySPs[i]
-		if len(targetIdx) != 0 {
+		for j := 0; j < len(keys); j++ {
+			pieceKey := keys[j]
+			pieceData := pieceDataBySegment[pieceKey]
+			if len(pieceData) != 1 {
+				return nil, merrors.ErrInvalidSegmentData
+			}
+
 			for _, index := range targetIdx {
 				if int(index) == i {
 					if _, ok := replicaOrInlineDataMap[sp]; !ok {
 						replicaOrInlineDataMap[sp] = make(map[string][]byte)
 					}
+					replicaOrInlineDataMap[sp][pieceKey] = pieceData[0]
 				}
 			}
 		}
+	}
+	return replicaOrInlineDataMap, nil
+}
+
+// dispatchECData dispatches ec data into different sp
+// one sp stores same ec column data: sp1 stores all ec1 data, sp2 stores all ec2 data, etc
+func dispatchECData(pieceDataBySegment map[string][][]byte, secondarySPs []string, targetIdx []uint32) (map[string]map[string][]byte, error) {
+	ecPieceDataMap := make(map[string]map[string][]byte)
+	for pieceKey, pieceData := range pieceDataBySegment {
+		if len(pieceData) != 6 {
+			return map[string]map[string][]byte{}, merrors.ErrInvalidECData
+		}
+
+		for idx, data := range pieceData {
+			if idx >= len(secondarySPs) {
+				return map[string]map[string][]byte{}, merrors.ErrSecondarySPNumber
+			}
 
-		if len(targetIdx) != 0 {
+			sp := secondarySPs[idx]
 			for _, index := range targetIdx {
-				if int(index) == i {
-					replicaOrInlineDataMap[sp][pieceKey] = pieceData[0]
+				if int(index) == idx {
+					if _, ok := ecPieceDataMap[sp]; !ok {
+						ecPieceDataMap[sp] = make(map[string][]byte)
+					}
+					key := piecestore.EncodeECPieceKeyBySegmentKey(pieceKey, uint32(idx))
+					ecPieceDataMap[sp][key] = data
 				}
 			}
 		}
 	}
-	return replicaOrInlineDataMap, nil
+	return ecPieceDataMap, nil
 }
 
 // doSyncToSecondarySP send piece data to the secondary.
@@ -297,8 +271,7 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *sty
 		objectID       = resp.GetPieceJob().GetObjectId()
 		payloadSize    = resp.GetPieceJob().GetPayloadSize()
 		redundancyType = resp.GetPieceJob().GetRedundancyType()
-		//segmentCount = util.ComputeSegmentCount(payloadSize)
-		txHash = resp.GetTxHash()
+		txHash         = resp.GetTxHash()
 	)
 	for secondary, pieceData := range pieceDataBySecondary {
 		go func(secondary string, pieceData map[string][]byte) {
@@ -342,7 +315,6 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *sty
 
 			var pieceHash [][]byte
 			keys := util.GenericSortedKeys(pieceData)
-			log.Debugw("sorted keys", "keys", keys)
 			for _, key := range keys {
 				pieceHash = append(pieceHash, hash.GenerateChecksum(pieceData[key]))
 			}
@@ -384,11 +356,9 @@ func (node *StoneNodeService) reportErrToStoneHub(ctx context.Context, resp *sty
 	log.CtxInfow(ctx, "report stone hub err msg success")
 }
 
-// SyncPiece send rpc request to secondary storage provider to sync the piece data.
+// syncPiece send rpc request to secondary storage provider to sync the piece data.
 func (node *StoneNodeService) syncPiece(ctx context.Context, syncerInfo *stypesv1pb.SyncerInfo,
 	pieceData map[string][]byte, traceID string) (*stypesv1pb.SyncerServiceSyncPieceResponse, error) {
-	log.CtxInfow(ctx, "stone node upload piece data", "redundancy_type", syncerInfo.GetRedundancyType(),
-		"spID", util.SpReadable(syncerInfo.GetStorageProviderId()), "length", len(pieceData))
 	stream, err := node.syncer.SyncPiece(ctx)
 	if err != nil {
 		log.Errorw("sync secondary piece job error", "err", err)
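dispatchECData keys the outer map by SP and the inner map by an EC piece key derived from the segment key, so SP i ends up holding EC column i of every segment. A toy illustration of the resulting shape (the key encoding is a stand-in for piecestore.EncodeECPieceKeyBySegmentKey):

```go
package main

import "fmt"

func main() {
	sps := []string{"sp0", "sp1", "sp2", "sp3", "sp4", "sp5"}
	segments := map[string][][]byte{ // segment key -> 6 EC pieces (EC_M + EC_K)
		"object_s0": {{0}, {1}, {2}, {3}, {4}, {5}},
		"object_s1": {{0}, {1}, {2}, {3}, {4}, {5}},
	}
	out := make(map[string]map[string][]byte)
	for segKey, pieces := range segments {
		for idx, data := range pieces {
			sp := sps[idx]
			if out[sp] == nil {
				out[sp] = make(map[string][]byte)
			}
			// stand-in for piecestore.EncodeECPieceKeyBySegmentKey(segKey, uint32(idx))
			out[sp][fmt.Sprintf("%s_p%d", segKey, idx)] = data
		}
	}
	fmt.Println(len(out["sp0"])) // 2: sp0 holds EC column 0 of both segments
}
```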
func (node *StoneNodeService) syncPiece(ctx context.Context, syncerInfo *stypesv1pb.SyncerInfo, pieceData map[string][]byte, traceID string) (*stypesv1pb.SyncerServiceSyncPieceResponse, error) { - log.CtxInfow(ctx, "stone node upload piece data", "redundancy_type", syncerInfo.GetRedundancyType(), - "spID", util.SpReadable(syncerInfo.GetStorageProviderId()), "length", len(pieceData)) stream, err := node.syncer.SyncPiece(ctx) if err != nil { log.Errorw("sync secondary piece job error", "err", err) diff --git a/service/stonenode/sync_test.go b/service/stonenode/sync_test.go index bf739dbce..418d69bde 100644 --- a/service/stonenode/sync_test.go +++ b/service/stonenode/sync_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "google.golang.org/grpc" + "github.com/bnb-chain/greenfield-storage-provider/model" merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" "github.com/bnb-chain/greenfield-storage-provider/service/client/mock" @@ -17,7 +18,7 @@ import ( func TestInitClientFailed(t *testing.T) { node := &StoneNodeService{ - name: ServiceNameStoneNode, + name: model.StoneNodeService, stoneLimit: 0, } node.running.Store(true) @@ -124,24 +125,6 @@ func Test_loadSegmentsDataPieceStoreError(t *testing.T) { assert.Equal(t, 0, len(result)) } -func Test_loadSegmentsDataUnknownRedundancyError(t *testing.T) { - node := setup(t) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - ps := mock.NewMockPieceStoreAPI(ctrl) - node.store = ps - ps.EXPECT().GetPiece(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, key string, offset, limit int64) ([]byte, error) { - return []byte("1"), nil - }).AnyTimes() - - result, err := node.loadSegmentsData(context.TODO(), mockAllocResp(20230109006, 20*1024*1024, - ptypesv1pb.RedundancyType(-1))) - assert.Equal(t, merrors.ErrRedundancyType, err) - assert.Equal(t, 0, len(result)) -} - func Test_generatePieceData(t *testing.T) { cases := []struct { name string @@ -171,13 +154,6 @@ func Test_generatePieceData(t *testing.T) { wantedResult: 1, wantedErr: nil, }, - { - name: "unknown redundancy type", - req1: ptypesv1pb.RedundancyType(-1), - req2: []byte("1"), - wantedResult: 0, - wantedErr: merrors.ErrRedundancyType, - }, } node := setup(t) @@ -216,7 +192,7 @@ func Test_dispatchSecondarySP(t *testing.T) { req2: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, req3: spList, req4: []uint32{0, 1, 2}, - wantedResult: 1, + wantedResult: 3, wantedErr: nil, }, { @@ -243,7 +219,7 @@ func Test_dispatchSecondarySP(t *testing.T) { req2: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, req3: spList, req4: []uint32{1, 2}, - wantedResult: 0, + wantedResult: 2, wantedErr: nil, }, { @@ -255,24 +231,24 @@ func Test_dispatchSecondarySP(t *testing.T) { wantedResult: 0, wantedErr: merrors.ErrSecondarySPNumber, }, - //{ - // name: "wrong ec segment data length", - // req1: dispatchSegmentMap(), - // req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, - // req3: spList, - // req4: []uint32{0, 1, 2, 3, 4, 5}, - // wantedResult: 1, - // wantedErr: merrors.ErrInvalidECData, - //}, - //{ - // name: "wrong replica/inline segment data length", - // req1: dispatchPieceMap(), - // req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, - // req3: spList, - // req4: []uint32{0, 1, 2, 3, 4, 5}, - // wantedResult: 1, - // wantedErr: merrors.ErrInvalidSegmentData, - //}, + { + name: "wrong ec segment data length", + 
+			req1:         dispatchSegmentMap(),
+			req2:         ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED,
+			req3:         spList,
+			req4:         []uint32{0, 1, 2, 3, 4, 5},
+			wantedResult: 0,
+			wantedErr:    merrors.ErrInvalidECData,
+		},
+		{
+			name:         "wrong replica/inline segment data length",
+			req1:         dispatchPieceMap(),
+			req2:         ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE,
+			req3:         spList,
+			req4:         []uint32{0, 1, 2, 3, 4, 5},
+			wantedResult: 0,
+			wantedErr:    merrors.ErrInvalidSegmentData,
+		},
 	}
 
 	node := setup(t)
@@ -356,7 +332,7 @@ func TestSyncPieceSuccess(t *testing.T) {
 	sInfo := &stypesv1pb.SyncerInfo{
 		ObjectId:          123456,
 		TxHash:            []byte("i"),
-		StorageProviderId: "sp1",
+		StorageProviderId: "440246a94fc4257096b8d4fa8db94a5655f455f88555f885b10da1466763f742",
 		RedundancyType:    ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED,
 	}
 	data := map[string][]byte{
diff --git a/service/syncer/server.go b/service/syncer/server.go
index 34e36750f..f279034b7 100644
--- a/service/syncer/server.go
+++ b/service/syncer/server.go
@@ -2,7 +2,6 @@ package syncer
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"net"
 	"sync/atomic"
@@ -11,6 +10,7 @@
 	"google.golang.org/grpc/reflection"
 
 	"github.com/bnb-chain/greenfield-storage-provider/model"
+	merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
 	"github.com/bnb-chain/greenfield-storage-provider/service/client"
 	stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
 	"github.com/bnb-chain/greenfield-storage-provider/store/metadb"
@@ -18,10 +18,6 @@ import (
 	"github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
 
-const (
-	ServiceNameSyncer string = "Syncer"
-)
-
 // SyncerService synchronizes ec data to piece store
 type Syncer struct {
 	cfg  *SyncerConfig
@@ -35,7 +31,7 @@ type Syncer struct {
 func NewSyncerService(config *SyncerConfig) (*Syncer, error) {
 	s := &Syncer{
 		cfg:  config,
-		name: ServiceNameSyncer,
+		name: model.SyncerService,
 	}
 	if err := s.initClient(); err != nil {
 		return nil, err
@@ -82,7 +78,7 @@ func (s *Syncer) Name() string {
 // Start running SyncerService
 func (s *Syncer) Start(ctx context.Context) error {
 	if s.running.Swap(true) {
-		return errors.New("syncer service is running")
+		return merrors.ErrSyncerStarted
 	}
 	errCh := make(chan error)
 	go s.serve(errCh)
@@ -93,7 +89,7 @@ func (s *Syncer) Start(ctx context.Context) error {
 // Stop running SyncerService
 func (s *Syncer) Stop(ctx context.Context) error {
 	if !s.running.Swap(false) {
-		return errors.New("syncer service has already stopped")
+		return merrors.ErrSyncerStopped
 	}
 	return nil
 }
diff --git a/service/syncer/syncer_service.go b/service/syncer/syncer_service.go
index b06c595a7..2b59b85d7 100644
--- a/service/syncer/syncer_service.go
+++ b/service/syncer/syncer_service.go
@@ -107,8 +107,6 @@ func (s *Syncer) handlePieceData(req *stypesv1pb.SyncerServiceSyncPieceRequest)
 		return nil, "", nil, errors.New("the length of piece data map is not equal to 1")
 	}
 
-	var key string
-	var value []byte
 	redundancyType := req.GetSyncerInfo().GetRedundancyType()
 	integrityMeta := &metadb.IntegrityMeta{
 		ObjectID:       req.GetSyncerInfo().GetObjectId(),
@@ -116,9 +114,12 @@ func (s *Syncer) handlePieceData(req *stypesv1pb.SyncerServiceSyncPieceRequest)
 		IsPrimary:      false,
 		RedundancyType: redundancyType,
 	}
-	for k, v := range req.GetPieceData() {
-		key = k
-		value = v
+
+	var (
+		key   string
+		value []byte
+	)
+	for key, value = range req.GetPieceData() {
 		pieceIndex, err := parsePieceIndex(redundancyType, key)
 		if err != nil {
 			return nil, "", nil, err
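The handlePieceData hunk above replaces the copy-out loop with a small Go idiom: when a map is known to hold exactly one entry, ranging with pre-declared variables leaves that entry's key and value in scope after the loop. A minimal standalone sketch of the pattern, with hypothetical names:

```go
package main

import (
	"errors"
	"fmt"
)

// singleEntry returns the only key/value pair of a length-1 map.
func singleEntry(m map[string][]byte) (string, []byte, error) {
	if len(m) != 1 {
		return "", nil, errors.New("the length of piece data map is not equal to 1")
	}
	var (
		key   string
		value []byte
	)
	// With exactly one entry the body runs once, and the pre-declared
	// variables still hold that entry after the loop exits.
	for key, value = range m {
		// real code would process the entry here (e.g. parse the piece index)
	}
	return key, value, nil
}

func main() {
	key, value, err := singleEntry(map[string][]byte{"object1_p0": []byte("piece")})
	fmt.Println(key, string(value), err) // object1_p0 piece <nil>
}
```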
diff --git a/service/uploader/uploader.go b/service/uploader/uploader.go
index 4a1bc8a02..44916272f 100644
--- a/service/uploader/uploader.go
+++ b/service/uploader/uploader.go
@@ -19,10 +19,6 @@ import (
 	"github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
 
-const (
-	ServiceNameUploader string = "UploaderService"
-)
-
 // Uploader respond to putObjectTx/putObject impl.
 type Uploader struct {
 	config *UploaderConfig
@@ -45,7 +41,7 @@ func NewUploaderService(cfg *UploaderConfig) (*Uploader, error) {
 	)
 	u = &Uploader{
 		config: cfg,
-		name:   ServiceNameUploader,
+		name:   model.UploaderService,
 	}
 	stoneHub, err := client.NewStoneHubClient(cfg.StoneHubServiceAddress)
 	if err != nil {
@@ -77,7 +73,7 @@ func (uploader *Uploader) initDB(config *leveldb.MetaLevelDBConfig) (err error)
 
 // Name implement the lifecycle interface
 func (uploader *Uploader) Name() string {
-	return model.UploaderService
+	return uploader.name
 }
 
 // Start implement the lifecycle interface
diff --git a/test/e2e/services/case_driver.go b/test/e2e/services/case_driver.go
index ead73650e..251068a29 100644
--- a/test/e2e/services/case_driver.go
+++ b/test/e2e/services/case_driver.go
@@ -3,7 +3,7 @@ package main
 import (
 	"bytes"
 	"flag"
-	"io/ioutil"
+	"io"
 	"math/rand"
 	"net/http"
 	"strings"
@@ -50,7 +50,7 @@ func runCase1() {
 		return
 	}
 	defer res.Body.Close()
-	_, err = ioutil.ReadAll(res.Body)
+	_, err = io.ReadAll(res.Body)
 	if err != nil {
 		log.Errorw("get auth failed, due to read response body", "error", err)
 		return
@@ -83,7 +83,7 @@ func runCase1() {
 		return
 	}
 	defer res.Body.Close()
-	_, err = ioutil.ReadAll(res.Body)
+	_, err = io.ReadAll(res.Body)
 	if err != nil {
 		log.Errorw("put object failed, due to read response body", "error", err)
 		return
@@ -137,7 +137,7 @@ func runCase2() {
 		return
 	}
 	defer res.Body.Close()
-	_, err = ioutil.ReadAll(res.Body)
+	_, err = io.ReadAll(res.Body)
 	if err != nil {
 		log.Errorw("get auth failed, due to read response body", "error", err)
 		return
@@ -171,7 +171,7 @@ func runCase2() {
 		return
 	}
 	defer res.Body.Close()
-	_, err = ioutil.ReadAll(res.Body)
+	_, err = io.ReadAll(res.Body)
 	if err != nil {
 		log.Errorw("put object failed, due to read response body", "error", err)
 		return
@@ -226,7 +226,7 @@ func runCase3() {
 		return
 	}
 	defer res.Body.Close()
-	_, err = ioutil.ReadAll(res.Body)
+	_, err = io.ReadAll(res.Body)
 	if err != nil {
 		log.Errorw("get auth failed, due to read response body", "error", err)
 		return
@@ -259,7 +259,7 @@ func runCase3() {
 		return
 	}
 	defer res.Body.Close()
-	_, err = ioutil.ReadAll(res.Body)
+	_, err = io.ReadAll(res.Body)
 	if err != nil {
 		log.Errorw("put object failed, due to read response body", "error", err)
 		return
diff --git a/test/test_tool/main.go b/test/test_tool/main.go
index 1f898e113..6c7a38580 100644
--- a/test/test_tool/main.go
+++ b/test/test_tool/main.go
@@ -64,7 +64,6 @@ func getDir() string {
 	default:
 		return "/" + ctx.CurrentService + "/> "
 	}
-	return "/>"
 }
 
 var ctx *context.Context
diff --git a/test/test_tool/stonehub/query_stone.go b/test/test_tool/stonehub/query_stone.go
index 4c4bc50b4..005d26cda 100644
--- a/test/test_tool/stonehub/query_stone.go
+++ b/test/test_tool/stonehub/query_stone.go
@@ -2,7 +2,6 @@ package stonehub
 
 import (
 	"context"
-	"encoding/hex"
 	"fmt"
 	"time"
 
@@ -30,14 +29,7 @@ func queryStone(c *cli.Context) {
 		fmt.Println("please cd StoneHubService namespace, try again")
 		return
 	}
-	txHash, err := hex.DecodeString(c.String("t"))
-	if err != nil {
-		fmt.Println("tx hash param decode error: ", err)
-		return
-	}
-
-	req := &stypesv1pb.StoneHubServiceQueryStoneRequest{
-		TxHash: txHash,
-	}
+	req := &stypesv1pb.StoneHubServiceQueryStoneRequest{}
 	client, err := GetStoneHubClient()
 	if err != nil {
 		return
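The case_driver.go hunks above migrate from the deprecated ioutil.ReadAll to io.ReadAll, its canonical replacement since Go 1.16. A minimal sketch of the call, using a strings.Reader in place of an HTTP response body:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	// io.ReadAll reads from r until EOF and returns the bytes read;
	// it behaves the same as the old ioutil.ReadAll.
	r := strings.NewReader("response body")
	body, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // response body
}
```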
diff --git a/util/utils.go b/util/utils.go
index ba3d2e010..5567d25b2 100644
--- a/util/utils.go
+++ b/util/utils.go
@@ -48,7 +48,7 @@ func ComputeSegmentCount(size uint64) uint32 {
 	return segmentCount
 }
 
-// SortedKeys sort keys of a map
+// GenericSortedKeys sorts the keys of a map and returns them in ascending order
 func GenericSortedKeys[K constraints.Ordered, V any](dataMap map[K]V) []K {
 	keys := make([]K, 0, len(dataMap))
 	for k := range dataMap {
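For context, here is a sketch of how a generic sorted-keys helper with the signature shown above can be implemented and used for deterministic map iteration. This is an illustrative implementation under the same golang.org/x/exp/constraints import that util/utils.go uses, not necessarily the repo's exact body:

```go
package main

import (
	"fmt"
	"sort"

	"golang.org/x/exp/constraints"
)

// sortedKeys matches the signature of util.GenericSortedKeys:
// it collects the map's keys and sorts them in ascending order.
func sortedKeys[K constraints.Ordered, V any](dataMap map[K]V) []K {
	keys := make([]K, 0, len(dataMap))
	for k := range dataMap {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
	return keys
}

func main() {
	// Map iteration order is randomized in Go, so piece-hash lists built
	// from a map must go through sorted keys to be reproducible.
	m := map[string]int{"object1_s1": 2, "object1_s0": 1}
	for _, k := range sortedKeys(m) {
		fmt.Println(k, m[k]) // object1_s0 1, then object1_s1 2
	}
}
```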