From 3edefce3786f6f3c50d37c04028f25a089e19d14 Mon Sep 17 00:00:00 2001 From: will-2012 Date: Fri, 3 Feb 2023 14:15:28 +0800 Subject: [PATCH] refactor(jobdb): add jobdb v2 interface, objectID as primary key --- store/jobdb/job_db.go | 19 ++ store/jobdb/jobsql/db_schema.go | 77 +++++++- store/jobdb/jobsql/job_meta.go | 282 ++++++++++++++++++++++++++++ store/jobdb/jobsql/job_meta_test.go | 104 +++++++++- 4 files changed, 473 insertions(+), 9 deletions(-) diff --git a/store/jobdb/job_db.go b/store/jobdb/job_db.go index f54779bb3..754672637 100644 --- a/store/jobdb/job_db.go +++ b/store/jobdb/job_db.go @@ -23,6 +23,7 @@ type PieceJob struct { Done bool } +// JobDB use txhash as primary key type JobDB interface { CreateUploadPayloadJob(txHash []byte, info *ptypesv1pb.ObjectInfo) (uint64, error) SetObjectCreateHeightAndObjectID(txHash []byte, height uint64, objectID uint64) error @@ -38,3 +39,21 @@ type JobDB interface { SetPrimaryPieceJobDone(txHash []byte, piece *PieceJob) error SetSecondaryPieceJobDone(txHash []byte, piece *PieceJob) error } + +// JobDBV2 use objectID as primary key +type JobDBV2 interface { + CreateUploadPayloadJobV2(info *ptypesv1pb.ObjectInfo) (uint64, error) + // SetObjectCreateHeightAndObjectID maybe useless + // SetObjectCreateHeightAndObjectID(txHash []byte, height uint64, objectID uint64) error + + GetObjectInfoV2(objectID uint64) (*ptypesv1pb.ObjectInfo, error) + GetJobContextV2(jobId uint64) (*ptypesv1pb.JobContext, error) + + SetUploadPayloadJobStateV2(jobId uint64, state string, timestamp int64) error + SetUploadPayloadJobJobErrorV2(jobID uint64, jobState string, jobErr string, timestamp int64) error + + GetPrimaryJobV2(objectID uint64) ([]*PieceJob, error) + GetSecondaryJobV2(objectID uint64) ([]*PieceJob, error) + SetPrimaryPieceJobDoneV2(objectID uint64, piece *PieceJob) error + SetSecondaryPieceJobDoneV2(objectID uint64, piece *PieceJob) error +} diff --git a/store/jobdb/jobsql/db_schema.go b/store/jobdb/jobsql/db_schema.go index 9e8428db2..4788c5ba3 100644 --- a/store/jobdb/jobsql/db_schema.go +++ b/store/jobdb/jobsql/db_schema.go @@ -10,6 +10,8 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/util/log" ) +// v1 schema + // DBJob table schema type DBJob struct { JobID uint64 `gorm:"primary_key;autoIncrement"` @@ -65,6 +67,64 @@ func (DBPieceJob) TableName() string { return "piece_job" } +// v2 schema + +// DBJobV2 table schema +type DBJobV2 struct { + JobID uint64 `gorm:"primary_key;autoIncrement"` + JobType uint32 + JobState uint32 + JobErr string + CreateTime time.Time + ModifyTime time.Time +} + +// TableName is used to set Job Schema's table name in database +func (DBJobV2) TableName() string { + return "job_v2" +} + +// DBObjectV2 table schema +type DBObjectV2 struct { + ObjectID uint64 `gorm:"primary_key"` + JobID uint64 // Job.JobID + CreateHash string + SealHash string + Owner string + BucketName string + ObjectName string + Size uint64 + Checksum string + IsPrivate bool + ContentType string + PrimarySP string + Height uint64 + RedundancyType uint32 +} + +// TableName is used to set Object Schema's table name in database +func (DBObjectV2) TableName() string { + return "object_v2" +} + +// DBPieceJobV2 table schema +type DBPieceJobV2 struct { + ObjectID uint64 `gorm:"index:idx_piece_group"` + PieceType uint32 `gorm:"index:idx_piece_group"` + PieceIdx uint32 + PieceState uint32 + Checksum string + StorageProvider string + IntegrityHash string + Signature string +} + +// TableName is used to set PieceJob Schema's table name in database +func (DBPieceJobV2) TableName() string { + return "piece_job_v2" +} + +// DBOption is mysql config options type DBOption struct { User string Passwd string @@ -77,7 +137,7 @@ var DefaultDBOption = &DBOption{ User: "root", Passwd: "test_pwd", Address: "127.0.0.1:3306", - Database: "job_context", + Database: "job_db", } func InitDB(opt *DBOption) (*gorm.DB, error) { @@ -93,6 +153,7 @@ func InitDB(opt *DBOption) (*gorm.DB, error) { } // create if not exist + // v1 table if err := db.AutoMigrate(&DBJob{}); err != nil { log.Warnw("failed to create job table", "err", err) return nil, err @@ -105,5 +166,19 @@ func InitDB(opt *DBOption) (*gorm.DB, error) { log.Warnw("failed to create piece job table", "err", err) return nil, err } + // v2 table + if err := db.AutoMigrate(&DBJobV2{}); err != nil { + log.Warnw("failed to create job table v2", "err", err) + return nil, err + } + if err := db.AutoMigrate(&DBObjectV2{}); err != nil { + log.Warnw("failed to create object table v2", "err", err) + return nil, err + } + if err := db.AutoMigrate(&DBPieceJobV2{}); err != nil { + log.Warnw("failed to create piece job table v2", "err", err) + return nil, err + } + return db, nil } diff --git a/store/jobdb/jobsql/job_meta.go b/store/jobdb/jobsql/job_meta.go index 63aaf8406..f3bf2f793 100644 --- a/store/jobdb/jobsql/job_meta.go +++ b/store/jobdb/jobsql/job_meta.go @@ -18,6 +18,7 @@ type JobMetaImpl struct { db *gorm.DB } +// NewJobMetaImpl return a database instance func NewJobMetaImpl(option *DBOption) (*JobMetaImpl, error) { db, err := InitDB(option) if err != nil { @@ -26,6 +27,8 @@ func NewJobMetaImpl(option *DBOption) (*JobMetaImpl, error) { return &JobMetaImpl{db: db}, nil } +// v1 interface implement + // CreateUploadPayloadJob create DBJob record and DBObject record, Use JobID field for association. func (jmi *JobMetaImpl) CreateUploadPayloadJob(txHash []byte, info *ptypesv1pb.ObjectInfo) (uint64, error) { var ( @@ -340,3 +343,282 @@ func (jmi *JobMetaImpl) SetObjectCreateHeightAndObjectID(txHash []byte, height u } return nil } + +// v2 interface implement + +// CreateUploadPayloadJobV2 create DBJobV2 record and DBObjectV2 record, Use JobID field for association. +func (jmi *JobMetaImpl) CreateUploadPayloadJobV2(info *ptypesv1pb.ObjectInfo) (uint64, error) { + var ( + result *gorm.DB + insertJobRecord *DBJobV2 + insertObjectRecord *DBObjectV2 + ) + + insertJobRecord = &DBJobV2{ + JobType: uint32(ptypesv1pb.JobType_JOB_TYPE_CREATE_OBJECT), + JobState: uint32(ptypesv1pb.JobState_JOB_STATE_CREATE_OBJECT_DONE), + CreateTime: time.Now(), + ModifyTime: time.Now(), + } + result = jmi.db.Create(insertJobRecord) + if result.Error != nil || result.RowsAffected != 1 { + return 0, fmt.Errorf("insert job record failed, %s", result.Error) + } + insertObjectRecord = &DBObjectV2{ + ObjectID: info.ObjectId, + JobID: insertJobRecord.JobID, + CreateHash: hex.EncodeToString(info.TxHash), + Height: info.Height, + Owner: info.Owner, + BucketName: info.BucketName, + ObjectName: info.ObjectName, + Size: info.Size, + IsPrivate: info.IsPrivate, + ContentType: info.ContentType, + PrimarySP: "mock-sp-string", // todo: how to encode sp info? + RedundancyType: uint32(info.RedundancyType), + } + result = jmi.db.Create(insertObjectRecord) + if result.Error != nil || result.RowsAffected != 1 { + return 0, fmt.Errorf("insert object record failed, %s", result.Error) + } + return insertJobRecord.JobID, nil +} + +// GetObjectInfoV2 query DBObjectV2 by txHash, and convert to types.ObjectInfo. +func (jmi *JobMetaImpl) GetObjectInfoV2(objectID uint64) (*ptypesv1pb.ObjectInfo, error) { + var ( + result *gorm.DB + queryCondition *DBObjectV2 + queryReturn DBObjectV2 + ) + + // If the primary key is a number, the query will be written as follows: + queryCondition = &DBObjectV2{ + ObjectID: objectID, + } + result = jmi.db.Model(queryCondition).First(&queryReturn) + if result.Error != nil { + return nil, fmt.Errorf("select job record's failed, %s", result.Error) + } + txHash, err := hex.DecodeString(queryReturn.CreateHash) + if err != nil { + return nil, err + } + return &ptypesv1pb.ObjectInfo{ + JobId: queryReturn.JobID, + Owner: queryReturn.Owner, + BucketName: queryReturn.BucketName, + ObjectName: queryReturn.ObjectName, + Size: queryReturn.Size, + Checksum: []byte(queryReturn.Checksum), + IsPrivate: queryReturn.IsPrivate, + ContentType: queryReturn.ContentType, + PrimarySp: nil, // todo: how to decode sp info + Height: queryReturn.Height, + TxHash: txHash, + RedundancyType: ptypesv1pb.RedundancyType(queryReturn.RedundancyType), + SecondarySps: nil, // todo: how to fill + }, nil +} + +// GetJobContextV2 query DBJobV2 by jobID, and convert to types.JobContext. +func (jmi *JobMetaImpl) GetJobContextV2(jobId uint64) (*ptypesv1pb.JobContext, error) { + var ( + result *gorm.DB + queryCondition *DBJobV2 + queryReturn DBJobV2 + ) + + // If the primary key is a number, the query will be written as follows: + queryCondition = &DBJobV2{ + JobID: jobId, + } + result = jmi.db.Model(queryCondition).First(&queryReturn) + if result.Error != nil { + return nil, fmt.Errorf("select job record's failed, %s", result.Error) + } + return &ptypesv1pb.JobContext{ + JobId: queryReturn.JobID, + JobType: ptypesv1pb.JobType(queryReturn.JobType), + JobState: ptypesv1pb.JobState(queryReturn.JobState), + JobErr: queryReturn.JobErr, + CreateTime: queryReturn.CreateTime.Unix(), + ModifyTime: queryReturn.ModifyTime.Unix(), + }, nil +} + +// SetUploadPayloadJobStateV2 update DBJobV2 record's state. +func (jmi *JobMetaImpl) SetUploadPayloadJobStateV2(jobId uint64, jobState string, timestampSec int64) error { + var ( + result *gorm.DB + queryCondition *DBJobV2 + updateFields *DBJobV2 + ) + + queryCondition = &DBJobV2{ + JobID: jobId, + } + updateFields = &DBJobV2{ + JobState: uint32(ptypesv1pb.JobState_value[jobState]), + ModifyTime: time.Unix(timestampSec, 0), + } + result = jmi.db.Model(queryCondition).Updates(updateFields) + if result.Error != nil || result.RowsAffected != 1 { + return fmt.Errorf("update job record's state failed, %s", result.Error) + } + return nil +} + +// SetUploadPayloadJobJobErrorV2 update DBJobV2 record's state and err. +func (jmi *JobMetaImpl) SetUploadPayloadJobJobErrorV2(jobID uint64, jobState string, jobErr string, timestampSec int64) error { + var ( + result *gorm.DB + queryCondition *DBJobV2 + updateFields *DBJobV2 + ) + + queryCondition = &DBJobV2{ + JobID: jobID, + } + updateFields = &DBJobV2{ + JobState: uint32(ptypesv1pb.JobState_value[jobState]), + ModifyTime: time.Unix(timestampSec, 0), + JobErr: jobErr, + } + result = jmi.db.Model(queryCondition).Updates(updateFields) + if result.Error != nil || result.RowsAffected != 1 { + return fmt.Errorf("update job record's state and err failed, %s", result.Error) + } + return nil +} + +// GetPrimaryJobV2 query DBPieceJobV2 by objectID and primary type, and convert to PieceJob. +func (jmi *JobMetaImpl) GetPrimaryJobV2(objectId uint64) ([]*jobdb.PieceJob, error) { + var ( + result *gorm.DB + queryReturns []DBPieceJobV2 + pieceJobs []*jobdb.PieceJob + ) + + result = jmi.db. + Where("object_id = ? AND piece_type = ?", objectId, ptypesv1pb.JobType_JOB_TYPE_UPLOAD_PRIMARY). + Find(&queryReturns) + if result.Error != nil { + return pieceJobs, fmt.Errorf("select primary piece jobs failed, %s", result.Error) + } + for _, job := range queryReturns { + pieceJobs = append(pieceJobs, &jobdb.PieceJob{ + PieceId: job.PieceIdx, + Checksum: [][]byte{[]byte(job.IntegrityHash)}, + StorageProvider: job.StorageProvider}) + } + return pieceJobs, nil +} + +// SetPrimaryPieceJobDoneV2 create primary DBPieceJobV2 record. +func (jmi *JobMetaImpl) SetPrimaryPieceJobDoneV2(objectID uint64, pj *jobdb.PieceJob) error { + var ( + result *gorm.DB + insertPieceJobRecord *DBPieceJobV2 + ) + + insertPieceJobRecord = &DBPieceJobV2{ + ObjectID: objectID, + PieceType: uint32(ptypesv1pb.JobType_JOB_TYPE_UPLOAD_PRIMARY), + PieceIdx: pj.PieceId, + Checksum: string(pj.Checksum[0]), + PieceState: 0, // todo: fill what? + StorageProvider: pj.StorageProvider, + IntegrityHash: "", + Signature: "", + } + result = jmi.db.Create(insertPieceJobRecord) + if result.Error != nil || result.RowsAffected != 1 { + return fmt.Errorf("insert primary piece job record failed, %s", result.Error) + } + return nil +} + +// SetSecondaryPieceJobDoneV2 create secondary DBPieceJobV2 record. +func (jmi *JobMetaImpl) SetSecondaryPieceJobDoneV2(objectID uint64, pj *jobdb.PieceJob) error { + var ( + result *gorm.DB + insertPieceJobRecord *DBPieceJobV2 + ) + + insertPieceJobRecord = &DBPieceJobV2{ + ObjectID: objectID, + PieceType: uint32(ptypesv1pb.JobType_JOB_TYPE_UPLOAD_SECONDARY_EC), + PieceIdx: pj.PieceId, + Checksum: string(pj.Checksum[0]), + PieceState: 0, // todo: fill what? + StorageProvider: pj.StorageProvider, + IntegrityHash: "", + Signature: "", + } + result = jmi.db.Create(insertPieceJobRecord) + if result.Error != nil || result.RowsAffected != 1 { + return fmt.Errorf("insert secondary piece job record failed, %s", result.Error) + } + return nil +} + +// GetSecondaryJobV2 query DBPieceJobV2 by objectID and secondary type, and convert to PieceJob. +func (jmi *JobMetaImpl) GetSecondaryJobV2(objectID uint64) ([]*jobdb.PieceJob, error) { + var ( + result *gorm.DB + queryReturns []DBPieceJobV2 + pieceJobs []*jobdb.PieceJob + ) + + result = jmi.db. + Where("object_id = ? AND piece_type = ?", objectID, ptypesv1pb.JobType_JOB_TYPE_UPLOAD_SECONDARY_EC). + Find(&queryReturns) + if result.Error != nil { + return pieceJobs, fmt.Errorf("select secondary piece jobs failed, %s", result.Error) + } + for _, job := range queryReturns { + pieceJobs = append(pieceJobs, &jobdb.PieceJob{ + PieceId: job.PieceIdx, + Checksum: [][]byte{[]byte(job.IntegrityHash)}, + StorageProvider: job.StorageProvider}) + } + return pieceJobs, nil +} + +// ScanObjectInfoV2 query scan DBObjectV2, and convert to ObjectInfo. +func (jmi *JobMetaImpl) ScanObjectInfoV2(offset int, limit int) ([]*ptypesv1pb.ObjectInfo, error) { + var ( + result *gorm.DB + queryReturns []DBObjectV2 + objects []*ptypesv1pb.ObjectInfo + ) + + result = jmi.db.Limit(limit).Offset(offset).Find(&queryReturns) + if result.Error != nil { + return objects, fmt.Errorf("select primary piece jobs failed, %s", result.Error) + } + for _, object := range queryReturns { + txHash, err := hex.DecodeString(object.CreateHash) + if err != nil { + return objects, err + } + objects = append(objects, &ptypesv1pb.ObjectInfo{ + JobId: object.JobID, + Owner: object.Owner, + BucketName: object.BucketName, + ObjectName: object.ObjectName, + Size: object.Size, + Checksum: []byte(object.Checksum), + IsPrivate: object.IsPrivate, + ContentType: object.ContentType, + PrimarySp: nil, // todo: how to decode sp info + Height: object.Height, + TxHash: txHash, + RedundancyType: ptypesv1pb.RedundancyType(object.RedundancyType), + SecondarySps: nil, // todo: how to fill + }) + } + return objects, nil +} diff --git a/store/jobdb/jobsql/job_meta_test.go b/store/jobdb/jobsql/job_meta_test.go index b71d6f100..59f6813ea 100644 --- a/store/jobdb/jobsql/job_meta_test.go +++ b/store/jobdb/jobsql/job_meta_test.go @@ -2,7 +2,6 @@ package jobsql import ( "encoding/hex" - "fmt" "testing" "time" @@ -16,7 +15,7 @@ import ( func TestJobMeta(t *testing.T) { var ( txHash = []byte("testHash-" + string(time.Now().Format("2006-01-02 15:04:05"))) - jobId uint64 + jobID uint64 ) // case1 CreateUploadPayloadJob { @@ -26,7 +25,6 @@ func TestJobMeta(t *testing.T) { &ptypesv1pb.ObjectInfo{BucketName: "testBucket", ObjectName: "testObject"}) assert.Equal(t, nil, err) } - fmt.Println(string(txHash)) // case2 SetObjectCreateHeight/SetObjectCreateHeightAndObjectID { jmi, _ := NewJobMetaImpl(DefaultDBOption) @@ -40,7 +38,7 @@ func TestJobMeta(t *testing.T) { jmi, _ := NewJobMetaImpl(DefaultDBOption) info, err := jmi.GetObjectInfo(txHash) assert.Equal(t, nil, err) - jobId = info.JobId + jobID = info.JobId } // case4 ScanObjectInfo { @@ -52,22 +50,22 @@ func TestJobMeta(t *testing.T) { // case4 GetJobContext { jmi, _ := NewJobMetaImpl(DefaultDBOption) - _, err := jmi.GetJobContext(jobId) + _, err := jmi.GetJobContext(jobID) assert.Equal(t, nil, err) } // case5 SetUploadPayloadJobState { jmi, _ := NewJobMetaImpl(DefaultDBOption) - err := jmi.SetUploadPayloadJobState(jobId, "JOB_STATE_DONE", time.Now().Unix()) + err := jmi.SetUploadPayloadJobState(jobID, "JOB_STATE_DONE", time.Now().Unix()) assert.Equal(t, nil, err) } // case6 SetUploadPayloadJobJobError { jmi, _ := NewJobMetaImpl(DefaultDBOption) - err := jmi.SetUploadPayloadJobJobError(jobId, "JOB_STATE_ERROR", "job-err-msg", time.Now().Unix()) + err := jmi.SetUploadPayloadJobJobError(jobID, "JOB_STATE_ERROR", "job-err-msg", time.Now().Unix()) assert.Equal(t, nil, err) } - // clear piecejob table + // clear piece_job table { jmi, _ := NewJobMetaImpl(DefaultDBOption) key1 := hex.EncodeToString([]byte("123")) @@ -115,3 +113,93 @@ func TestJobMeta(t *testing.T) { assert.Equal(t, 2, len(pieces)) } } + +func TestJobMetaV2(t *testing.T) { + var ( + objectID = uint64(time.Now().Unix()) + jobID uint64 + ) + // case1 CreateUploadPayloadJobV2 + { + jmi, _ := NewJobMetaImpl(DefaultDBOption) + _, err := jmi.CreateUploadPayloadJobV2( + &ptypesv1pb.ObjectInfo{BucketName: "testBucket", ObjectName: "testObject", ObjectId: objectID}) + assert.Equal(t, nil, err) + } + // case2 GetObjectInfoV2 + { + jmi, _ := NewJobMetaImpl(DefaultDBOption) + info, err := jmi.GetObjectInfoV2(objectID) + assert.Equal(t, nil, err) + jobID = info.JobId + } + // case3 ScanObjectInfoV2 + { + jmi, _ := NewJobMetaImpl(DefaultDBOption) + objects, err := jmi.ScanObjectInfoV2(0, 10) + assert.Equal(t, nil, err) + assert.True(t, len(objects) >= 1) + } + // case4 GetJobContextV2 + { + jmi, _ := NewJobMetaImpl(DefaultDBOption) + _, err := jmi.GetJobContextV2(jobID) + assert.Equal(t, nil, err) + } + // case5 SetUploadPayloadJobStateV2 + { + jmi, _ := NewJobMetaImpl(DefaultDBOption) + err := jmi.SetUploadPayloadJobState(jobID, "JOB_STATE_DONE", time.Now().Unix()) + assert.Equal(t, nil, err) + } + // case6 SetUploadPayloadJobJobErrorV2 + { + jmi, _ := NewJobMetaImpl(DefaultDBOption) + err := jmi.SetUploadPayloadJobJobError(jobID, "JOB_STATE_ERROR", "job-err-msg", time.Now().Unix()) + assert.Equal(t, nil, err) + } + // clear piece_job_v2 table + { + jmi, _ := NewJobMetaImpl(DefaultDBOption) + jmi.db.Exec("DELETE FROM piece_job_v2 where object_id=?", 123) + jmi.db.Exec("DELETE FROM piece_job_v2 where object_id=?", 456) + } + // case7 SetPrimaryPieceJobDoneV2/GetPrimaryJobV2 + { + jmi, _ := NewJobMetaImpl(DefaultDBOption) + err := jmi.SetPrimaryPieceJobDoneV2(123, &jobdb.PieceJob{ + PieceId: 0, + Checksum: [][]byte{[]byte("123-0-sum")}, + StorageProvider: "123-0-sp", + }) + assert.Equal(t, nil, err) + err = jmi.SetPrimaryPieceJobDoneV2(123, &jobdb.PieceJob{ + PieceId: 1, + Checksum: [][]byte{[]byte("123-1-sum")}, + StorageProvider: "123-1-sp", + }) + assert.Equal(t, nil, err) + pieces, err := jmi.GetPrimaryJobV2(123) + assert.Equal(t, nil, err) + assert.Equal(t, 2, len(pieces)) + } + // case8 SetSecondaryPieceJobDoneV2/GetSecondaryJobV2 + { + jmi, _ := NewJobMetaImpl(DefaultDBOption) + err := jmi.SetSecondaryPieceJobDoneV2(456, &jobdb.PieceJob{ + PieceId: 0, + Checksum: [][]byte{[]byte("456-0-sum")}, + StorageProvider: "456-0-sp", + }) + assert.Equal(t, nil, err) + err = jmi.SetSecondaryPieceJobDoneV2(456, &jobdb.PieceJob{ + PieceId: 1, + Checksum: [][]byte{[]byte("456-1-sum")}, + StorageProvider: "456-1-sp", + }) + assert.Equal(t, nil, err) + pieces, err := jmi.GetSecondaryJobV2(456) + assert.Equal(t, nil, err) + assert.Equal(t, 2, len(pieces)) + } +}