Skip to content

Commit

Permalink
feat: add gc worker
Browse files Browse the repository at this point in the history
  • Loading branch information
will-2012 committed May 9, 2023
1 parent 80b1933 commit 1ac71e3
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 18 deletions.
6 changes: 6 additions & 0 deletions config/subconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,12 @@ func (cfg *StorageProviderConfig) MakeManagerServiceConfig() (*manager.ManagerCo
SpOperatorAddress: cfg.SpOperatorAddress,
ChainConfig: cfg.ChainConfig,
SpDBConfig: cfg.SpDBConfig,
PieceStoreConfig: cfg.PieceStoreConfig,
}
if _, ok := cfg.Endpoint[model.MetadataService]; ok {
managerConfig.MetadataGrpcAddress = cfg.Endpoint[model.MetadataService]
} else {
return nil, fmt.Errorf("missing metadata server gRPC address configuration for manager service")
}
return managerConfig, nil
}
Expand Down
32 changes: 32 additions & 0 deletions model/piecestore/piece_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,35 @@ func ComputeSegmentCount(size uint64, spiltSize uint64) uint32 {
}
return segmentCount
}

// GenerateObjectSegmentKeyList generate object's segment piece key list.
func GenerateObjectSegmentKeyList(objectID, objectSize, segmentSize uint64) []string {
var (
segmentIndex uint32
segmentCount uint32
pieceKeyList = make([]string, 0)
)

segmentCount = ComputeSegmentCount(objectSize, segmentSize)
for segmentIndex < segmentCount {
pieceKeyList = append(pieceKeyList, EncodeSegmentPieceKey(objectID, segmentIndex))
segmentIndex++
}
return pieceKeyList
}

// GenerateObjectECKeyList generate object's ec piece key list.
func GenerateObjectECKeyList(objectID, objectSize, segmentSize, redundancyIndex uint64) []string {
var (
segmentIndex uint32
segmentCount uint32
pieceKeyList = make([]string, 0)
)

segmentCount = ComputeSegmentCount(objectSize, segmentSize)
for segmentIndex < segmentCount {
pieceKeyList = append(pieceKeyList, EncodeECPieceKey(objectID, segmentIndex, uint32(redundancyIndex)))
segmentIndex++
}
return pieceKeyList
}
138 changes: 138 additions & 0 deletions service/manager/gc_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package manager

import (
"context"
"strings"
"time"

storagetypes "github.com/bnb-chain/greenfield/x/storage/types"

"github.com/bnb-chain/greenfield-storage-provider/model/piecestore"
"github.com/bnb-chain/greenfield-storage-provider/pkg/log"
metatypes "github.com/bnb-chain/greenfield-storage-provider/service/metadata/types"
)

const (
defaultGCBlockSpanPerLoop = 100
defaultGCBlockSpanBeforeLatestBlock = 60
)

// GCWorker is responsible for releasing the space occupied by the deleted object in the piece-store.
// TODO: Will be refactored into task-node in the future.
type GCWorker struct {
manager *Manager
currentGCBlock uint64 // TODO: load gc point from db
}

// Start is a non-blocking function that starts a goroutine execution logic internally.
func (w *GCWorker) Start() {
go w.startGC()
log.Info("start gc worker")

}

// Stop is responsible for stop gc.
func (w *GCWorker) Stop() {
log.Info("stop gc worker")
}

