Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: uploader panic under db access error #278

Merged
merged 4 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions model/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ const (
P2PPrivateKey = "P2P_PRIVATE_KEY"
)

// SQLDB default connection-pool configuration params.
//
// NOTE(review): InitDB feeds the two *Time values straight into
// time.Duration(...) with no unit multiplier, which interprets them as
// nanoseconds; the magnitudes below (3600, 60) suggest seconds were
// intended — confirm and multiply by time.Second at the call site.
const (
	// DefaultConnMaxLifetime defines the default max liveness time of connection
	// (presumably seconds — see NOTE above).
	DefaultConnMaxLifetime = 3600
	// DefaultConnMaxIdleTime defines the default max idle time of connection
	// (presumably seconds — see NOTE above).
	DefaultConnMaxIdleTime = 60
	// DefaultMaxIdleConns defines the default max number of idle connections
	DefaultMaxIdleConns = 128
	// DefaultMaxOpenConns defines the default max number of open connections
	DefaultMaxOpenConns = 1024
)

// define all kinds of http constants
const (
// ContentTypeHeader is used to indicate the media type of the resource
Expand Down
10 changes: 10 additions & 0 deletions model/errors/rpc_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ var (
ErrUnsupportedMethod = errors.New("unsupported method")
// ErrIntegerOverflow defines integer overflow
ErrIntegerOverflow = errors.New("integer overflow")
// ErrDanglingPointer defines the nil pointer error
ErrDanglingPointer = errors.New("pointer dangling")
)

// piece store errors
Expand Down Expand Up @@ -102,6 +104,14 @@ var (
ErrSPNumber = errors.New("failed to get sufficient SPs from DB")
)

// uploader service errors
var (
	// ErrMismatchIntegrityHash defines integrity hash mismatch error: the hash
	// signed over the locally computed segment checksums does not equal the
	// root hash carried in the object info (objectInfo.GetChecksums()[0]).
	ErrMismatchIntegrityHash = errors.New("integrity hash mismatch")
	// ErrMismatchChecksumNum defines checksum number mismatch error: the object
	// info does not carry exactly dataChunkNum+parityChunkNum+1 checksums.
	ErrMismatchChecksumNum = errors.New("checksum number mismatch")
)

// InnerErrorToGRPCError convents inner error to grpc/status error
func InnerErrorToGRPCError(err error) error {
if errors.Is(err, gorm.ErrRecordNotFound) ||
Expand Down
127 changes: 73 additions & 54 deletions service/uploader/uploader_service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package uploader

import (
"bytes"
"context"
"io"
"math"
Expand All @@ -21,89 +22,81 @@ var _ types.UploaderServiceServer = &Uploader{}
// PutObject upload an object payload data with object info.
func (uploader *Uploader) PutObject(stream types.UploaderService_PutObjectServer) (err error) {
var (
resp types.PutObjectResponse
pstream = payloadstream.NewAsyncPayloadStream()
traceInfo = &servicetypes.SegmentInfo{}
checksum [][]byte
integrityMeta = &sqldb.IntegrityMeta{}
errCh = make(chan error, 10)
init bool
checksum [][]byte
objectID uint64
objectInfo *storagetypes.ObjectInfo
params *storagetypes.Params
req *types.PutObjectRequest
resp = &types.PutObjectResponse{}
pstream = payloadstream.NewAsyncPayloadStream()
ctx, cancel = context.WithCancel(context.Background())
errCh = make(chan error)
)
defer func(resp *types.PutObjectResponse, err error) {
defer func() {
defer cancel()
if err != nil {
log.Errorw("failed to replicate payload", "err", err)
uploader.spDB.UpdateJobState(traceInfo.GetObjectInfo().Id.Uint64(),
servicetypes.JobState_JOB_STATE_UPLOAD_OBJECT_ERROR)
return
}
integrityHash, signature, err := uploader.signer.SignIntegrityHash(context.Background(),
integrityMeta.ObjectID, checksum)
if err != nil {
log.Errorw("failed to sign integrity hash", "err", err)
uploader.spDB.UpdateJobState(traceInfo.GetObjectInfo().Id.Uint64(),
servicetypes.JobState_JOB_STATE_UPLOAD_OBJECT_ERROR)
if init {
uploader.spDB.UpdateJobState(objectID, servicetypes.JobState_JOB_STATE_UPLOAD_OBJECT_ERROR)
}
log.CtxErrorw(ctx, "failed to replicate payload", "error", err)
return
}
integrityMeta.Checksum = checksum
integrityMeta.IntegrityHash = integrityHash
integrityMeta.Signature = signature
err = uploader.spDB.SetObjectIntegrity(integrityMeta)
uploader.spDB.UpdateJobState(objectID, servicetypes.JobState_JOB_STATE_UPLOAD_OBJECT_DONE)
err = uploader.signIntegrityHash(ctx, objectID, objectInfo.GetChecksums()[0], checksum)
if err != nil {
log.Errorw("failed to write integrity hash to db", "error", err)
uploader.spDB.UpdateJobState(traceInfo.GetObjectInfo().Id.Uint64(),
servicetypes.JobState_JOB_STATE_UPLOAD_OBJECT_ERROR)
return
}
traceInfo.IntegrityHash = integrityHash
traceInfo.Signature = signature
uploader.cache.Add(traceInfo.ObjectInfo.Id.Uint64(), traceInfo)
err = uploader.taskNode.ReplicateObject(context.Background(), traceInfo.GetObjectInfo())
err = uploader.taskNode.ReplicateObject(ctx, objectInfo)
if err != nil {
log.Errorw("failed to notify task node to replicate object", "error", err)
uploader.spDB.UpdateJobState(traceInfo.GetObjectInfo().Id.Uint64(),
servicetypes.JobState_JOB_STATE_REPLICATE_OBJECT_ERROR)
log.CtxErrorw(ctx, "failed to notify task node to replicate object", "error", err)
uploader.spDB.UpdateJobState(objectID, servicetypes.JobState_JOB_STATE_REPLICATE_OBJECT_ERROR)
return
}
err = stream.SendAndClose(resp)
pstream.Close()
uploader.spDB.UpdateJobState(traceInfo.GetObjectInfo().Id.Uint64(),
servicetypes.JobState_JOB_STATE_UPLOAD_OBJECT_DONE)
log.Infow("succeed to put object", "error", err)
}(&resp, err)
params, err := uploader.spDB.GetStorageParams()
log.CtxInfow(ctx, "succeed to put object", "error", err)
}()
params, err = uploader.spDB.GetStorageParams()
if err != nil {
return
}
segmentPieceSize := params.GetMaxSegmentSize()

// read payload from gRPC stream
go func() {
init := true
for {
req, err := stream.Recv()
req, err = stream.Recv()
if err == io.EOF {
pstream.StreamClose()
return
}
if err != nil {
log.Debugw("receive payload exception", "error", err)
log.CtxErrorw(ctx, "receive payload exception", "error", err)
pstream.StreamCloseWithError(err)
errCh <- err
return
}
if init {
if !init {
if req.GetObjectInfo() == nil {
errCh <- merrors.ErrDanglingPointer
return
}
objectInfo = req.GetObjectInfo()
if int(params.GetRedundantDataChunkNum()+params.GetRedundantParityChunkNum()+1) !=
len(objectInfo.GetChecksums()) {
errCh <- merrors.ErrMismatchChecksumNum
}
objectID = req.GetObjectInfo().Id.Uint64()
ctx = log.WithValue(ctx, "object_id", req.GetObjectInfo().Id.String())
pstream.InitAsyncPayloadStream(
req.GetObjectInfo().Id.Uint64(),
objectID,
storagetypes.REDUNDANCY_REPLICA_TYPE,
segmentPieceSize,
math.MaxUint32, /*useless*/
)
integrityMeta.ObjectID = req.GetObjectInfo().Id.Uint64()
traceInfo.ObjectInfo = req.GetObjectInfo()
uploader.cache.Add(req.GetObjectInfo().Id.Uint64(), traceInfo)
uploader.spDB.CreateUploadJob(req.GetObjectInfo())
uploader.spDB.UpdateJobState(traceInfo.GetObjectInfo().Id.Uint64(),
servicetypes.JobState_JOB_STATE_UPLOAD_OBJECT_DOING)
init = false
uploader.spDB.CreateUploadJob(objectInfo)
uploader.spDB.UpdateJobState(objectID, servicetypes.JobState_JOB_STATE_UPLOAD_OBJECT_DOING)
init = true
}
pstream.StreamWrite(req.GetPayload())
}
Expand All @@ -116,16 +109,13 @@ func (uploader *Uploader) PutObject(stream types.UploaderService_PutObjectServer
if !ok { // has finished
return
}
log.Debugw("get piece entry from stream", "piece_key", entry.PieceKey(),
log.CtxDebugw(ctx, "get piece entry from stream", "piece_key", entry.PieceKey(),
"piece_len", len(entry.Data()), "error", entry.Error())
if entry.Error() != nil {
err = entry.Error()
return
}
checksum = append(checksum, hash.GenerateChecksum(entry.Data()))
traceInfo.Checksum = checksum
traceInfo.Completed++
uploader.cache.Add(entry.ObjectID(), traceInfo)
if err = uploader.pieceStore.PutPiece(entry.PieceKey(), entry.Data()); err != nil {
return
}
Expand All @@ -135,6 +125,35 @@ func (uploader *Uploader) PutObject(stream types.UploaderService_PutObjectServer
}
}

// signIntegrityHash signs the integrity hash computed from the uploaded
// segment checksums, verifies it against the root hash declared in the object
// info, and persists the integrity metadata to the SP DB.
//
// The job state is moved to UPLOAD_OBJECT_DOING on entry and, via the deferred
// handler, to UPLOAD_OBJECT_DONE on success or UPLOAD_OBJECT_ERROR on any
// failure. UpdateJobState errors are ignored (best-effort), matching how the
// caller treats job-state updates.
//
// Returns ErrMismatchIntegrityHash when the signed hash differs from rootHash,
// or any error from the signer or the DB write.
func (uploader *Uploader) signIntegrityHash(ctx context.Context, objectID uint64, rootHash []byte, checksum [][]byte) (err error) {
	integrityMeta := &sqldb.IntegrityMeta{ObjectID: objectID, Checksum: checksum}
	uploader.spDB.UpdateJobState(objectID, servicetypes.JobState_JOB_STATE_UPLOAD_OBJECT_DOING)
	defer func() {
		if err != nil {
			log.CtxErrorw(ctx, "failed to sign the integrity hash", "error", err)
			uploader.spDB.UpdateJobState(objectID, servicetypes.JobState_JOB_STATE_UPLOAD_OBJECT_ERROR)
			return
		}
		uploader.spDB.UpdateJobState(objectID, servicetypes.JobState_JOB_STATE_UPLOAD_OBJECT_DONE)
		log.CtxInfow(ctx, "succeed to sign the integrity hash", "error", err)
	}()
	integrityMeta.IntegrityHash, integrityMeta.Signature, err = uploader.signer.SignIntegrityHash(ctx, objectID, checksum)
	if err != nil {
		return
	}
	// The locally computed hash must match the root hash the object declares;
	// a mismatch means the uploaded payload differs from what was committed.
	if !bytes.Equal(rootHash, integrityMeta.IntegrityHash) {
		err = merrors.ErrMismatchIntegrityHash
		return
	}
	// Persist; the deferred handler converts any error into the ERROR job state.
	err = uploader.spDB.SetObjectIntegrity(integrityMeta)
	return
}

// QueryPuttingObject query an uploading object with object id from cache
func (uploader *Uploader) QueryPuttingObject(ctx context.Context, req *types.QueryPuttingObjectRequest) (
resp *types.QueryPuttingObjectResponse, err error) {
Expand Down
12 changes: 8 additions & 4 deletions store/config/db_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package config

// SQLDBConfig is sql db config
type SQLDBConfig struct {
User string
Passwd string
Address string
Database string
User string
Passwd string
Address string
Database string
ConnMaxLifetime int
ConnMaxIdleTime int
MaxIdleConns int
MaxOpenConns int
}
30 changes: 28 additions & 2 deletions store/sqldb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package sqldb
import (
"fmt"
"os"
"time"

"github.com/bnb-chain/greenfield-storage-provider/model"
"gorm.io/driver/mysql"
"gorm.io/gorm"

"github.com/bnb-chain/greenfield-storage-provider/model"
"github.com/bnb-chain/greenfield-storage-provider/pkg/log"
"github.com/bnb-chain/greenfield-storage-provider/store/config"
)
Expand All @@ -22,6 +23,7 @@ type SpDBImpl struct {
// NewSpDB return a database instance
func NewSpDB(config *config.SQLDBConfig) (*SpDBImpl, error) {
LoadDBConfigFromEnv(config)
OverrideConfigVacancy(config)
db, err := InitDB(config)
if err != nil {
return nil, err
Expand All @@ -38,7 +40,15 @@ func InitDB(config *config.SQLDBConfig) (*gorm.DB, error) {
log.Errorw("gorm failed to open db", "error", err)
return nil, err
}

sqlDB, err := db.DB()
if err != nil {
log.Errorw("gorm failed to set db params", "error", err)
return nil, err
}
sqlDB.SetConnMaxLifetime(time.Duration(config.ConnMaxLifetime))
sqlDB.SetConnMaxIdleTime(time.Duration(config.ConnMaxIdleTime))
sqlDB.SetMaxIdleConns(config.MaxIdleConns)
sqlDB.SetMaxOpenConns(config.MaxOpenConns)
// create if not exist
if err := db.AutoMigrate(&JobTable{}); err != nil {
log.Errorw("failed to create job table", "error", err)
Expand Down Expand Up @@ -90,3 +100,19 @@ func LoadDBConfigFromEnv(config *config.SQLDBConfig) {
config.Database = val
}
}

// OverrideConfigVacancy override the SQLDB param zero value
func OverrideConfigVacancy(config *config.SQLDBConfig) {
if config.ConnMaxLifetime == 0 {
config.ConnMaxLifetime = model.DefaultConnMaxLifetime
}
if config.ConnMaxIdleTime == 0 {
config.ConnMaxIdleTime = model.DefaultConnMaxIdleTime
}
if config.MaxIdleConns == 0 {
config.MaxIdleConns = model.DefaultMaxIdleConns
}
if config.MaxOpenConns == 0 {
config.MaxOpenConns = model.DefaultMaxOpenConns
}
}