diff --git a/config/subconfig.go b/config/subconfig.go index 6790a8d0a..a3b835196 100644 --- a/config/subconfig.go +++ b/config/subconfig.go @@ -238,6 +238,12 @@ func (cfg *StorageProviderConfig) MakeManagerServiceConfig() (*manager.ManagerCo SpOperatorAddress: cfg.SpOperatorAddress, ChainConfig: cfg.ChainConfig, SpDBConfig: cfg.SpDBConfig, + PieceStoreConfig: cfg.PieceStoreConfig, + } + if _, ok := cfg.Endpoint[model.MetadataService]; ok { + managerConfig.MetadataGrpcAddress = cfg.Endpoint[model.MetadataService] + } else { + return nil, fmt.Errorf("missing metadata server gRPC address configuration for manager service") } return managerConfig, nil } diff --git a/model/piecestore/piece_key.go b/model/piecestore/piece_key.go index d14642a19..5bb108981 100644 --- a/model/piecestore/piece_key.go +++ b/model/piecestore/piece_key.go @@ -95,3 +95,35 @@ func ComputeSegmentCount(size uint64, spiltSize uint64) uint32 { } return segmentCount } + +// GenerateObjectSegmentKeyList generate object's segment piece key list. +func GenerateObjectSegmentKeyList(objectID, objectSize, segmentSize uint64) []string { + var ( + segmentIndex uint32 + segmentCount uint32 + pieceKeyList = make([]string, 0) + ) + + segmentCount = ComputeSegmentCount(objectSize, segmentSize) + for segmentIndex < segmentCount { + pieceKeyList = append(pieceKeyList, EncodeSegmentPieceKey(objectID, segmentIndex)) + segmentIndex++ + } + return pieceKeyList +} + +// GenerateObjectECKeyList generate object's ec piece key list. +func GenerateObjectECKeyList(objectID, objectSize, segmentSize, redundancyIndex uint64) []string { + var ( + segmentIndex uint32 + segmentCount uint32 + pieceKeyList = make([]string, 0) + ) + + segmentCount = ComputeSegmentCount(objectSize, segmentSize) + for segmentIndex < segmentCount { + pieceKeyList = append(pieceKeyList, EncodeECPieceKey(objectID, segmentIndex, uint32(redundancyIndex))) + segmentIndex++ + } + return pieceKeyList +} diff --git a/service/manager/gc_worker.go b/service/manager/gc_worker.go new file mode 100644 index 000000000..673ec0d2f --- /dev/null +++ b/service/manager/gc_worker.go @@ -0,0 +1,138 @@ +package manager + +import ( + "context" + "strings" + "time" + + storagetypes "github.com/bnb-chain/greenfield/x/storage/types" + + "github.com/bnb-chain/greenfield-storage-provider/model/piecestore" + "github.com/bnb-chain/greenfield-storage-provider/pkg/log" + metatypes "github.com/bnb-chain/greenfield-storage-provider/service/metadata/types" +) + +const ( + defaultGCBlockSpanPerLoop = 100 + defaultGCBlockSpanBeforeLatestBlock = 60 +) + +// GCWorker is responsible for releasing the space occupied by the deleted object in the piece-store. +// TODO: Will be refactored into task-node in the future. +type GCWorker struct { + manager *Manager + currentGCBlock uint64 // TODO: load gc point from db +} + +// Start is a non-blocking function that starts a goroutine execution logic internally. +func (w *GCWorker) Start() { + go w.startGC() + log.Info("start gc worker") + +} + +// Stop is responsible for stop gc. +func (w *GCWorker) Stop() { + log.Info("stop gc worker") +} + +// startGC starts an execution logic internally. +func (w *GCWorker) startGC() { + var ( + gcLoopNumber uint64 + gcObjectNumberOneLoop uint64 + height uint64 + err error + startBlock uint64 + endBlock uint64 + currentLatestBlock uint64 + storageParams *storagetypes.Params + response *metatypes.ListDeletedObjectsByBlockNumberRangeResponse + ) + + for { + if gcLoopNumber%100 == 0 { + height, err = w.manager.chain.GetCurrentHeight(context.Background()) + if err != nil { + log.Errorw("failed to query current chain height", "error", err) + time.Sleep(1 * time.Second) + continue + } + currentLatestBlock = height + log.Infow("succeed to fetch block height", "height", height) + + storageParams, err = w.manager.chain.QueryStorageParams(context.Background()) + if err != nil { + log.Errorw("failed to query storage params", "error", err) + time.Sleep(1 * time.Second) + continue + } + log.Infow("succeed to fetch storage params", "storage_params", storageParams) + } + gcLoopNumber++ + gcObjectNumberOneLoop = 0 + + startBlock = w.currentGCBlock + endBlock = w.currentGCBlock + defaultGCBlockSpanPerLoop + if startBlock+defaultGCBlockSpanBeforeLatestBlock > currentLatestBlock { + log.Infow("skip gc and try again later", + "start_block", startBlock, "latest_block", currentLatestBlock) + time.Sleep(10 * time.Second) + continue + } + if endBlock+defaultGCBlockSpanBeforeLatestBlock > currentLatestBlock { + endBlock = currentLatestBlock - defaultGCBlockSpanBeforeLatestBlock + } + + response, err = w.manager.metadata.ListDeletedObjectsByBlockNumberRange(context.Background(), + &metatypes.ListDeletedObjectsByBlockNumberRangeRequest{ + StartBlockNumber: int64(startBlock), + EndBlockNumber: int64(endBlock), + IsFullList: true, + }) + if err != nil { + log.Warnw("failed to query deleted objects", + "start_block", startBlock, "end_block", endBlock, "error", err) + time.Sleep(1 * time.Second) + continue + } + for _, object := range response.GetObjects() { + gcObjectNumberOneLoop++ + // TODO: refine gc workflow by enrich metadata index. + w.gcSegmentPiece(object.GetObjectInfo(), storageParams) + w.gcECPiece(object.GetObjectInfo(), storageParams) + log.Infow("succeed to gc object piece store", "object_info", object.GetObjectInfo()) + } + + log.Infow("succeed to gc one loop", + "start_block", startBlock, "end_block", endBlock, + "gc_object_number", gcObjectNumberOneLoop, "loop_number", gcLoopNumber) + w.currentGCBlock = uint64(response.EndBlockNumber) + 1 + } +} + +// gcSegmentPiece is used to gc segment piece. +func (w *GCWorker) gcSegmentPiece(objectInfo *storagetypes.ObjectInfo, storageParams *storagetypes.Params) { + keyList := piecestore.GenerateObjectSegmentKeyList(objectInfo.Id.Uint64(), + objectInfo.GetPayloadSize(), storageParams.GetMaxSegmentSize()) + for _, key := range keyList { + w.manager.pieceStore.DeletePiece(key) + } +} + +// gcECPiece is used to gc ec piece. +func (w *GCWorker) gcECPiece(objectInfo *storagetypes.ObjectInfo, storageParams *storagetypes.Params) { + if objectInfo.GetRedundancyType() != storagetypes.REDUNDANCY_REPLICA_TYPE { + return + } + for redundancyIndex, address := range objectInfo.GetSecondarySpAddresses() { + if strings.Compare(w.manager.config.SpOperatorAddress, address) == 0 { + keyList := piecestore.GenerateObjectECKeyList( + objectInfo.Id.Uint64(), objectInfo.GetPayloadSize(), + storageParams.GetMaxSegmentSize(), uint64(redundancyIndex)) + for _, key := range keyList { + w.manager.pieceStore.DeletePiece(key) + } + } + } +} diff --git a/service/manager/manager.go b/service/manager/manager.go index c4af3cdff..60035360a 100644 --- a/service/manager/manager.go +++ b/service/manager/manager.go @@ -10,6 +10,8 @@ import ( gnfd "github.com/bnb-chain/greenfield-storage-provider/pkg/greenfield" "github.com/bnb-chain/greenfield-storage-provider/pkg/lifecycle" "github.com/bnb-chain/greenfield-storage-provider/pkg/log" + metadataclient "github.com/bnb-chain/greenfield-storage-provider/service/metadata/client" + psclient "github.com/bnb-chain/greenfield-storage-provider/store/piecestore/client" "github.com/bnb-chain/greenfield-storage-provider/store/sqldb" ) @@ -21,14 +23,17 @@ var ( ) // Manager module is responsible for implementing internal management functions. -// Currently, it supports periodic update of sp info list and storage params information in spdb. -// TODO::support gc and configuration management, etc. +// Currently, it supports periodic update of sp info list and storage params information in sp-db. +// TODO: support gc and configuration management, etc. type Manager struct { - config *ManagerConfig - running atomic.Value - stopCh chan struct{} - chain *gnfd.Greenfield - spDB sqldb.SPDB + config *ManagerConfig + running atomic.Value + stopCh chan struct{} + chain *gnfd.Greenfield + spDB sqldb.SPDB + metadata *metadataclient.MetadataClient + pieceStore *psclient.StoreClient + gcWorker *GCWorker } // NewManagerService returns an instance of manager @@ -39,15 +44,24 @@ func NewManagerService(cfg *ManagerConfig) (*Manager, error) { ) manager = &Manager{ - config: cfg, - stopCh: make(chan struct{}), + config: cfg, + stopCh: make(chan struct{}), + gcWorker: &GCWorker{}, } if manager.chain, err = gnfd.NewGreenfield(cfg.ChainConfig); err != nil { log.Errorw("failed to create chain client", "error", err) return nil, err } if manager.spDB, err = sqldb.NewSpDB(cfg.SpDBConfig); err != nil { - log.Errorw("failed to create spdb client", "error", err) + log.Errorw("failed to create sp-db client", "error", err) + return nil, err + } + if manager.metadata, err = metadataclient.NewMetadataClient(cfg.MetadataGrpcAddress); err != nil { + log.Errorw("failed to create metadata client", "error", err) + return nil, err + } + if manager.pieceStore, err = psclient.NewStoreClient(cfg.PieceStoreConfig); err != nil { + log.Errorw("failed to create piece store client", "error", err) return nil, err } @@ -65,7 +79,9 @@ func (m *Manager) Start(ctx context.Context) error { return errors.New("manager has already started") } - // start background task + m.gcWorker.manager = m + m.gcWorker.Start() + go m.eventLoop() return nil } @@ -121,6 +137,7 @@ func (m *Manager) Stop(ctx context.Context) error { if m.running.Swap(false) == false { return errors.New("manager has already stop") } + m.gcWorker.Stop() close(m.stopCh) return nil } diff --git a/service/manager/manager_config.go b/service/manager/manager_config.go index f3223617d..e32597cdf 100644 --- a/service/manager/manager_config.go +++ b/service/manager/manager_config.go @@ -3,11 +3,14 @@ package manager import ( gnfd "github.com/bnb-chain/greenfield-storage-provider/pkg/greenfield" "github.com/bnb-chain/greenfield-storage-provider/store/config" + "github.com/bnb-chain/greenfield-storage-provider/store/piecestore/storage" ) // ManagerConfig defines manager service config type ManagerConfig struct { - SpOperatorAddress string - ChainConfig *gnfd.GreenfieldChainConfig - SpDBConfig *config.SQLDBConfig + SpOperatorAddress string + ChainConfig *gnfd.GreenfieldChainConfig + SpDBConfig *config.SQLDBConfig + MetadataGrpcAddress string + PieceStoreConfig *storage.PieceStoreConfig } diff --git a/store/piecestore/client/piece_store_client.go b/store/piecestore/client/piece_store_client.go index 199601a9a..079c78ee3 100644 --- a/store/piecestore/client/piece_store_client.go +++ b/store/piecestore/client/piece_store_client.go @@ -26,8 +26,9 @@ type StoreClient struct { } const ( - getPieceMethodName = "getPiece" - putPieceMethodName = "putPiece" + getPieceMethodName = "getPiece" + putPieceMethodName = "putPiece" + deletePieceMethodName = "deletePiece" ) func NewStoreClient(pieceConfig *storage.PieceStoreConfig) (*StoreClient, error) { @@ -38,7 +39,7 @@ func NewStoreClient(pieceConfig *storage.PieceStoreConfig) (*StoreClient, error) return &StoreClient{ps: ps}, nil } -// GetPiece gets piece data from piece store +// GetPiece gets piece data from piece store. func (client *StoreClient) GetPiece(ctx context.Context, key string, offset, limit int64) ([]byte, error) { startTime := time.Now() defer func() { @@ -61,7 +62,7 @@ func (client *StoreClient) GetPiece(ctx context.Context, key string, offset, lim return buf.Bytes(), nil } -// PutPiece puts piece to piece store +// PutPiece puts piece to piece store. func (client *StoreClient) PutPiece(key string, value []byte) error { startTime := time.Now() defer func() { @@ -72,3 +73,15 @@ func (client *StoreClient) PutPiece(key string, value []byte) error { return client.ps.Put(context.Background(), key, bytes.NewReader(value)) } + +// DeletePiece deletes piece from piece store. +func (client *StoreClient) DeletePiece(key string) error { + startTime := time.Now() + defer func() { + observer := metrics.PieceStoreTimeHistogram.WithLabelValues(deletePieceMethodName) + observer.Observe(time.Since(startTime).Seconds()) + metrics.PieceStoreRequestTotal.WithLabelValues(deletePieceMethodName) + }() + + return client.ps.Delete(context.Background(), key) +}