// startGC starts an execution logic internally.
func (w *GCWorker) startGC() {
var (
gcLoopNumber uint64
gcObjectNumberOneLoop uint64
height uint64
err error
startBlock uint64
endBlock uint64
currentLatestBlock uint64
storageParams *storagetypes.Params
response *metatypes.ListDeletedObjectsByBlockNumberRangeResponse
)

for {
if gcLoopNumber%100 == 0 {
height, err = w.manager.chain.GetCurrentHeight(context.Background())
if err != nil {
log.Errorw("failed to query current chain height", "error", err)
time.Sleep(1 * time.Second)
continue
}
currentLatestBlock = height
log.Infow("succeed to fetch block height", "height", height)

storageParams, err = w.manager.chain.QueryStorageParams(context.Background())
if err != nil {
log.Errorw("failed to query storage params", "error", err)
time.Sleep(1 * time.Second)
continue
}
log.Infow("succeed to fetch storage params", "storage_params", storageParams)
}
gcLoopNumber++
gcObjectNumberOneLoop = 0

startBlock = w.currentGCBlock
endBlock = w.currentGCBlock + defaultGCBlockSpanPerLoop
if startBlock+defaultGCBlockSpanBeforeLatestBlock > currentLatestBlock {
log.Infow("skip gc and try again later",
"start_block", startBlock, "latest_block", currentLatestBlock)
time.Sleep(10 * time.Second)
continue
}
if endBlock+defaultGCBlockSpanBeforeLatestBlock > currentLatestBlock {
endBlock = currentLatestBlock - defaultGCBlockSpanBeforeLatestBlock
}

response, err = w.manager.metadata.ListDeletedObjectsByBlockNumberRange(context.Background(),
&metatypes.ListDeletedObjectsByBlockNumberRangeRequest{
StartBlockNumber: int64(startBlock),
EndBlockNumber: int64(endBlock),
IsFullList: true,
})
if err != nil {
log.Warnw("failed to query deleted objects",
"start_block", startBlock, "end_block", endBlock, "error", err)
time.Sleep(1 * time.Second)
continue
}
for _, object := range response.GetObjects() {
gcObjectNumberOneLoop++
// TODO: refine gc workflow by enrich metadata index.
w.gcSegmentPiece(object.GetObjectInfo(), storageParams)
w.gcECPiece(object.GetObjectInfo(), storageParams)
log.Infow("succeed to gc object piece store", "object_info", object.GetObjectInfo())
}

log.Infow("succeed to gc one loop",
"start_block", startBlock, "end_block", endBlock,
"gc_object_number", gcObjectNumberOneLoop, "loop_number", gcLoopNumber)
w.currentGCBlock = uint64(response.EndBlockNumber) + 1
}
}

// gcSegmentPiece is used to gc segment piece.
func (w *GCWorker) gcSegmentPiece(objectInfo *storagetypes.ObjectInfo, storageParams *storagetypes.Params) {
keyList := piecestore.GenerateObjectSegmentKeyList(objectInfo.Id.Uint64(),
objectInfo.GetPayloadSize(), storageParams.GetMaxSegmentSize())
for _, key := range keyList {
w.manager.pieceStore.DeletePiece(key)
}
}

// gcECPiece is used to gc ec piece.
func (w *GCWorker) gcECPiece(objectInfo *storagetypes.ObjectInfo, storageParams *storagetypes.Params) {
if objectInfo.GetRedundancyType() != storagetypes.REDUNDANCY_REPLICA_TYPE {
return
}
for redundancyIndex, address := range objectInfo.GetSecondarySpAddresses() {
if strings.Compare(w.manager.config.SpOperatorAddress, address) == 0 {
keyList := piecestore.GenerateObjectECKeyList(
objectInfo.Id.Uint64(), objectInfo.GetPayloadSize(),
storageParams.GetMaxSegmentSize(), uint64(redundancyIndex))
for _, key := range keyList {
w.manager.pieceStore.DeletePiece(key)
}
}
}
}
39 changes: 28 additions & 11 deletions service/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
gnfd "github.com/bnb-chain/greenfield-storage-provider/pkg/greenfield"
"github.com/bnb-chain/greenfield-storage-provider/pkg/lifecycle"
"github.com/bnb-chain/greenfield-storage-provider/pkg/log"
metadataclient "github.com/bnb-chain/greenfield-storage-provider/service/metadata/client"
psclient "github.com/bnb-chain/greenfield-storage-provider/store/piecestore/client"
"github.com/bnb-chain/greenfield-storage-provider/store/sqldb"
)

Expand All @@ -21,14 +23,17 @@ var (
)

// Manager module is responsible for implementing internal management functions.
// Currently, it supports periodic update of sp info list and storage params information in spdb.
// TODO::support gc and configuration management, etc.
// Currently, it supports periodic update of sp info list and storage params information in sp-db.
// TODO: support gc and configuration management, etc.
type Manager struct {
config *ManagerConfig
running atomic.Value
stopCh chan struct{}
chain *gnfd.Greenfield
spDB sqldb.SPDB
config *ManagerConfig
running atomic.Value
stopCh chan struct{}
chain *gnfd.Greenfield
spDB sqldb.SPDB
metadata *metadataclient.MetadataClient
pieceStore *psclient.StoreClient
gcWorker *GCWorker
}

