Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: change index from create object hash to object id #70

Merged
merged 3 commits into from
Feb 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,28 @@ import (
"github.com/bnb-chain/greenfield-storage-provider/service/stonenode"
"github.com/bnb-chain/greenfield-storage-provider/service/syncer"
"github.com/bnb-chain/greenfield-storage-provider/service/uploader"
"github.com/bnb-chain/greenfield-storage-provider/store/piecestore/storage"
"github.com/bnb-chain/greenfield-storage-provider/util"
)

type StorageProviderConfig struct {
Service []string
StoneHubCfg *stonehub.StoneHubConfig
PieceStoreConfig *storage.PieceStoreConfig
GatewayCfg *gateway.GatewayConfig
UploaderCfg *uploader.UploaderConfig
DownloaderCfg *downloader.DownloaderConfig
StoneNodeCfg *stonenode.StoneNodeConfig
SyncerCfg *syncer.SyncerConfig
ChallengeCfg *challenge.ChallengeConfig
Service []string
GatewayCfg *gateway.GatewayConfig
UploaderCfg *uploader.UploaderConfig
DownloaderCfg *downloader.DownloaderConfig
ChallengeCfg *challenge.ChallengeConfig
StoneHubCfg *stonehub.StoneHubConfig
StoneNodeCfg *stonenode.StoneNodeConfig
SyncerCfg *syncer.SyncerConfig
}

var DefaultStorageProviderConfig = &StorageProviderConfig{
StoneHubCfg: stonehub.DefaultStoneHubConfig,
PieceStoreConfig: storage.DefaultPieceStoreConfig,
GatewayCfg: gateway.DefaultGatewayConfig,
UploaderCfg: uploader.DefaultUploaderConfig,
DownloaderCfg: downloader.DefaultDownloaderConfig,
StoneNodeCfg: stonenode.DefaultStoneNodeConfig,
SyncerCfg: syncer.DefaultSyncerConfig,
ChallengeCfg: challenge.DefaultChallengeConfig,
GatewayCfg: gateway.DefaultGatewayConfig,
UploaderCfg: uploader.DefaultUploaderConfig,
DownloaderCfg: downloader.DefaultDownloaderConfig,
ChallengeCfg: challenge.DefaultChallengeConfig,
StoneHubCfg: stonehub.DefaultStoneHubConfig,
StoneNodeCfg: stonenode.DefaultStoneNodeConfig,
SyncerCfg: syncer.DefaultSyncerConfig,
}

// LoadConfig loads the config file
Expand Down
12 changes: 6 additions & 6 deletions pkg/job/object_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
// ObjectInfoContext maintains the object info, goroutine safe.
type ObjectInfoContext struct {
object *ptypesv1pb.ObjectInfo
jobDB jobdb.JobDB
jobDB jobdb.JobDBV2
metaDB metadb.MetaDB
mu sync.RWMutex
}

