Skip to content

Commit

Permalink
fix: adjust the dispatching strategy of replica and inline data into …
Browse files Browse the repository at this point in the history
…storage provider (#66)

* fix: dispatch strategy of replica and inline data into storage provider

Co-authored-by: DylanYong <dylan.y@nodereal.io>
  • Loading branch information
sysvm and yzhaoyu authored Feb 2, 2023
1 parent a3602b3 commit 6b6059d
Show file tree
Hide file tree
Showing 30 changed files with 284 additions and 353 deletions.
3 changes: 3 additions & 0 deletions cmd/storage_provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func initService(serviceName string, cfg *config.StorageProviderConfig) (server
if err != nil {
return nil, err
}
default:
log.Errorw("unknown service", "service", serviceName)
return nil, fmt.Errorf("unknow service: %s", serviceName)
}
return server, nil
}
Expand Down
2 changes: 1 addition & 1 deletion config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ StoneJobLimit = 64

[SyncerCfg]
StorageProvider = "bnb-sp"
Address = "127.0.0.1:11600"
Address = "127.0.0.1:9533"
MetaDBType = "leveldb"
[SyncerCfg.PieceConfig.Store]
Storage = "file"
Expand Down
4 changes: 3 additions & 1 deletion model/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var (
ErrStoneNodeStopped = errors.New("stone node service has stopped")
ErrIntegrityHash = errors.New("secondary integrity hash check error")
ErrRedundancyType = errors.New("unknown redundancy type")
ErrEmptyJob = errors.New("job is empty")
ErrEmptyJob = errors.New("alloc stone job is empty")
ErrSecondarySPNumber = errors.New("secondary sp is not enough")
ErrInvalidSegmentData = errors.New("invalid segment data, length is not equal to 1")
ErrInvalidECData = errors.New("invalid ec data, length is not equal to 6")
Expand All @@ -63,6 +63,8 @@ var (

// syncer service errors
var (
ErrSyncerStarted = errors.New("syncer service is running")
ErrSyncerStopped = errors.New("syncer service has already stopped")
ErrReceivedPieceCount = errors.New("syncer service received piece count is wrong")
)

Expand Down
10 changes: 5 additions & 5 deletions pkg/job/object_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@ package job
import (
"sync"

types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
"github.com/bnb-chain/greenfield-storage-provider/store/jobdb"
"github.com/bnb-chain/greenfield-storage-provider/store/metadb"
)

// ObjectInfoContext maintains the object info, goroutine safe.
type ObjectInfoContext struct {
object *types.ObjectInfo
object *ptypesv1pb.ObjectInfo
jobDB jobdb.JobDB
metaDB metadb.MetaDB
mu sync.RWMutex
}

// NewObjectInfoContext return the instance of ObjectInfoContext.
func NewObjectInfoContext(object *types.ObjectInfo, jobDB jobdb.JobDB, metaDB metadb.MetaDB) *ObjectInfoContext {
func NewObjectInfoContext(object *ptypesv1pb.ObjectInfo, jobDB jobdb.JobDB, metaDB metadb.MetaDB) *ObjectInfoContext {
return &ObjectInfoContext{
object: object,
jobDB: jobDB,
Expand All @@ -26,7 +26,7 @@ func NewObjectInfoContext(object *types.ObjectInfo, jobDB jobdb.JobDB, metaDB me
}

// GetObjectInfo return the object info.
func (ctx *ObjectInfoContext) GetObjectInfo() *types.ObjectInfo {
func (ctx *ObjectInfoContext) GetObjectInfo() *ptypesv1pb.ObjectInfo {
ctx.mu.RLock()
defer ctx.mu.RUnlock()
return ctx.object.SafeCopy()
Expand All @@ -47,7 +47,7 @@ func (ctx *ObjectInfoContext) GetObjectSize() uint64 {
}

// GetObjectRedundancyType return the object redundancy type.
func (ctx *ObjectInfoContext) GetObjectRedundancyType() types.RedundancyType {
func (ctx *ObjectInfoContext) GetObjectRedundancyType() ptypesv1pb.RedundancyType {
ctx.mu.RLock()
defer ctx.mu.RUnlock()
return ctx.object.GetRedundancyType()
Expand Down
46 changes: 23 additions & 23 deletions pkg/job/upload_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ import (
"github.com/stretchr/testify/assert"

merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
"github.com/bnb-chain/greenfield-storage-provider/store/jobdb/jobmemory"
"github.com/bnb-chain/greenfield-storage-provider/util/hash"
)

func InitEnv(rType types.RedundancyType) (*UploadPayloadJob, *types.ObjectInfo) {
func InitEnv(rType ptypesv1pb.RedundancyType) (*UploadPayloadJob, *ptypesv1pb.ObjectInfo) {
objectSize := 50 * 1024 * 1024
switch rType {
case types.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
objectSize = 1 * 1024 * 1024
}
object := &types.ObjectInfo{
object := &ptypesv1pb.ObjectInfo{
Size: uint64(objectSize),
ObjectId: 1,
RedundancyType: rType,
Expand All @@ -29,27 +29,27 @@ func InitEnv(rType types.RedundancyType) (*UploadPayloadJob, *types.ObjectInfo)
}

func TestInitUploadPayloadJob(t *testing.T) {
job, _ := InitEnv(types.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
job, _ := InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
assert.Equal(t, len(job.primaryJob.pieceJobs), 4)
assert.Equal(t, len(job.secondaryJob.pieceJobs), 6)

job, _ = InitEnv(types.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE)
job, _ = InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE)
assert.Equal(t, len(job.primaryJob.pieceJobs), 1)
assert.Equal(t, len(job.secondaryJob.pieceJobs), 1)

job, _ = InitEnv(types.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE)
job, _ = InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE)
assert.Equal(t, len(job.primaryJob.pieceJobs), 4)
assert.Equal(t, len(job.secondaryJob.pieceJobs), 4)
}

func TestDoneReplicatePieceJob(t *testing.T) {
job, object := InitEnv(types.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE)
pieceJob := &service.PieceJob{
job, object := InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE)
pieceJob := &stypesv1pb.PieceJob{
ObjectId: object.GetObjectId(),
PayloadSize: object.GetSize(),
RedundancyType: object.GetRedundancyType(),
}
pieceJob.StorageProviderSealInfo = &service.StorageProviderSealInfo{
pieceJob.StorageProviderSealInfo = &stypesv1pb.StorageProviderSealInfo{
StorageProviderId: "test-storage-provider",
PieceChecksum: [][]byte{hash.GenerateChecksum([]byte(time.Now().String()))},
IntegrityHash: hash.GenerateChecksum([]byte(time.Now().String())),
Expand Down Expand Up @@ -83,16 +83,16 @@ func TestDoneReplicatePieceJob(t *testing.T) {
}

func TestDoneInlinePieceJob(t *testing.T) {
job, object := InitEnv(types.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE)
pieceJob := &service.PieceJob{
job, object := InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE)
pieceJob := &stypesv1pb.PieceJob{
ObjectId: object.GetObjectId(),
PayloadSize: object.GetSize(),
RedundancyType: object.GetRedundancyType(),
}
intergrity := hash.GenerateChecksum([]byte(time.Now().String()))
pieceCheckSum := hash.GenerateChecksum([]byte(time.Now().String()))
signature := hash.GenerateChecksum([]byte(time.Now().String()))
pieceJob.StorageProviderSealInfo = &service.StorageProviderSealInfo{
pieceJob.StorageProviderSealInfo = &stypesv1pb.StorageProviderSealInfo{
StorageProviderId: "test-storage-provider",
PieceChecksum: [][]byte{pieceCheckSum},
IntegrityHash: intergrity,
Expand All @@ -110,13 +110,13 @@ func TestDoneInlinePieceJob(t *testing.T) {
}

func TestDoneECPieceJob(t *testing.T) {
job, object := InitEnv(types.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
pieceJob := &service.PieceJob{
job, object := InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
pieceJob := &stypesv1pb.PieceJob{
ObjectId: object.GetObjectId(),
PayloadSize: object.GetSize(),
RedundancyType: object.GetRedundancyType(),
}
pieceJob.StorageProviderSealInfo = &service.StorageProviderSealInfo{
pieceJob.StorageProviderSealInfo = &stypesv1pb.StorageProviderSealInfo{
StorageProviderId: "test-storage-provider",
PieceChecksum: [][]byte{hash.GenerateChecksum([]byte(time.Now().String())),
hash.GenerateChecksum([]byte(time.Now().String())),
Expand Down Expand Up @@ -146,14 +146,14 @@ func TestDoneECPieceJob(t *testing.T) {
}

func TestSegmentPieceError(t *testing.T) {
job, object := InitEnv(types.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
pieceJob := &service.PieceJob{
job, object := InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
pieceJob := &stypesv1pb.PieceJob{
ObjectId: object.GetObjectId(),
PayloadSize: object.GetSize(),
RedundancyType: object.GetRedundancyType(),
}
badCheckSum := hash.GenerateChecksum([]byte(time.Now().String()))[0:10]
pieceJob.StorageProviderSealInfo = &service.StorageProviderSealInfo{
pieceJob.StorageProviderSealInfo = &stypesv1pb.StorageProviderSealInfo{
StorageProviderId: "test-storage-provider",
}
pieceJob.StorageProviderSealInfo.PieceIdx = 0
Expand All @@ -166,15 +166,15 @@ func TestSegmentPieceError(t *testing.T) {
}

func TestECPieceError(t *testing.T) {
job, object := InitEnv(types.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
pieceJob := &service.PieceJob{
job, object := InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
pieceJob := &stypesv1pb.PieceJob{
ObjectId: object.GetObjectId(),
PayloadSize: object.GetSize(),
RedundancyType: object.GetRedundancyType(),
}
badCheckSum := hash.GenerateChecksum([]byte(time.Now().String()))[0:10]
checkSum := hash.GenerateChecksum([]byte(time.Now().String()))
pieceJob.StorageProviderSealInfo = &service.StorageProviderSealInfo{
pieceJob.StorageProviderSealInfo = &stypesv1pb.StorageProviderSealInfo{
StorageProviderId: "test-storage-provider",
}
pieceJob.StorageProviderSealInfo.PieceIdx = 0
Expand Down
26 changes: 13 additions & 13 deletions pkg/job/upload_payload_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package job

import (
merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
)

// UploadPayloadJob maintains the object info and piece job meta
Expand All @@ -22,15 +22,15 @@ func NewUploadPayloadJob(objectCtx *ObjectInfoContext) (job *UploadPayloadJob, e
return nil, err
}
switch objectCtx.GetObjectRedundancyType() {
case types.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED:
case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED:
if job.secondaryJob, err = NewECUploadSpJob(objectCtx, true); err != nil {
return nil, err
}
case types.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
if job.secondaryJob, err = NewSegmentUploadSpJob(objectCtx, true); err != nil {
return nil, err
}
case types.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE:
case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE:
if job.secondaryJob, err = NewSegmentUploadSpJob(objectCtx, true); err != nil {
return nil, err
}
Expand Down Expand Up @@ -61,23 +61,23 @@ func (job *UploadPayloadJob) SecondarySPCompleted() bool {
}

// DonePrimarySPJob complete one primary piece job and update DB.
func (job *UploadPayloadJob) DonePrimarySPJob(pieceJob *service.PieceJob) error {
func (job *UploadPayloadJob) DonePrimarySPJob(pieceJob *stypesv1pb.PieceJob) error {
return job.primaryJob.Done(pieceJob)
}

// DoneSecondarySPJob complete one secondary piece job and update DB.
func (job *UploadPayloadJob) DoneSecondarySPJob(pieceJob *service.PieceJob) error {
func (job *UploadPayloadJob) DoneSecondarySPJob(pieceJob *stypesv1pb.PieceJob) error {
return job.secondaryJob.Done(pieceJob)
}

// PopPendingPrimarySPJob return the uncompleted primary piece job.
func (job *UploadPayloadJob) PopPendingPrimarySPJob() *service.PieceJob {
func (job *UploadPayloadJob) PopPendingPrimarySPJob() *stypesv1pb.PieceJob {
pieces := job.primaryJob.PopPendingJob()
if len(pieces) == 0 {
return nil
}
obj := job.objectCtx.GetObjectInfo()
pieceJob := &service.PieceJob{
pieceJob := &stypesv1pb.PieceJob{
ObjectId: obj.ObjectId,
PayloadSize: obj.Size,
TargetIdx: pieces,
Expand All @@ -87,13 +87,13 @@ func (job *UploadPayloadJob) PopPendingPrimarySPJob() *service.PieceJob {
}

// PopPendingSecondarySPJob return the uncompleted secondary piece job.
func (job *UploadPayloadJob) PopPendingSecondarySPJob() *service.PieceJob {
func (job *UploadPayloadJob) PopPendingSecondarySPJob() *stypesv1pb.PieceJob {
pieces := job.secondaryJob.PopPendingJob()
if len(pieces) == 0 {
return nil
}
obj := job.objectCtx.GetObjectInfo()
pieceJob := &service.PieceJob{
pieceJob := &stypesv1pb.PieceJob{
ObjectId: obj.ObjectId,
PayloadSize: obj.Size,
TargetIdx: pieces,
Expand All @@ -103,11 +103,11 @@ func (job *UploadPayloadJob) PopPendingSecondarySPJob() *service.PieceJob {
}

// PrimarySPSealInfo return the primary storage provider seal info.
func (job *UploadPayloadJob) PrimarySPSealInfo() ([]*types.StorageProviderInfo, error) {
func (job *UploadPayloadJob) PrimarySPSealInfo() ([]*ptypesv1pb.StorageProviderInfo, error) {
return job.primaryJob.SealInfo()
}

// SecondarySPSealInfo return the secondary storage provider seal info.
func (job *UploadPayloadJob) SecondarySPSealInfo() ([]*types.StorageProviderInfo, error) {
func (job *UploadPayloadJob) SecondarySPSealInfo() ([]*ptypesv1pb.StorageProviderInfo, error) {
return job.secondaryJob.SealInfo()
}
Loading

0 comments on commit 6b6059d

Please sign in to comment.