diff --git a/config/config.go b/config/config.go index 0b8ffe528..5d815e892 100644 --- a/config/config.go +++ b/config/config.go @@ -13,31 +13,28 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/service/stonenode" "github.com/bnb-chain/greenfield-storage-provider/service/syncer" "github.com/bnb-chain/greenfield-storage-provider/service/uploader" - "github.com/bnb-chain/greenfield-storage-provider/store/piecestore/storage" "github.com/bnb-chain/greenfield-storage-provider/util" ) type StorageProviderConfig struct { - Service []string - StoneHubCfg *stonehub.StoneHubConfig - PieceStoreConfig *storage.PieceStoreConfig - GatewayCfg *gateway.GatewayConfig - UploaderCfg *uploader.UploaderConfig - DownloaderCfg *downloader.DownloaderConfig - StoneNodeCfg *stonenode.StoneNodeConfig - SyncerCfg *syncer.SyncerConfig - ChallengeCfg *challenge.ChallengeConfig + Service []string + GatewayCfg *gateway.GatewayConfig + UploaderCfg *uploader.UploaderConfig + DownloaderCfg *downloader.DownloaderConfig + ChallengeCfg *challenge.ChallengeConfig + StoneHubCfg *stonehub.StoneHubConfig + StoneNodeCfg *stonenode.StoneNodeConfig + SyncerCfg *syncer.SyncerConfig } var DefaultStorageProviderConfig = &StorageProviderConfig{ - StoneHubCfg: stonehub.DefaultStoneHubConfig, - PieceStoreConfig: storage.DefaultPieceStoreConfig, - GatewayCfg: gateway.DefaultGatewayConfig, - UploaderCfg: uploader.DefaultUploaderConfig, - DownloaderCfg: downloader.DefaultDownloaderConfig, - StoneNodeCfg: stonenode.DefaultStoneNodeConfig, - SyncerCfg: syncer.DefaultSyncerConfig, - ChallengeCfg: challenge.DefaultChallengeConfig, + GatewayCfg: gateway.DefaultGatewayConfig, + UploaderCfg: uploader.DefaultUploaderConfig, + DownloaderCfg: downloader.DefaultDownloaderConfig, + ChallengeCfg: challenge.DefaultChallengeConfig, + StoneHubCfg: stonehub.DefaultStoneHubConfig, + StoneNodeCfg: stonenode.DefaultStoneNodeConfig, + SyncerCfg: syncer.DefaultSyncerConfig, } // LoadConfig loads the config file diff --git 
a/pkg/job/object_context.go b/pkg/job/object_context.go index 016f6d1ba..1c54a138a 100644 --- a/pkg/job/object_context.go +++ b/pkg/job/object_context.go @@ -11,13 +11,13 @@ import ( // ObjectInfoContext maintains the object info, goroutine safe. type ObjectInfoContext struct { object *ptypesv1pb.ObjectInfo - jobDB jobdb.JobDB + jobDB jobdb.JobDBV2 metaDB metadb.MetaDB mu sync.RWMutex } // NewObjectInfoContext return the instance of ObjectInfoContext. -func NewObjectInfoContext(object *ptypesv1pb.ObjectInfo, jobDB jobdb.JobDB, metaDB metadb.MetaDB) *ObjectInfoContext { +func NewObjectInfoContext(object *ptypesv1pb.ObjectInfo, jobDB jobdb.JobDBV2, metaDB metadb.MetaDB) *ObjectInfoContext { return &ObjectInfoContext{ object: object, jobDB: jobDB, @@ -64,24 +64,24 @@ func (ctx *ObjectInfoContext) TxHash() []byte { func (ctx *ObjectInfoContext) getPrimaryPieceJob() ([]*jobdb.PieceJob, error) { ctx.mu.RLock() defer ctx.mu.RUnlock() - return ctx.jobDB.GetPrimaryJob(ctx.object.TxHash) + return ctx.jobDB.GetPrimaryJobV2(ctx.object.GetObjectId()) } // GetSecondaryJob load the secondary piece job from db and return. func (ctx *ObjectInfoContext) getSecondaryJob() ([]*jobdb.PieceJob, error) { ctx.mu.RLock() defer ctx.mu.RUnlock() - return ctx.jobDB.GetSecondaryJob(ctx.object.TxHash) + return ctx.jobDB.GetSecondaryJobV2(ctx.object.GetObjectId()) } // SetPrimaryPieceJobDone set the primary piece jod completed and update DB. func (ctx *ObjectInfoContext) SetPrimaryPieceJobDone(job *jobdb.PieceJob) error { - return ctx.jobDB.SetPrimaryPieceJobDone(ctx.object.GetTxHash(), job) + return ctx.jobDB.SetPrimaryPieceJobDoneV2(ctx.object.GetObjectId(), job) } // SetSecondaryPieceJobDone set the secondary piece jod completed and update DB. 
func (ctx *ObjectInfoContext) SetSecondaryPieceJobDone(job *jobdb.PieceJob) error { - return ctx.jobDB.SetSecondaryPieceJobDone(ctx.object.GetTxHash(), job) + return ctx.jobDB.SetSecondaryPieceJobDoneV2(ctx.object.GetObjectId(), job) } // SetSetIntegrityHash set integrity hash info to meta db. diff --git a/pkg/job/upload_job_test.go b/pkg/job/upload_job_test.go index 61d4d0fb3..3e63bc07e 100644 --- a/pkg/job/upload_job_test.go +++ b/pkg/job/upload_job_test.go @@ -4,13 +4,13 @@ import ( "testing" "time" + merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" + stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + "github.com/bnb-chain/greenfield-storage-provider/util/hash" "github.com/stretchr/testify/assert" - merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" - stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" "github.com/bnb-chain/greenfield-storage-provider/store/jobdb/jobmemory" - "github.com/bnb-chain/greenfield-storage-provider/util/hash" ) func InitEnv(rType ptypesv1pb.RedundancyType) (*UploadPayloadJob, *ptypesv1pb.ObjectInfo) { @@ -24,22 +24,22 @@ func InitEnv(rType ptypesv1pb.RedundancyType) (*UploadPayloadJob, *ptypesv1pb.Ob ObjectId: 1, RedundancyType: rType, } - job, _ := NewUploadPayloadJob(NewObjectInfoContext(object, jobmemory.NewMemJobDB(), nil)) + job, _ := NewUploadPayloadJob(NewObjectInfoContext(object, jobmemory.NewMemJobDBV2(), nil)) return job, object } func TestInitUploadPayloadJob(t *testing.T) { job, _ := InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED) - assert.Equal(t, len(job.primaryJob.pieceJobs), 4) - assert.Equal(t, len(job.secondaryJob.pieceJobs), 6) + assert.Equal(t, len(job.primaryJob.PopPendingJob()), 4) + assert.Equal(t, len(job.secondaryJob.PopPendingJob()), 6) job, _ = InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE) - 
assert.Equal(t, len(job.primaryJob.pieceJobs), 1) - assert.Equal(t, len(job.secondaryJob.pieceJobs), 1) + assert.Equal(t, len(job.primaryJob.PopPendingJob()), 1) + assert.Equal(t, len(job.secondaryJob.PopPendingJob()), 6) job, _ = InitEnv(ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE) - assert.Equal(t, len(job.primaryJob.pieceJobs), 4) - assert.Equal(t, len(job.secondaryJob.pieceJobs), 4) + assert.Equal(t, len(job.primaryJob.PopPendingJob()), 4) + assert.Equal(t, len(job.secondaryJob.PopPendingJob()), 6) } func TestDoneReplicatePieceJob(t *testing.T) { @@ -68,18 +68,31 @@ func TestDoneReplicatePieceJob(t *testing.T) { job.DonePrimarySPJob(pieceJob) assert.Equal(t, true, job.primaryJob.Completed()) + pieceJob.StorageProviderSealInfo.PieceChecksum = append(pieceJob.StorageProviderSealInfo.PieceChecksum, + hash.GenerateChecksum([]byte(time.Now().String()))) + pieceJob.StorageProviderSealInfo.PieceChecksum = append(pieceJob.StorageProviderSealInfo.PieceChecksum, + hash.GenerateChecksum([]byte(time.Now().String()))) + pieceJob.StorageProviderSealInfo.PieceChecksum = append(pieceJob.StorageProviderSealInfo.PieceChecksum, + hash.GenerateChecksum([]byte(time.Now().String()))) + assert.Equal(t, 6, len(job.secondaryJob.PopPendingJob())) pieceJob.StorageProviderSealInfo.PieceIdx = 0 job.DoneSecondarySPJob(pieceJob) - assert.Equal(t, 3, len(job.secondaryJob.PopPendingJob())) + assert.Equal(t, 5, len(job.secondaryJob.PopPendingJob())) pieceJob.StorageProviderSealInfo.PieceIdx = 3 job.DoneSecondarySPJob(pieceJob) - assert.Equal(t, 2, len(job.secondaryJob.PopPendingJob())) + assert.Equal(t, 4, len(job.secondaryJob.PopPendingJob())) pieceJob.StorageProviderSealInfo.PieceIdx = 2 job.DoneSecondarySPJob(pieceJob) - assert.Equal(t, 1, len(job.secondaryJob.PopPendingJob())) + assert.Equal(t, 3, len(job.secondaryJob.PopPendingJob())) pieceJob.StorageProviderSealInfo.PieceIdx = 1 job.DoneSecondarySPJob(pieceJob) - assert.Equal(t, true, job.secondaryJob.Completed()) + 
assert.Equal(t, 2, len(job.secondaryJob.PopPendingJob())) + pieceJob.StorageProviderSealInfo.PieceIdx = 4 + job.DoneSecondarySPJob(pieceJob) + assert.Equal(t, 1, len(job.secondaryJob.PopPendingJob())) + pieceJob.StorageProviderSealInfo.PieceIdx = 5 + job.DoneSecondarySPJob(pieceJob) + assert.Equal(t, true, job.secondaryJob.Completed()) } func TestDoneInlinePieceJob(t *testing.T) { @@ -103,10 +116,7 @@ func TestDoneInlinePieceJob(t *testing.T) { assert.Equal(t, 0, len(job.primaryJob.PopPendingJob())) pieceJob.StorageProviderSealInfo.PieceIdx = 0 job.DoneSecondarySPJob(pieceJob) - assert.Equal(t, pieceCheckSum, job.secondaryJob.pieceJobs[0].Checksum[0]) - assert.Equal(t, intergrity, job.secondaryJob.pieceJobs[0].IntegrityHash) - assert.Equal(t, signature, job.secondaryJob.pieceJobs[0].Signature) - assert.Equal(t, 0, len(job.secondaryJob.PopPendingJob())) + assert.Equal(t, 5, len(job.secondaryJob.PopPendingJob())) } func TestDoneECPieceJob(t *testing.T) { diff --git a/pkg/job/upload_payload_job.go b/pkg/job/upload_payload_job.go index 79b7460c6..365614b9b 100644 --- a/pkg/job/upload_payload_job.go +++ b/pkg/job/upload_payload_job.go @@ -9,8 +9,8 @@ import ( // UploadPayloadJob maintains the object info and piece job meta type UploadPayloadJob struct { objectCtx *ObjectInfoContext - primaryJob *UploadSpJob // the job of uploading primary storage provider - secondaryJob *UploadSpJob // the job of uploading secondary storage provider + primaryJob PieceJob // the job of uploading primary storage provider + secondaryJob PieceJob // the job of uploading secondary storage provider } // NewUploadPayloadJob return the instance of UploadPayloadJob. 
@@ -18,20 +18,20 @@ func NewUploadPayloadJob(objectCtx *ObjectInfoContext) (job *UploadPayloadJob, e job = &UploadPayloadJob{ objectCtx: objectCtx, } - if job.primaryJob, err = NewSegmentUploadSpJob(objectCtx, false); err != nil { + if job.primaryJob, err = NewPrimaryJob(objectCtx); err != nil { return nil, err } switch objectCtx.GetObjectRedundancyType() { case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED: - if job.secondaryJob, err = NewECUploadSpJob(objectCtx, true); err != nil { + if job.secondaryJob, err = NewSecondaryECPieceJob(objectCtx); err != nil { return nil, err } case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: - if job.secondaryJob, err = NewSegmentUploadSpJob(objectCtx, true); err != nil { + if job.secondaryJob, err = NewSecondarySegmentPieceJob(objectCtx); err != nil { return nil, err } case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE: - if job.secondaryJob, err = NewSegmentUploadSpJob(objectCtx, true); err != nil { + if job.secondaryJob, err = NewSecondarySegmentPieceJob(objectCtx); err != nil { return nil, err } default: diff --git a/pkg/job/upload_piece_job.go b/pkg/job/upload_piece_job.go index af4efc9a5..0077bfb61 100644 --- a/pkg/job/upload_piece_job.go +++ b/pkg/job/upload_piece_job.go @@ -13,30 +13,35 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/util/log" ) -// UploadSpJob stands one upload job for the one storage provider. -type UploadSpJob struct { - objectCtx *ObjectInfoContext - pieceJobs []*jobdb.PieceJob - pieceType model.PieceType - redundancy ptypesv1pb.RedundancyType - secondary bool - complete int - mu sync.RWMutex +type PieceJob interface { + // Completed return whether all piece jobs is completed. + Completed() bool + // PopPendingJob return the uncompleted piece jobs. + PopPendingJob() (pieceIdx []uint32) + // Done completed one piece job and store the state to DB. + Done(pieceJob *stypesv1pb.PieceJob) error + // SealInfo return the storage provider info for seal. 
+ SealInfo() ([]*ptypesv1pb.StorageProviderInfo, error) } -func NewSegmentUploadSpJob(objectCtx *ObjectInfoContext, secondary bool) (*UploadSpJob, error) { - job := &UploadSpJob{ - objectCtx: objectCtx, - secondary: secondary, - pieceType: model.SegmentPieceType, - redundancy: objectCtx.GetObjectRedundancyType(), - } - pieceCount := util.ComputeSegmentCount(objectCtx.GetObjectSize()) - if job.secondary && job.redundancy == ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE { - pieceCount = model.EC_M + model.EC_K - } - for idx := 0; idx < int(pieceCount); idx++ { - job.pieceJobs = append(job.pieceJobs, &jobdb.PieceJob{ +var _ PieceJob = &PrimaryJob{} + +// PrimaryJob maintains primary segment piece job info. +type PrimaryJob struct { + objectCtx *ObjectInfoContext + segmentPieceJobs []*jobdb.PieceJob + complete int + mu sync.RWMutex +} + +// NewPrimaryJob return the PrimaryJob instance. +func NewPrimaryJob(objectCtx *ObjectInfoContext) (*PrimaryJob, error) { + job := &PrimaryJob{ + objectCtx: objectCtx, + } + segmentCount := util.ComputeSegmentCount(objectCtx.GetObjectSize()) + for idx := 0; idx < int(segmentCount); idx++ { + job.segmentPieceJobs = append(job.segmentPieceJobs, &jobdb.PieceJob{ PieceId: uint32(idx), Checksum: make([][]byte, 1), IntegrityHash: make([]byte, hash.LengthHash), @@ -48,27 +53,126 @@ func NewSegmentUploadSpJob(objectCtx *ObjectInfoContext, secondary bool) (*Uploa return nil, err } for _, pieceJob := range pieceJobs { - if pieceJob.PieceId >= pieceCount { + if pieceJob.PieceId >= segmentCount { return nil, merrors.ErrIndexOutOfBounds } - job.pieceJobs[pieceJob.PieceId] = pieceJob + job.segmentPieceJobs[pieceJob.PieceId] = pieceJob } return job, nil } -func NewECUploadSpJob(objectCtx *ObjectInfoContext, secondary bool) (*UploadSpJob, error) { - job := &UploadSpJob{ - objectCtx: objectCtx, - secondary: secondary, - pieceType: model.ECPieceType, - redundancy: objectCtx.GetObjectRedundancyType(), - } - pieceCount := model.EC_M + model.EC_K 
- segmentCount := util.ComputeSegmentCount(objectCtx.GetObjectSize()) - for pieceIdx := 0; pieceIdx < int(pieceCount); pieceIdx++ { - job.pieceJobs = append(job.pieceJobs, &jobdb.PieceJob{ - PieceId: uint32(pieceIdx), - Checksum: make([][]byte, segmentCount), +// Completed return whether all segment piece jobs of the primary sp are completed. +func (job *PrimaryJob) Completed() bool { + job.mu.RLock() + defer job.mu.RUnlock() + return job.complete == len(job.segmentPieceJobs) +} + +// PopPendingJob return the uncompleted segment piece jobs of the primary sp. +func (job *PrimaryJob) PopPendingJob() (pieceIdx []uint32) { + job.mu.RLock() + defer job.mu.RUnlock() + if job.complete == len(job.segmentPieceJobs) { + return pieceIdx + } + for i, pieceJob := range job.segmentPieceJobs { + if pieceJob.Done { + continue + } + pieceIdx = append(pieceIdx, uint32(i)) + } + return pieceIdx +} + +// Done complete one segment piece job of the primary sp. +func (job *PrimaryJob) Done(remoteJob *stypesv1pb.PieceJob) error { + job.mu.Lock() + defer job.mu.Unlock() + var ( + pieceIdx = remoteJob.GetStorageProviderSealInfo().GetPieceIdx() + objectID = remoteJob.GetObjectId() + ) + // 1. check job completed + if job.complete == len(job.segmentPieceJobs) { + log.Warnw("all primary piece jobs completed", "object_id", objectID, "piece_idx", pieceIdx) + return nil + } + // 2. get local piece job + if pieceIdx >= uint32(len(job.segmentPieceJobs)) { + return merrors.ErrIndexOutOfBounds + } + localJob := job.segmentPieceJobs[pieceIdx] + // 3. check piece job completed + if localJob.Done { + log.Warnw("piece job completed", "object_id", objectID, "piece_idx", pieceIdx) + return nil + } + // 4. 
check piece job info + if checksumCount := len(remoteJob.GetStorageProviderSealInfo().GetPieceChecksum()); checksumCount != 1 { + log.Errorw("done primary segment piece error", "object_id", objectID, "piece_idx", pieceIdx, + "want 1, checksum_count", checksumCount, "error", merrors.ErrCheckSumCountMismatch) + return merrors.ErrCheckSumCountMismatch + } + if checksumLength := len(remoteJob.GetStorageProviderSealInfo().GetPieceChecksum()[0]); checksumLength != hash.LengthHash { + log.Errorw("done primary segment piece error", "object_id", objectID, "piece_idx", pieceIdx, + "piece_checksum_length", checksumLength, "error", merrors.ErrCheckSumLengthMismatch) + return merrors.ErrCheckSumLengthMismatch + } + // 5. update piece state + localJob.Checksum = remoteJob.GetStorageProviderSealInfo().GetPieceChecksum() + localJob.StorageProvider = remoteJob.GetStorageProviderSealInfo().GetStorageProviderId() + localJob.Done = true + if err := job.objectCtx.SetPrimaryPieceJobDone(localJob); err != nil { + log.Errorw("write primary piece to db error", "object_id", objectID, "piece_idx", pieceIdx, "error", err) + localJob.Done = false + return err + } + job.complete++ + return nil +} + +// SealInfo return seal info of the primary sp. +func (job *PrimaryJob) SealInfo() ([]*ptypesv1pb.StorageProviderInfo, error) { + job.mu.RLock() + defer job.mu.RUnlock() + var sealInfo []*ptypesv1pb.StorageProviderInfo + if job.complete != len(job.segmentPieceJobs) { + return sealInfo, merrors.ErrSpJobNotCompleted + } + var checksumList [][]byte + for _, pieceJob := range job.segmentPieceJobs { + checksumList = append(checksumList, pieceJob.Checksum[0]) + } + // TODO:: sign the primary integrity hash in stone hub level. 
+ sealInfo = append(sealInfo, &ptypesv1pb.StorageProviderInfo{ + SpId: job.segmentPieceJobs[0].StorageProvider, + Checksum: hash.GenerateIntegrityHash(checksumList), + }) + return sealInfo, nil +} + +var _ PieceJob = &SecondarySegmentPieceJob{} + +// SecondarySegmentPieceJob maintains secondary segment piece job info. +// use for INLINE redundancy type and REPLICA redundancy type. +type SecondarySegmentPieceJob struct { + objectCtx *ObjectInfoContext + copyPieceJobs []*jobdb.PieceJob + complete int + mu sync.RWMutex +} + +// NewSecondarySegmentPieceJob return the SecondarySegmentPieceJob instance. +func NewSecondarySegmentPieceJob(objectCtx *ObjectInfoContext) (*SecondarySegmentPieceJob, error) { + job := &SecondarySegmentPieceJob{ + objectCtx: objectCtx, + } + // TODO:: the number of copy is configurable + copyCount := model.EC_M + model.EC_K + for idx := 0; idx < copyCount; idx++ { + job.copyPieceJobs = append(job.copyPieceJobs, &jobdb.PieceJob{ + PieceId: uint32(idx), + Checksum: make([][]byte, 1), IntegrityHash: make([]byte, hash.LengthHash), Signature: make([]byte, hash.LengthHash), }) @@ -78,29 +182,29 @@ func NewECUploadSpJob(objectCtx *ObjectInfoContext, secondary bool) (*UploadSpJo return nil, err } for _, pieceJob := range pieceJobs { - if pieceJob.PieceId >= segmentCount { + if pieceJob.PieceId >= uint32(copyCount) { return nil, merrors.ErrIndexOutOfBounds } - job.pieceJobs[pieceJob.PieceId] = pieceJob + job.copyPieceJobs[pieceJob.PieceId] = pieceJob } return job, nil } -// Completed whether upload job is completed. -func (job *UploadSpJob) Completed() bool { +// Completed return whether all copy piece jobs of the secondary sp are completed. +func (job *SecondarySegmentPieceJob) Completed() bool { job.mu.RLock() defer job.mu.RUnlock() - return job.complete == len(job.pieceJobs) + return job.complete == len(job.copyPieceJobs) } -// PopPendingJob return the uncompleted piece jobs. 
-func (job *UploadSpJob) PopPendingJob() (pieceIdx []uint32) { +// PopPendingJob return the uncompleted copy piece jobs of the secondary sp. +func (job *SecondarySegmentPieceJob) PopPendingJob() (pieceIdx []uint32) { job.mu.RLock() defer job.mu.RUnlock() - if job.complete == len(job.pieceJobs) { + if job.complete == len(job.copyPieceJobs) { return pieceIdx } - for i, pieceJob := range job.pieceJobs { + for i, pieceJob := range job.copyPieceJobs { if pieceJob.Done { continue } @@ -109,126 +213,193 @@ func (job *UploadSpJob) PopPendingJob() (pieceIdx []uint32) { return pieceIdx } -// Done completed one piece job and store the state to DB. -func (job *UploadSpJob) Done(pieceJob *stypesv1pb.PieceJob) error { +// Done complete one copy piece job of the secondary sp. +func (job *SecondarySegmentPieceJob) Done(remoteJob *stypesv1pb.PieceJob) error { job.mu.Lock() defer job.mu.Unlock() - // 1. check job weather has completed - if job.complete == len(job.pieceJobs) { - log.Warnw("upload storage provider already completed", "object_id", pieceJob.GetObjectId(), "secondary_sp", job.secondary, - "piece_idx", pieceJob.GetStorageProviderSealInfo().GetPieceIdx(), "piece_type", job.pieceType) + var ( + copyIdx = remoteJob.GetStorageProviderSealInfo().GetPieceIdx() + objectID = remoteJob.GetObjectId() + ) + // 1. check job completed + if job.complete == len(job.copyPieceJobs) { + log.Warnw("all secondary piece jobs completed", "object_id", objectID, "copy_idx", copyIdx) return nil } - // 2. get piece job - pieceIdx := pieceJob.GetStorageProviderSealInfo().GetPieceIdx() - if pieceIdx >= uint32(len(job.pieceJobs)) { + // 2. get local piece job + if copyIdx >= uint32(len(job.copyPieceJobs)) { return merrors.ErrIndexOutOfBounds } - piece := job.pieceJobs[pieceIdx] - // 3. 
check piece job weather has completed - if piece.Done { - log.Warnw("piece job already completed", "object_id", pieceJob.GetObjectId(), "secondary_sp", job.secondary, - "piece_idx", pieceJob.GetStorageProviderSealInfo().GetPieceIdx(), "piece_type", job.pieceType) + localJob := job.copyPieceJobs[copyIdx] + // 3. check piece job completed + if localJob.Done { + log.Warnw("piece job completed", "object_id", objectID, "copy_idx", copyIdx) return nil } - // 4. update piece state - var err error - if job.pieceType == model.SegmentPieceType { - err = job.doneSegment(piece, pieceJob) - } else { - err = job.doneEC(piece, pieceJob) + // 4. check piece job info + var ( + actualPieceCount = uint32(len(remoteJob.GetStorageProviderSealInfo().GetPieceChecksum())) + expectedPieceCount = util.ComputeSegmentCount(job.objectCtx.GetObjectSize()) + ) + if actualPieceCount != expectedPieceCount { + log.Errorw("done secondary segment piece error", "object_id", objectID, "copy_idx", copyIdx, + "expected_piece_count", expectedPieceCount, "actual_piece_count", actualPieceCount, "error", merrors.ErrCheckSumCountMismatch) + return merrors.ErrCheckSumCountMismatch } - return err -} - -// donePrimary update primary piece job state, include memory and db. 
-func (job *UploadSpJob) doneSegment(segmentPiece *jobdb.PieceJob, pieceJob *stypesv1pb.PieceJob) error { - pieceCount := uint32(len(pieceJob.GetStorageProviderSealInfo().GetPieceChecksum())) - segmentCount := util.ComputeSegmentCount(job.objectCtx.GetObjectSize()) - // primary segment - if !job.secondary { - if pieceCount != 1 { - log.Errorw("done segment piece error", "object_id", pieceJob.GetObjectId(), "second", job.secondary, - "piece_idx", segmentPiece.PieceId, "want 1, checksum_count", pieceCount, "error", merrors.ErrCheckSumCountMismatch) - return merrors.ErrCheckSumCountMismatch - } - pieceCheckSumLen := len(pieceJob.GetStorageProviderSealInfo().GetPieceChecksum()[0]) - if pieceCheckSumLen != hash.LengthHash { - log.Errorw("done segment piece error", "object_id", pieceJob.GetObjectId(), "second", job.secondary, - "piece_idx", segmentPiece.PieceId, "piece_checksum_length", pieceCheckSumLen, "error", merrors.ErrCheckSumLengthMismatch) + for idx, checkSum := range remoteJob.GetStorageProviderSealInfo().GetPieceChecksum() { + if len(checkSum) != hash.LengthHash { + log.Errorw("done secondary segment piece error", "object_id", objectID, "copy_idx", copyIdx, + "segment_idx", idx, "segment_check_sum_length", len(checkSum), "error", merrors.ErrCheckSumLengthMismatch) return merrors.ErrCheckSumLengthMismatch } - } else { - // secondary replicate or inline redundancy type - if pieceCount != segmentCount { - log.Errorw("done ec piece error", "object_id", pieceJob.GetObjectId(), "piece_idx", segmentPiece.PieceId, - "want_count", segmentCount, "piece_count", pieceCount, "error", merrors.ErrCheckSumCountMismatch) - return merrors.ErrCheckSumCountMismatch - } - for idx, checkSum := range pieceJob.GetStorageProviderSealInfo().GetPieceChecksum() { - if len(checkSum) != hash.LengthHash { - log.Errorw("done ec piece error", "object_id", pieceJob.GetObjectId(), "piece_idx", segmentPiece.PieceId, - "checksum_idx", idx, "piece_check_sum_lengeth", len(checkSum), "error", 
merrors.ErrCheckSumLengthMismatch) - return merrors.ErrCheckSumLengthMismatch - } - } } - if job.secondary { - integrityHashLen := len(pieceJob.GetStorageProviderSealInfo().GetIntegrityHash()) - if integrityHashLen != hash.LengthHash { - log.Errorw("done segment piece error", "object_id", pieceJob.GetObjectId(), "second", job.secondary, - "piece_idx", segmentPiece.PieceId, "integrity_hash_length", integrityHashLen, "error", merrors.ErrIntegrityHashLengthMismatch) - return merrors.ErrIntegrityHashLengthMismatch - } - // TODO:: currrent signer service is not completed - //if len(pieceJob.GetStorageProviderSealInfo().GetSignature()) != hash.LengthHash { - // log.Errorw("done segment piece error", "object_id", pieceJob.GetObjectId(), - // "second", job.secondary, "piece_idx", segmentPiece.PieceId, "error", merrors.ErrSignatureLengthMismatch) - // return merrors.ErrSignatureLengthMismatch - //} - segmentPiece.IntegrityHash = pieceJob.GetStorageProviderSealInfo().GetIntegrityHash() - segmentPiece.Signature = pieceJob.GetStorageProviderSealInfo().GetSignature() - } - segmentPiece.Checksum = pieceJob.GetStorageProviderSealInfo().GetPieceChecksum() - segmentPiece.StorageProvider = pieceJob.GetStorageProviderSealInfo().GetStorageProviderId() - segmentPiece.Done = true - if job.secondary { - if err := job.objectCtx.SetSecondaryPieceJobDone(segmentPiece); err != nil { - log.Errorw("write secondary piece to db error", "object_id", pieceJob.GetObjectId(), - "piece_idx", segmentPiece.PieceId, "error", err) - return err - } - } else { - if err := job.objectCtx.SetPrimaryPieceJobDone(segmentPiece); err != nil { - log.Errorw("write primary piece to db error", "object_id", pieceJob.GetObjectId(), - "piece_idx", segmentPiece.PieceId, "error", err) - return err - } + if integrityHashLength := len(remoteJob.GetStorageProviderSealInfo().GetIntegrityHash()); integrityHashLength != hash.LengthHash { + log.Errorw("done secondary segment piece error", "object_id", objectID, "copy_idx", 
copyIdx, + "integrity_hash_length", integrityHashLength, "error", merrors.ErrIntegrityHashLengthMismatch) + return merrors.ErrIntegrityHashLengthMismatch + } + // TODO:: current signer service is not completed + //if len(pieceJob.GetStorageProviderSealInfo().GetSignature()) != hash.LengthHash { + // log.Errorw("done segment piece error", "object_id", pieceJob.GetObjectId(), + // "second", job.secondary, "piece_idx", segmentPiece.PieceId, "error", merrors.ErrSignatureLengthMismatch) + // return merrors.ErrSignatureLengthMismatch + //} + localJob.IntegrityHash = remoteJob.GetStorageProviderSealInfo().GetIntegrityHash() + localJob.Signature = remoteJob.GetStorageProviderSealInfo().GetSignature() + localJob.Checksum = remoteJob.GetStorageProviderSealInfo().GetPieceChecksum() + localJob.StorageProvider = remoteJob.GetStorageProviderSealInfo().GetStorageProviderId() + localJob.Done = true + if err := job.objectCtx.SetSecondaryPieceJobDone(localJob); err != nil { + log.Errorw("write secondary piece to db error", "object_id", objectID, "copy_idx", copyIdx, "error", err) + localJob.Done = false + return err + } job.complete++ return nil } -// doneSecondary update primary piece job state, include memory and db. -func (job *UploadSpJob) doneEC(ecPiece *jobdb.PieceJob, pieceJob *stypesv1pb.PieceJob) error { - pieceCount := uint32(len(pieceJob.GetStorageProviderSealInfo().GetPieceChecksum())) - segmentCount := util.ComputeSegmentCount(job.objectCtx.GetObjectSize()) - if pieceCount != segmentCount { - log.Errorw("done ec piece error", "object_id", pieceJob.GetObjectId(), "piece_idx", ecPiece.PieceId, - "want_count", segmentCount, "piece_count", pieceCount, "error", merrors.ErrCheckSumCountMismatch) +// SealInfo return seal info of the secondary sp. 
+func (job *SecondarySegmentPieceJob) SealInfo() ([]*ptypesv1pb.StorageProviderInfo, error) { + job.mu.RLock() + defer job.mu.RUnlock() + var sealInfo []*ptypesv1pb.StorageProviderInfo + if job.complete != len(job.copyPieceJobs) { + return sealInfo, merrors.ErrSpJobNotCompleted + } + for _, pieceJob := range job.copyPieceJobs { + sealInfo = append(sealInfo, &ptypesv1pb.StorageProviderInfo{ + SpId: pieceJob.StorageProvider, + Idx: pieceJob.PieceId, + Checksum: pieceJob.IntegrityHash, + Signature: pieceJob.Signature, + }) + } + return sealInfo, nil +} + +var _ PieceJob = &SecondaryECPieceJob{} + +// SecondaryECPieceJob maintains secondary ec piece job info. +type SecondaryECPieceJob struct { + objectCtx *ObjectInfoContext + ecPieceJobs []*jobdb.PieceJob + complete int + mu sync.RWMutex +} + +// NewSecondaryECPieceJob return the SecondaryECPieceJob instance. +func NewSecondaryECPieceJob(objectCtx *ObjectInfoContext) (*SecondaryECPieceJob, error) { + job := &SecondaryECPieceJob{ + objectCtx: objectCtx, + } + pieceJobCount := model.EC_M + model.EC_K + for idx := 0; idx < pieceJobCount; idx++ { + job.ecPieceJobs = append(job.ecPieceJobs, &jobdb.PieceJob{ + PieceId: uint32(idx), + Checksum: make([][]byte, 1), + IntegrityHash: make([]byte, hash.LengthHash), + Signature: make([]byte, hash.LengthHash), + }) + } + pieceJobs, err := job.objectCtx.getSecondaryJob() + if err != nil { + return nil, err + } + for _, pieceJob := range pieceJobs { + if pieceJob.PieceId >= uint32(pieceJobCount) { + return nil, merrors.ErrIndexOutOfBounds + } + job.ecPieceJobs[pieceJob.PieceId] = pieceJob + } + return job, nil +} + +// Completed return whether all ec piece jobs of the secondary sp are completed. +func (job *SecondaryECPieceJob) Completed() bool { + job.mu.RLock() + defer job.mu.RUnlock() + return job.complete == len(job.ecPieceJobs) +} + +// PopPendingJob return the uncompleted ec piece jobs of the secondary sp. 
+func (job *SecondaryECPieceJob) PopPendingJob() (pieceIdx []uint32) { + job.mu.RLock() + defer job.mu.RUnlock() + if job.complete == len(job.ecPieceJobs) { + return pieceIdx + } + for i, pieceJob := range job.ecPieceJobs { + if pieceJob.Done { + continue + } + pieceIdx = append(pieceIdx, uint32(i)) + } + return pieceIdx +} + +// Done complete one ec piece job of the secondary sp. +func (job *SecondaryECPieceJob) Done(remoteJob *stypesv1pb.PieceJob) error { + job.mu.Lock() + defer job.mu.Unlock() + var ( + pieceIdx = remoteJob.GetStorageProviderSealInfo().GetPieceIdx() + objectID = remoteJob.GetObjectId() + ) + // 1. check job completed + if job.complete == len(job.ecPieceJobs) { + log.Warnw("all secondary piece jobs completed", "object_id", objectID, "piece_idx", pieceIdx) + return nil + } + // 2. get local piece job + if pieceIdx >= uint32(len(job.ecPieceJobs)) { + return merrors.ErrIndexOutOfBounds + } + localJob := job.ecPieceJobs[pieceIdx] + // 3. check piece job completed + if localJob.Done { + log.Warnw("piece job completed", "object_id", objectID, "piece_idx", pieceIdx) + return nil + } + // 4. 
check piece job info + var ( + actualPieceCount = uint32(len(remoteJob.GetStorageProviderSealInfo().GetPieceChecksum())) + expectedPieceCount = util.ComputeSegmentCount(job.objectCtx.GetObjectSize()) + ) + if actualPieceCount != expectedPieceCount { + log.Errorw("done ec piece error", "object_id", objectID, "ec_piece_idx", pieceIdx, + "expected_piece_count", expectedPieceCount, "actual_piece_count", actualPieceCount, "error", merrors.ErrCheckSumCountMismatch) return merrors.ErrCheckSumCountMismatch } - for idx, checkSum := range pieceJob.GetStorageProviderSealInfo().GetPieceChecksum() { + for idx, checkSum := range remoteJob.GetStorageProviderSealInfo().GetPieceChecksum() { if len(checkSum) != hash.LengthHash { - log.Errorw("done ec piece error", "object_id", pieceJob.GetObjectId(), "piece_idx", ecPiece.PieceId, - "checksum_idx", idx, "piece_check_sum_lengeth", len(checkSum), "error", merrors.ErrCheckSumLengthMismatch) + log.Errorw("done ec piece error", "object_id", objectID, "ec_idx", pieceIdx, "piece_idx", idx, + "piece_check_sum_length", len(checkSum), "error", merrors.ErrCheckSumLengthMismatch) return merrors.ErrCheckSumLengthMismatch } } - integrityHashLen := len(pieceJob.GetStorageProviderSealInfo().GetIntegrityHash()) - if integrityHashLen != hash.LengthHash { - log.Errorw("done ec piece error", "object_id", pieceJob.GetObjectId(), "piece_idx", ecPiece.PieceId, - "integrity_hash_length", integrityHashLen, "error", merrors.ErrIntegrityHashLengthMismatch) + if integrityHashLength := len(remoteJob.GetStorageProviderSealInfo().GetIntegrityHash()); integrityHashLength != hash.LengthHash { + log.Errorw("done ec piece error", "object_id", objectID, "ec_idx", pieceIdx, + "integrity_hash_length", integrityHashLength, "error", merrors.ErrIntegrityHashLengthMismatch) return merrors.ErrIntegrityHashLengthMismatch } // TODO:: currrent signer service is not completed @@ -237,52 +408,29 @@ func (job *UploadSpJob) doneEC(ecPiece *jobdb.PieceJob, pieceJob *stypesv1pb.Pie // 
"piece_idx", ecPiece.PieceId, "error", merrors.ErrSignatureLengthMismatch) // return merrors.ErrSignatureLengthMismatch //} - ecPiece.Checksum = pieceJob.GetStorageProviderSealInfo().GetPieceChecksum() - ecPiece.IntegrityHash = pieceJob.GetStorageProviderSealInfo().GetIntegrityHash() - ecPiece.Signature = pieceJob.GetStorageProviderSealInfo().GetSignature() - ecPiece.StorageProvider = pieceJob.GetStorageProviderSealInfo().GetStorageProviderId() - ecPiece.Done = true - if err := job.objectCtx.SetSecondaryPieceJobDone(ecPiece); err != nil { + localJob.Checksum = remoteJob.GetStorageProviderSealInfo().GetPieceChecksum() + localJob.IntegrityHash = remoteJob.GetStorageProviderSealInfo().GetIntegrityHash() + localJob.Signature = remoteJob.GetStorageProviderSealInfo().GetSignature() + localJob.StorageProvider = remoteJob.GetStorageProviderSealInfo().GetStorageProviderId() + localJob.Done = true + if err := job.objectCtx.SetSecondaryPieceJobDone(localJob); err != nil { log.Infow("set secondary piece job to db error", "error", err) + localJob.Done = false return err } job.complete++ return nil } -// SealInfo return the info for seal. -func (job *UploadSpJob) SealInfo() ([]*ptypesv1pb.StorageProviderInfo, error) { +// SealInfo return seal info of the secondary sp. +func (job *SecondaryECPieceJob) SealInfo() ([]*ptypesv1pb.StorageProviderInfo, error) { job.mu.RLock() defer job.mu.RUnlock() - if job.complete != len(job.pieceJobs) { - return nil, merrors.ErrSpJobNotCompleted - } var sealInfo []*ptypesv1pb.StorageProviderInfo - if job.secondary { - sealInfo = job.sealSecondary() - } else { - sealInfo = append(sealInfo, job.sealPrimary()) - } - return sealInfo, nil -} - -// PrimarySealInfo compute the primary integrity hash. 
-func (job *UploadSpJob) sealPrimary() *ptypesv1pb.StorageProviderInfo { - var checksumList [][]byte - for _, pieceJob := range job.pieceJobs { - checksumList = append(checksumList, pieceJob.Checksum[0]) - } - // TODO:: sign the primary integrity hash in stone hub level. - return &ptypesv1pb.StorageProviderInfo{ - SpId: job.pieceJobs[0].StorageProvider, - Checksum: hash.GenerateIntegrityHash(checksumList), + if job.complete != len(job.ecPieceJobs) { + return sealInfo, merrors.ErrSpJobNotCompleted } -} - -// SecondarySealInfo return secondary info for seal, the stone node service report. -func (job *UploadSpJob) sealSecondary() []*ptypesv1pb.StorageProviderInfo { - var sealInfo []*ptypesv1pb.StorageProviderInfo - for _, pieceJob := range job.pieceJobs { + for _, pieceJob := range job.ecPieceJobs { sealInfo = append(sealInfo, &ptypesv1pb.StorageProviderInfo{ SpId: pieceJob.StorageProvider, Idx: pieceJob.PieceId, @@ -290,5 +438,5 @@ func (job *UploadSpJob) sealSecondary() []*ptypesv1pb.StorageProviderInfo { Signature: pieceJob.Signature, }) } - return sealInfo + return sealInfo, nil } diff --git a/pkg/stone/stone_job_context.go b/pkg/stone/stone_job_context.go index edbddd0fc..01e2f3d1b 100644 --- a/pkg/stone/stone_job_context.go +++ b/pkg/stone/stone_job_context.go @@ -14,13 +14,13 @@ import ( type JobContextWrapper struct { jobCtx *ptypesv1pb.JobContext jobErr error - jobDB jobdb.JobDB + jobDB jobdb.JobDBV2 metaDB metadb.MetaDB mu sync.RWMutex } // NewJobContextWrapper return the instance of JobContextWrapper -func NewJobContextWrapper(jobCtx *ptypesv1pb.JobContext, jobDB jobdb.JobDB, metaDB metadb.MetaDB) *JobContextWrapper { +func NewJobContextWrapper(jobCtx *ptypesv1pb.JobContext, jobDB jobdb.JobDBV2, metaDB metadb.MetaDB) *JobContextWrapper { return &JobContextWrapper{ jobCtx: jobCtx, jobDB: jobDB, @@ -44,7 +44,7 @@ func (wrapper *JobContextWrapper) SetJobState(state string) error { wrapper.mu.Lock() defer wrapper.mu.Unlock() wrapper.jobCtx.JobState = 
ptypesv1pb.JobState(ptypesv1pb.JobState_value[state]) - return wrapper.jobDB.SetUploadPayloadJobState(wrapper.jobCtx.JobId, state, time.Now().Unix()) + return wrapper.jobDB.SetUploadPayloadJobStateV2(wrapper.jobCtx.JobId, state, time.Now().Unix()) } // JobErr return job error @@ -65,7 +65,7 @@ func (wrapper *JobContextWrapper) SetJobErr(err error) error { wrapper.jobCtx.JobErr = wrapper.jobCtx.JobErr + err.Error() } wrapper.jobCtx.JobState = ptypesv1pb.JobState_JOB_STATE_ERROR - return wrapper.jobDB.SetUploadPayloadJobJobError(wrapper.jobCtx.JobId, + return wrapper.jobDB.SetUploadPayloadJobJobErrorV2(wrapper.jobCtx.JobId, ptypesv1pb.JOB_STATE_ERROR, wrapper.jobCtx.JobErr, time.Now().Unix()) } diff --git a/pkg/stone/upload_payload_stone.go b/pkg/stone/upload_payload_stone.go index 04be22a6f..242e9f51e 100644 --- a/pkg/stone/upload_payload_stone.go +++ b/pkg/stone/upload_payload_stone.go @@ -32,14 +32,14 @@ type UploadPayloadStone struct { job *job.UploadPayloadJob // records the upload payload job information jobCh chan StoneJob // the channel of transfer job to StoneHub gcCh chan uint64 // the channel of notify StoneHub to delete stone - jobDB jobdb.JobDB + jobDB jobdb.JobDBV2 metaDB metadb.MetaDB } // NewUploadPayloadStone return the instance of UploadPayloadStone func NewUploadPayloadStone(ctx context.Context, jobContext *ptypesv1pb.JobContext, object *ptypesv1pb.ObjectInfo, - jobDB jobdb.JobDB, metaDB metadb.MetaDB, + jobDB jobdb.JobDBV2, metaDB metadb.MetaDB, jobCh chan StoneJob, gcCh chan uint64) (*UploadPayloadStone, error) { jobCtx := NewJobContextWrapper(jobContext, jobDB, metaDB) objectCtx := job.NewObjectInfoContext(object, jobDB, metaDB) diff --git a/pkg/stone/upload_payload_stone_test.go b/pkg/stone/upload_payload_stone_test.go index 9645c5827..0672f07bf 100644 --- a/pkg/stone/upload_payload_stone_test.go +++ b/pkg/stone/upload_payload_stone_test.go @@ -37,15 +37,12 @@ func InitENV() (*UploadPayloadStone, error) { SpId: "bnb-test-sp", }, } - jobDB := 
jobmemory.NewMemJobDB() - if _, err := jobDB.CreateUploadPayloadJob(txHash, object); err != nil { - return nil, err - } - if err := jobDB.SetObjectCreateHeightAndObjectID(txHash, height, objectID); err != nil { + jobDB := jobmemory.NewMemJobDBV2() + jobID, err := jobDB.CreateUploadPayloadJobV2(object) + if err != nil { return nil, err } - jobID := jobDB.JobCount - 1 - jobCtx, err := jobDB.GetJobContext(jobID) + jobCtx, err := jobDB.GetJobContextV2(jobID) if err != nil { return nil, err } diff --git a/proto/service/types/v1/stone_hub.proto b/proto/service/types/v1/stone_hub.proto index d8f79c3c6..0602e5066 100644 --- a/proto/service/types/v1/stone_hub.proto +++ b/proto/service/types/v1/stone_hub.proto @@ -37,17 +37,6 @@ message StorageProviderSealInfo { bytes signature = 5; } -message PieceJob { -// string bucket_name = 1; -// string object_name = 2; - bytes tx_hash = 3; - uint64 object_id = 4; - uint64 payload_size = 5; - repeated uint32 target_idx = 6; // ec number or segment number: 0, 1, 2..., start at 0 - pkg.types.v1.RedundancyType redundancy_type = 7; - StorageProviderSealInfo storage_provider_seal_info = 8; -} - message StoneHubServiceBeginUploadPayloadRequest { string trace_id = 1; bytes tx_hash = 2; @@ -60,9 +49,22 @@ message StoneHubServiceBeginUploadPayloadResponse { ErrMessage err_message = 4; } +message PieceJob { +// string bucket_name = 1; +// string object_name = 2; +// bytes tx_hash = 3; + uint64 object_id = 4; + uint64 payload_size = 5; + repeated uint32 target_idx = 6; // ec number or segment number: 0, 1, 2..., start at 0 + pkg.types.v1.RedundancyType redundancy_type = 7; + StorageProviderSealInfo storage_provider_seal_info = 8; +} + + + message StoneHubServiceBeginUploadPayloadV2Request { string trace_id = 1; - bytes tx_hash = 2; +// bytes tx_hash = 2; pkg.types.v1.ObjectInfo object_info = 3; } @@ -75,14 +77,14 @@ message StoneHubServiceBeginUploadPayloadV2Response { message StoneHubServiceDonePrimaryPieceJobRequest { string trace_id = 1; - 
bytes tx_hash = 2; +// bytes tx_hash = 2; ErrMessage err_message = 3; PieceJob piece_job = 4; } message StoneHubServiceDonePrimaryPieceJobResponse { string trace_id = 1; - bytes tx_hash = 2; +// bytes tx_hash = 2; ErrMessage err_message = 3; } @@ -92,14 +94,14 @@ message StoneHubServiceAllocStoneJobRequest { message StoneHubServiceAllocStoneJobResponse { string trace_id = 1; - bytes tx_hash = 2; +// bytes tx_hash = 2; PieceJob piece_job = 3; ErrMessage err_message = 4; } message StoneHubServiceDoneSecondaryPieceJobRequest { string trace_id = 1; - bytes tx_hash = 2; +// bytes tx_hash = 2; PieceJob piece_job = 3; ErrMessage err_message = 4; } diff --git a/proto/service/types/v1/syncer.proto b/proto/service/types/v1/syncer.proto index 2e3472315..137728119 100644 --- a/proto/service/types/v1/syncer.proto +++ b/proto/service/types/v1/syncer.proto @@ -7,7 +7,7 @@ option go_package = "github.com/bnb-chain/greenfield-storage-provider/service/ty message SyncerInfo { uint64 object_id = 1; - bytes tx_hash = 2; +// bytes tx_hash = 2; string storage_provider_id = 3; uint32 piece_count = 4; pkg.types.v1.RedundancyType redundancy_type = 5; diff --git a/service/stonehub/stone_hub.go b/service/stonehub/stone_hub.go index a0d2a8984..4fc339fbf 100644 --- a/service/stonehub/stone_hub.go +++ b/service/stonehub/stone_hub.go @@ -50,7 +50,7 @@ var _ lifecycle.Service = &StoneHub{} // StoneHub manage all stones, the stone is an abstraction of job context and fsm. 
type StoneHub struct { config *StoneHubConfig - jobDB jobdb.JobDB // store the stones(include job and fsm) context + jobDB jobdb.JobDBV2 // store the stones(include job and fsm) context metaDB metadb.MetaDB // store the storage provider meta stone sync.Map // hold all the running stones, goroutine safe jobQueue *lane.Queue // hold the stones that wait to be requested by stone node service @@ -216,9 +216,8 @@ func (hub *StoneHub) gcMemoryStone() { return true // skip err stone } if val.LastModifyTime() <= current || state == ptypesv1pb.JOB_STATE_ERROR { - stoneKey := key.(string) - log.Infow("gc memory stone", "key", stoneKey) - hub.stone.Delete(stoneKey) + log.Infow("gc memory stone", "object_id", key) + hub.stone.Delete(key) } return true }) @@ -260,42 +259,40 @@ func (hub *StoneHub) listenChain() { // initDB init job, meta, etc. db instance func (hub *StoneHub) initDB() error { - initMemoryDB := func() { - hub.jobDB = jobmemory.NewMemJobDB() - } - initSqlDB := func() (err error) { - if hub.config.JobDB == nil { - hub.config.JobDB = DefaultStoneHubConfig.JobDB - } - hub.jobDB, err = jobsql.NewJobMetaImpl(hub.config.JobDB) - return - } - initLevelDB := func() (err error) { + initMetaDB := func() (err error) { if hub.config.MetaDB == nil { hub.config.MetaDB = DefaultStoneHubConfig.MetaDB } - hub.metaDB, err = leveldb.NewMetaDB(hub.config.MetaDB) + switch hub.config.MetaDBType { + case model.LevelDB: + hub.metaDB, err = leveldb.NewMetaDB(hub.config.MetaDB) + case model.MySqlDB: + // TODO:: meta support SQL + default: + err = fmt.Errorf("meta db not support %s type", hub.config.MetaDBType) + } return } - switch hub.config.JobDBType { - case model.MySqlDB: - if err := initSqlDB(); err != nil { - return err + initJobDB := func() (err error) { + if hub.config.JobDB == nil { + hub.config.JobDB = DefaultStoneHubConfig.JobDB } - case model.MemoryDB: - initMemoryDB() - default: - return fmt.Errorf("job db not support %s type", hub.config.JobDBType) - } - - switch 
hub.config.MetaDBType { - case model.LevelDB: - if err := initLevelDB(); err != nil { - return err + switch hub.config.JobDBType { + case model.MySqlDB: + hub.jobDB, err = jobsql.NewJobMetaImpl(hub.config.JobDB) + case model.MemoryDB: + hub.jobDB = jobmemory.NewMemJobDBV2() + default: + err = fmt.Errorf("job db not support %s type", hub.config.JobDBType) } - default: - return fmt.Errorf("meta db not support %s type", hub.config.MetaDBType) + return + } + if err := initMetaDB(); err != nil { + return err + } + if err := initJobDB(); err != nil { + return err } return nil } diff --git a/service/stonehub/stone_hub_service.go b/service/stonehub/stone_hub_service.go index 0662b1a14..2c2b7d41d 100644 --- a/service/stonehub/stone_hub_service.go +++ b/service/stonehub/stone_hub_service.go @@ -233,12 +233,11 @@ func (hub *StoneHub) BeginUploadPayloadV2(ctx context.Context, req *stypesv1pb.S uploadStone *stone.UploadPayloadStone ) // create upload stone - if req.ObjectInfo.JobId, err = hub.jobDB.CreateUploadPayloadJob( - req.GetObjectInfo().GetTxHash(), req.GetObjectInfo()); err != nil { + if req.ObjectInfo.JobId, err = hub.jobDB.CreateUploadPayloadJobV2(req.GetObjectInfo()); err != nil { return } // TODO::CreateUploadPayloadJob return jobContext - if jobCtx, err = hub.jobDB.GetJobContext(req.GetObjectInfo().GetJobId()); err != nil { + if jobCtx, err = hub.jobDB.GetJobContextV2(req.GetObjectInfo().GetJobId()); err != nil { return } if uploadStone, err = stone.NewUploadPayloadStone(ctx, jobCtx, req.GetObjectInfo(), @@ -261,7 +260,7 @@ func (hub *StoneHub) BeginUploadPayloadV2(ctx context.Context, req *stypesv1pb.S func (hub *StoneHub) DonePrimaryPieceJob(ctx context.Context, req *stypesv1pb.StoneHubServiceDonePrimaryPieceJobRequest) ( *stypesv1pb.StoneHubServiceDonePrimaryPieceJobResponse, error) { ctx = log.Context(ctx, req, req.GetPieceJob()) - resp := &stypesv1pb.StoneHubServiceDonePrimaryPieceJobResponse{TraceId: req.TraceId, TxHash: req.TxHash} + resp := 
&stypesv1pb.StoneHubServiceDonePrimaryPieceJobResponse{TraceId: req.TraceId} var ( uploadStone *stone.UploadPayloadStone job Stone @@ -407,6 +406,6 @@ func (hub *StoneHub) QueryStone(ctx context.Context, req *stypesv1pb.StoneHubSer rsp.JobInfo = uploadStone.GetJobContext() rsp.PendingPrimaryJob = uploadStone.PopPendingPrimarySPJob() rsp.PendingSecondaryJob = uploadStone.PopPendingSecondarySPJob() - rsp.ObjectInfo, _ = hub.jobDB.GetObjectInfo(uploadStone.GetObjectInfo().GetTxHash()) + rsp.ObjectInfo, _ = hub.jobDB.GetObjectInfoV2(uploadStone.GetObjectInfo().GetObjectId()) return rsp, nil } diff --git a/service/stonenode/server.go b/service/stonenode/server.go index e48b1e7ed..d008fd52d 100644 --- a/service/stonenode/server.go +++ b/service/stonenode/server.go @@ -134,6 +134,9 @@ func (node *StoneNodeService) allocStone(ctx context.Context) { resp, err := node.stoneHub.AllocStoneJob(ctx) ctx = log.Context(ctx, resp, resp.GetPieceJob()) if err != nil { + if err == merrors.ErrEmptyJob { + return + } log.CtxErrorw(ctx, "alloc stone from stone hub failed", "error", err) return } diff --git a/service/stonenode/sync.go b/service/stonenode/sync.go index e5b8df98f..47ce8231c 100644 --- a/service/stonenode/sync.go +++ b/service/stonenode/sync.go @@ -271,13 +271,11 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *sty objectID = resp.GetPieceJob().GetObjectId() payloadSize = resp.GetPieceJob().GetPayloadSize() redundancyType = resp.GetPieceJob().GetRedundancyType() - txHash = resp.GetTxHash() ) for secondary, pieceData := range pieceDataBySecondary { go func(secondary string, pieceData map[string][]byte) { errMsg := &stypesv1pb.ErrMessage{} pieceJob := &stypesv1pb.PieceJob{ - TxHash: txHash, ObjectId: objectID, PayloadSize: payloadSize, RedundancyType: redundancyType, @@ -287,7 +285,6 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *sty // notify stone hub when an ec segment is done req := 
&stypesv1pb.StoneHubServiceDoneSecondaryPieceJobRequest{ TraceId: resp.GetTraceId(), - TxHash: pieceJob.GetTxHash(), PieceJob: pieceJob, ErrMessage: errMsg, } @@ -300,7 +297,6 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *sty syncResp, err := node.syncPiece(ctx, &stypesv1pb.SyncerInfo{ ObjectId: objectID, - TxHash: txHash, StorageProviderId: secondary, PieceCount: uint32(len(pieceData)), RedundancyType: redundancyType, @@ -343,7 +339,6 @@ func (node *StoneNodeService) reportErrToStoneHub(ctx context.Context, resp *sty } req := &stypesv1pb.StoneHubServiceDoneSecondaryPieceJobRequest{ TraceId: resp.GetTraceId(), - TxHash: resp.GetTxHash(), ErrMessage: &stypesv1pb.ErrMessage{ ErrCode: stypesv1pb.ErrCode_ERR_CODE_ERROR, ErrMsg: reportErr.Error(), diff --git a/service/uploader/uploader_service.go b/service/uploader/uploader_service.go index ff453c46f..67d53ec24 100644 --- a/service/uploader/uploader_service.go +++ b/service/uploader/uploader_service.go @@ -223,7 +223,6 @@ func (uploader *Uploader) reportJobProgress(ctx context.Context, jm *JobMeta, up } req = &stypesv1pb.StoneHubServiceDonePrimaryPieceJobRequest{ TraceId: traceID, - TxHash: jm.txHash, PieceJob: &pieceJob, } if _, err := uploader.stoneHub.DonePrimaryPieceJob(ctx, req); err != nil { diff --git a/store/jobdb/job_db.go b/store/jobdb/job_db.go index 754672637..d8ee8f2d3 100644 --- a/store/jobdb/job_db.go +++ b/store/jobdb/job_db.go @@ -40,11 +40,13 @@ type JobDB interface { SetSecondaryPieceJobDone(txHash []byte, piece *PieceJob) error } +/* Compared to JobDB, JobDBV2 changes the index from CreateObjectTxHash to ObjectID. + * This adapts to the change from light client to heavy client, where ObjectID as the index is necessary for the SP. 
+ */ + // JobDBV2 use objectID as primary key type JobDBV2 interface { CreateUploadPayloadJobV2(info *ptypesv1pb.ObjectInfo) (uint64, error) - // SetObjectCreateHeightAndObjectID maybe useless - // SetObjectCreateHeightAndObjectID(txHash []byte, height uint64, objectID uint64) error GetObjectInfoV2(objectID uint64) (*ptypesv1pb.ObjectInfo, error) GetJobContextV2(jobId uint64) (*ptypesv1pb.JobContext, error) diff --git a/store/jobdb/jobmemory/job_mem_db.go b/store/jobdb/jobmemory/job_mem_db.go index 0c4135d99..92f05a55c 100644 --- a/store/jobdb/jobmemory/job_mem_db.go +++ b/store/jobdb/jobmemory/job_mem_db.go @@ -8,6 +8,8 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/store/jobdb" ) +var _ jobdb.JobDB = &MemJobDB{} + // MemJobDB is a memory db, maintains job, object and piece job table. type MemJobDB struct { JobCount uint64 diff --git a/store/jobdb/jobmemory/job_mem_db_v2.go b/store/jobdb/jobmemory/job_mem_db_v2.go new file mode 100644 index 000000000..8209c916e --- /dev/null +++ b/store/jobdb/jobmemory/job_mem_db_v2.go @@ -0,0 +1,158 @@ +package jobmemory + +import ( + "errors" + "sync" + + ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" + "github.com/bnb-chain/greenfield-storage-provider/store/jobdb" +) + +var _ jobdb.JobDBV2 = &MemJobDBV2{} + +// MemJobDBV2 is a memory db, maintains job, object and piece job table. +type MemJobDBV2 struct { + JobCount uint64 + JobTable map[uint64]ptypesv1pb.JobContext + ObjectTable map[uint64]ptypesv1pb.ObjectInfo + PrimaryPieceJobTable map[uint64]map[uint32]jobdb.PieceJob + SecondaryPieceJobTable map[uint64]map[uint32]jobdb.PieceJob + mu sync.RWMutex +} + +// NewMemJobDBV2 return a MemJobDBV2 instance. 
+func NewMemJobDBV2() *MemJobDBV2 { + return &MemJobDBV2{ + JobCount: 0, + JobTable: make(map[uint64]ptypesv1pb.JobContext), + ObjectTable: make(map[uint64]ptypesv1pb.ObjectInfo), + PrimaryPieceJobTable: make(map[uint64]map[uint32]jobdb.PieceJob), + SecondaryPieceJobTable: make(map[uint64]map[uint32]jobdb.PieceJob), + } +} + +// CreateUploadPayloadJobV2 creates a job info for the specified object. +func (db *MemJobDBV2) CreateUploadPayloadJobV2(info *ptypesv1pb.ObjectInfo) (uint64, error) { + if info == nil { + return 0, errors.New("object info is nil") + } + db.mu.Lock() + defer db.mu.Unlock() + db.JobTable[db.JobCount] = ptypesv1pb.JobContext{ + JobId: db.JobCount, + JobState: ptypesv1pb.JobState_JOB_STATE_CREATE_OBJECT_DONE, + } + info.JobId = db.JobCount + db.ObjectTable[info.GetObjectId()] = *info + db.JobCount++ + return db.JobCount - 1, nil +} + +// GetJobContextV2 returns the job info. +func (db *MemJobDBV2) GetJobContextV2(jobId uint64) (*ptypesv1pb.JobContext, error) { + db.mu.RLock() + defer db.mu.RUnlock() + job, ok := db.JobTable[jobId] + if !ok { + return nil, errors.New("job is not exist") + } + return &job, nil +} + +// GetObjectInfoV2 returns the object info by object id. +func (db *MemJobDBV2) GetObjectInfoV2(objectID uint64) (*ptypesv1pb.ObjectInfo, error) { + db.mu.RLock() + defer db.mu.RUnlock() + objectInfo, ok := db.ObjectTable[objectID] + if !ok { + return nil, errors.New("object is not exist") + } + return &objectInfo, nil +} + +// SetUploadPayloadJobStateV2 sets the job state. 
+func (db *MemJobDBV2) SetUploadPayloadJobStateV2(jobId uint64, state string, timestamp int64) error { + db.mu.Lock() + defer db.mu.Unlock() + job, ok := db.JobTable[jobId] + if !ok { + return errors.New("job is not exist") + } + jobState, ok := ptypesv1pb.JobState_value[state] + if !ok { + return errors.New("state is not correct job state") + } + job.JobState = (ptypesv1pb.JobState)(jobState) + job.ModifyTime = timestamp + db.JobTable[jobId] = job + return nil +} + +// SetUploadPayloadJobJobErrorV2 sets the job error state and the error message. +func (db *MemJobDBV2) SetUploadPayloadJobJobErrorV2(jobId uint64, state string, jobErr string, timestamp int64) error { + db.mu.Lock() + defer db.mu.Unlock() + job, ok := db.JobTable[jobId] + if !ok { + return errors.New("job is not exist") + } + jobState, ok := ptypesv1pb.JobState_value[state] + if !ok { + return errors.New("state is not correct job state") + } + job.JobState = (ptypesv1pb.JobState)(jobState) + job.ModifyTime = timestamp + job.JobErr = jobErr + db.JobTable[jobId] = job + return nil +} + +// GetPrimaryJobV2 returns the primary piece jobs by object id. +func (db *MemJobDBV2) GetPrimaryJobV2(objectID uint64) ([]*jobdb.PieceJob, error) { + db.mu.RLock() + defer db.mu.RUnlock() + if _, ok := db.PrimaryPieceJobTable[objectID]; !ok { + return []*jobdb.PieceJob{}, nil + } + pieces := make([]*jobdb.PieceJob, len(db.PrimaryPieceJobTable[objectID])) + for idx, job := range db.PrimaryPieceJobTable[objectID] { + pieces[idx] = &job + } + return pieces, nil +} + +// GetSecondaryJobV2 returns the secondary piece jobs by object id. 
+func (db *MemJobDBV2) GetSecondaryJobV2(objectID uint64) ([]*jobdb.PieceJob, error) { + db.mu.RLock() + defer db.mu.RUnlock() + if _, ok := db.SecondaryPieceJobTable[objectID]; !ok { + return []*jobdb.PieceJob{}, nil + } + pieces := make([]*jobdb.PieceJob, len(db.SecondaryPieceJobTable[objectID])) + for idx, job := range db.SecondaryPieceJobTable[objectID] { + pieces[idx] = &job + } + return pieces, nil +} + +// SetPrimaryPieceJobDoneV2 marks one primary piece job as completed. +func (db *MemJobDBV2) SetPrimaryPieceJobDoneV2(objectID uint64, piece *jobdb.PieceJob) error { + db.mu.Lock() + defer db.mu.Unlock() + if _, ok := db.PrimaryPieceJobTable[objectID]; !ok { + db.PrimaryPieceJobTable[objectID] = make(map[uint32]jobdb.PieceJob) + } + db.PrimaryPieceJobTable[objectID][piece.PieceId] = *piece + return nil +} + +// SetSecondaryPieceJobDoneV2 marks one secondary piece job as completed. +func (db *MemJobDBV2) SetSecondaryPieceJobDoneV2(objectID uint64, piece *jobdb.PieceJob) error { + db.mu.Lock() + defer db.mu.Unlock() + if _, ok := db.SecondaryPieceJobTable[objectID]; !ok { + db.SecondaryPieceJobTable[objectID] = make(map[uint32]jobdb.PieceJob) + } + db.SecondaryPieceJobTable[objectID][piece.PieceId] = *piece + return nil +} diff --git a/util/log/logger.go b/util/log/logger.go index 61cdcc25e..c1299b56e 100644 --- a/util/log/logger.go +++ b/util/log/logger.go @@ -330,13 +330,13 @@ func CtxPanicw(ctx context.Context, msg string, kvs ...interface{}) { func Context(ctx context.Context, opts ...interface{}) context.Context { for _, req := range opts { - if reflect.ValueOf(req).MethodByName("GetTraceId").IsValid() { - valList := reflect.ValueOf(req).MethodByName("GetTraceId").Call([]reflect.Value{}) - if len(valList) > 0 && !valList[0].IsZero() { - traceID := valList[0].String() - ctx = metainfo.WithValue(ctx, "trace_id", traceID) - } - } + //if reflect.ValueOf(req).MethodByName("GetTraceId").IsValid() { + // valList := 
reflect.ValueOf(req).MethodByName("GetTraceId").Call([]reflect.Value{}) + // if len(valList) > 0 && !valList[0].IsZero() { + // traceID := valList[0].String() + // ctx = metainfo.WithValue(ctx, "trace_id", traceID) + // } + //} if reflect.ValueOf(req).MethodByName("GetObjectId").IsValid() { valList := reflect.ValueOf(req).MethodByName("GetObjectId").Call([]reflect.Value{}) if len(valList) > 0 && !valList[0].IsZero() {