// NewManagerService returns an instance of manager
Expand All @@ -39,15 +44,24 @@ func NewManagerService(cfg *ManagerConfig) (*Manager, error) {
)

manager = &Manager{
config: cfg,
stopCh: make(chan struct{}),
config: cfg,
stopCh: make(chan struct{}),
gcWorker: &GCWorker{},
}
if manager.chain, err = gnfd.NewGreenfield(cfg.ChainConfig); err != nil {
log.Errorw("failed to create chain client", "error", err)
return nil, err
}
if manager.spDB, err = sqldb.NewSpDB(cfg.SpDBConfig); err != nil {
log.Errorw("failed to create spdb client", "error", err)
log.Errorw("failed to create sp-db client", "error", err)
return nil, err
}
if manager.metadata, err = metadataclient.NewMetadataClient(cfg.MetadataGrpcAddress); err != nil {
log.Errorw("failed to create metadata client", "error", err)
return nil, err
}
if manager.pieceStore, err = psclient.NewStoreClient(cfg.PieceStoreConfig); err != nil {
log.Errorw("failed to create piece store client", "error", err)
return nil, err
}

Expand All @@ -65,7 +79,9 @@ func (m *Manager) Start(ctx context.Context) error {
return errors.New("manager has already started")
}

// start background task
m.gcWorker.manager = m
m.gcWorker.Start()

go m.eventLoop()
return nil
}
Expand Down Expand Up @@ -121,6 +137,7 @@ func (m *Manager) Stop(ctx context.Context) error {
if m.running.Swap(false) == false {
return errors.New("manager has already stop")
}
m.gcWorker.Stop()
close(m.stopCh)
return nil
}
9 changes: 6 additions & 3 deletions service/manager/manager_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package manager
import (
gnfd "github.com/bnb-chain/greenfield-storage-provider/pkg/greenfield"
"github.com/bnb-chain/greenfield-storage-provider/store/config"
"github.com/bnb-chain/greenfield-storage-provider/store/piecestore/storage"
)

// ManagerConfig defines manager service config
type ManagerConfig struct {
SpOperatorAddress string
ChainConfig *gnfd.GreenfieldChainConfig
SpDBConfig *config.SQLDBConfig
SpOperatorAddress string
ChainConfig *gnfd.GreenfieldChainConfig
SpDBConfig *config.SQLDBConfig
MetadataGrpcAddress string
PieceStoreConfig *storage.PieceStoreConfig
}
21 changes: 17 additions & 4 deletions store/piecestore/client/piece_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ type StoreClient struct {
}

const (
getPieceMethodName = "getPiece"
putPieceMethodName = "putPiece"
getPieceMethodName = "getPiece"
putPieceMethodName = "putPiece"
deletePieceMethodName = "deletePiece"
)

func NewStoreClient(pieceConfig *storage.PieceStoreConfig) (*StoreClient, error) {
Expand All @@ -38,7 +39,7 @@ func NewStoreClient(pieceConfig *storage.PieceStoreConfig) (*StoreClient, error)
return &StoreClient{ps: ps}, nil
}

// GetPiece gets piece data from piece store
// GetPiece gets piece data from piece store.
func (client *StoreClient) GetPiece(ctx context.Context, key string, offset, limit int64) ([]byte, error) {
startTime := time.Now()
defer func() {
Expand All @@ -61,7 +62,7 @@ func (client *StoreClient) GetPiece(ctx context.Context, key string, offset, lim
return buf.Bytes(), nil
}

// PutPiece puts piece to piece store
// PutPiece puts piece to piece store.
func (client *StoreClient) PutPiece(key string, value []byte) error {
startTime := time.Now()
defer func() {
Expand All @@ -72,3 +73,15 @@ func (client *StoreClient) PutPiece(key string, value []byte) error {

return client.ps.Put(context.Background(), key, bytes.NewReader(value))
}

// DeletePiece deletes piece from piece store.
func (client *StoreClient) DeletePiece(key string) error {
startTime := time.Now()
defer func() {
observer := metrics.PieceStoreTimeHistogram.WithLabelValues(deletePieceMethodName)
observer.Observe(time.Since(startTime).Seconds())
metrics.PieceStoreRequestTotal.WithLabelValues(deletePieceMethodName)
}()

return client.ps.Delete(context.Background(), key)
}

0 comments on commit 1ac71e3

Please sign in to comment.