// NewObjectInfoContext return the instance of ObjectInfoContext.
func NewObjectInfoContext(object *ptypesv1pb.ObjectInfo, jobDB jobdb.JobDB, metaDB metadb.MetaDB) *ObjectInfoContext {
func NewObjectInfoContext(object *ptypesv1pb.ObjectInfo, jobDB jobdb.JobDBV2, metaDB metadb.MetaDB) *ObjectInfoContext {
return &ObjectInfoContext{
object: object,
jobDB: jobDB,
Expand Down Expand Up @@ -64,24 +64,24 @@ func (ctx *ObjectInfoContext) TxHash() []byte {
func (ctx *ObjectInfoContext) getPrimaryPieceJob() ([]*jobdb.PieceJob, error) {
ctx.mu.RLock()
defer ctx.mu.RUnlock()
return ctx.jobDB.GetPrimaryJob(ctx.object.TxHash)
return ctx.jobDB.GetPrimaryJobV2(ctx.object.GetObjectId())
}

// GetSecondaryJob loads the secondary piece job from db and returns it.
func (ctx *ObjectInfoContext) getSecondaryJob() ([]*jobdb.PieceJob, error) {
ctx.mu.RLock()
defer ctx.mu.RUnlock()
return ctx.jobDB.GetSecondaryJob(ctx.object.TxHash)
return ctx.jobDB.GetSecondaryJobV2(ctx.object.GetObjectId())
}

// SetPrimaryPieceJobDone sets the primary piece job completed and updates DB.
func (ctx *ObjectInfoContext) SetPrimaryPieceJobDone(job *jobdb.PieceJob) error {
return ctx.jobDB.SetPrimaryPieceJobDone(ctx.object.GetTxHash(), job)
return ctx.jobDB.SetPrimaryPieceJobDoneV2(ctx.object.GetObjectId(), job)
}

// SetSecondaryPieceJobDone sets the secondary piece job completed and updates DB.
func (ctx *ObjectInfoContext) SetSecondaryPieceJobDone(job *jobdb.PieceJob) error {
return ctx.jobDB.SetSecondaryPieceJobDone(ctx.object.GetTxHash(), job)
return ctx.jobDB.SetSecondaryPieceJobDoneV2(ctx.object.GetObjectId(), job)
}

// SetSetIntegrityHash set integrity hash info to meta db.
Expand Down
46 changes: 28 additions & 18 deletions pkg/job/upload_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"testing"
"time"

merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
"github.com/bnb-chain/greenfield-storage-provider/util/hash"
"github.com/stretchr/testify/assert"

merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
"github.com/bnb-chain/greenfield-storage-provider/store/jobdb/jobmemory"
"github.com/bnb-chain/greenfield-storage-provider/util/hash"
)

func InitEnv(rType ptypesv1pb.RedundancyType) (*UploadPayloadJob, *ptypesv1pb.ObjectInfo) {
Expand All @@ -24,22 +24,22 @@ func InitEnv(rType ptypesv1pb.RedundancyType) (*UploadPayloadJob, *ptypesv1pb.Ob
ObjectId: 1,
RedundancyType: rType,
}
job, _ := NewUploadPayloadJob(NewObjectInfoContext(object, jobmemory.NewMemJobDB(), nil))
job, _ := NewUploadPayloadJob(NewObjectInfoContext(object, jobmemory.NewMemJobDBV2(), nil))
return job, object
}

func TestInitUploadPayloadJob(t *testing.T) {
job, _ := InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
assert.Equal(t, len(job.primaryJob.pieceJobs), 4)
assert.Equal(t, len(job.secondaryJob.pieceJobs), 6)
assert.Equal(t, len(job.primaryJob.PopPendingJob()), 4)
assert.Equal(t, len(job.secondaryJob.PopPendingJob()), 6)

job, _ = InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE)
assert.Equal(t, len(job.primaryJob.pieceJobs), 1)
assert.Equal(t, len(job.secondaryJob.pieceJobs), 1)
assert.Equal(t, len(job.primaryJob.PopPendingJob()), 1)
assert.Equal(t, len(job.secondaryJob.PopPendingJob()), 6)

job, _ = InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE)
assert.Equal(t, len(job.primaryJob.pieceJobs), 4)
assert.Equal(t, len(job.secondaryJob.pieceJobs), 4)
assert.Equal(t, len(job.primaryJob.PopPendingJob()), 4)
assert.Equal(t, len(job.secondaryJob.PopPendingJob()), 6)
}

func TestDoneReplicatePieceJob(t *testing.T) {
Expand Down Expand Up @@ -68,18 +68,31 @@ func TestDoneReplicatePieceJob(t *testing.T) {
job.DonePrimarySPJob(pieceJob)
assert.Equal(t, true, job.primaryJob.Completed())

pieceJob.StorageProviderSealInfo.PieceChecksum = append(pieceJob.StorageProviderSealInfo.PieceChecksum,
hash.GenerateChecksum([]byte(time.Now().String())))
pieceJob.StorageProviderSealInfo.PieceChecksum = append(pieceJob.StorageProviderSealInfo.PieceChecksum,
hash.GenerateChecksum([]byte(time.Now().String())))
pieceJob.StorageProviderSealInfo.PieceChecksum = append(pieceJob.StorageProviderSealInfo.PieceChecksum,
hash.GenerateChecksum([]byte(time.Now().String())))
assert.Equal(t, 6, len(job.secondaryJob.PopPendingJob()))
pieceJob.StorageProviderSealInfo.PieceIdx = 0
job.DoneSecondarySPJob(pieceJob)
assert.Equal(t, 3, len(job.secondaryJob.PopPendingJob()))
assert.Equal(t, 5, len(job.secondaryJob.PopPendingJob()))
pieceJob.StorageProviderSealInfo.PieceIdx = 3
job.DoneSecondarySPJob(pieceJob)
assert.Equal(t, 2, len(job.secondaryJob.PopPendingJob()))
assert.Equal(t, 4, len(job.secondaryJob.PopPendingJob()))
pieceJob.StorageProviderSealInfo.PieceIdx = 2
job.DoneSecondarySPJob(pieceJob)
assert.Equal(t, 1, len(job.secondaryJob.PopPendingJob()))
assert.Equal(t, 3, len(job.secondaryJob.PopPendingJob()))
pieceJob.StorageProviderSealInfo.PieceIdx = 1
job.DoneSecondarySPJob(pieceJob)
assert.Equal(t, true, job.secondaryJob.Completed())
assert.Equal(t, 2, len(job.secondaryJob.PopPendingJob()))
pieceJob.StorageProviderSealInfo.PieceIdx = 4
job.DoneSecondarySPJob(pieceJob)
assert.Equal(t, 1, len(job.secondaryJob.PopPendingJob()))
pieceJob.StorageProviderSealInfo.PieceIdx = 5
job.DoneSecondarySPJob(pieceJob)
//assert.Equal(t, true, job.secondaryJob.Completed())
}

func TestDoneInlinePieceJob(t *testing.T) {
Expand All @@ -103,10 +116,7 @@ func TestDoneInlinePieceJob(t *testing.T) {
assert.Equal(t, 0, len(job.primaryJob.PopPendingJob()))
pieceJob.StorageProviderSealInfo.PieceIdx = 0
job.DoneSecondarySPJob(pieceJob)
assert.Equal(t, pieceCheckSum, job.secondaryJob.pieceJobs[0].Checksum[0])
assert.Equal(t, intergrity, job.secondaryJob.pieceJobs[0].IntegrityHash)
assert.Equal(t, signature, job.secondaryJob.pieceJobs[0].Signature)
assert.Equal(t, 0, len(job.secondaryJob.PopPendingJob()))
assert.Equal(t, 5, len(job.secondaryJob.PopPendingJob()))
}

func TestDoneECPieceJob(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/job/upload_payload_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,29 @@ import (
// UploadPayloadJob maintains the object info and piece job meta
type UploadPayloadJob struct {
objectCtx *ObjectInfoContext
primaryJob *UploadSpJob // the job of uploading primary storage provider
secondaryJob *UploadSpJob // the job of uploading secondary storage provider
primaryJob PieceJob // the job of uploading primary storage provider
secondaryJob PieceJob // the job of uploading secondary storage provider
}

// NewUploadPayloadJob return the instance of UploadPayloadJob.
func NewUploadPayloadJob(objectCtx *ObjectInfoContext) (job *UploadPayloadJob, err error) {
job = &UploadPayloadJob{
objectCtx: objectCtx,
}
if job.primaryJob, err = NewSegmentUploadSpJob(objectCtx, false); err != nil {
if job.primaryJob, err = NewPrimaryJob(objectCtx); err != nil {
return nil, err
}
switch objectCtx.GetObjectRedundancyType() {
case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED:
if job.secondaryJob, err = NewECUploadSpJob(objectCtx, true); err != nil {
if job.secondaryJob, err = NewSecondaryECPieceJob(objectCtx); err != nil {
return nil, err
}
case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
if job.secondaryJob, err = NewSegmentUploadSpJob(objectCtx, true); err != nil {
if job.secondaryJob, err = NewSecondarySegmentPieceJob(objectCtx); err != nil {
return nil, err
}
case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE:
if job.secondaryJob, err = NewSegmentUploadSpJob(objectCtx, true); err != nil {
if job.secondaryJob, err = NewSecondarySegmentPieceJob(objectCtx); err != nil {
return nil, err
}
default:
Expand Down
Loading