Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add gc worker #408

Merged
merged 1 commit into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/subconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,12 @@ func (cfg *StorageProviderConfig) MakeManagerServiceConfig() (*manager.ManagerCo
SpOperatorAddress: cfg.SpOperatorAddress,
ChainConfig: cfg.ChainConfig,
SpDBConfig: cfg.SpDBConfig,
PieceStoreConfig: cfg.PieceStoreConfig,
}
if _, ok := cfg.Endpoint[model.MetadataService]; ok {
managerConfig.MetadataGrpcAddress = cfg.Endpoint[model.MetadataService]
} else {
return nil, fmt.Errorf("missing metadata server gRPC address configuration for manager service")
}
return managerConfig, nil
}
Expand Down
32 changes: 32 additions & 0 deletions model/piecestore/piece_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,35 @@ func ComputeSegmentCount(size uint64, spiltSize uint64) uint32 {
}
return segmentCount
}

// GenerateObjectSegmentKeyList generate object's segment piece key list.
func GenerateObjectSegmentKeyList(objectID, objectSize, segmentSize uint64) []string {
var (
segmentCount uint32
segmentIndex uint32
pieceKeyList = make([]string, 0)
)

segmentCount = ComputeSegmentCount(objectSize, segmentSize)
for segmentIndex < segmentCount {
pieceKeyList = append(pieceKeyList, EncodeSegmentPieceKey(objectID, segmentIndex))
segmentIndex++
}
return pieceKeyList
}

// GenerateObjectECKeyList generate object's ec piece key list.
func GenerateObjectECKeyList(objectID, objectSize, segmentSize, redundancyIndex uint64) []string {
var (
segmentCount uint32
segmentIndex uint32
pieceKeyList = make([]string, 0)
)

segmentCount = ComputeSegmentCount(objectSize, segmentSize)
for segmentIndex < segmentCount {
pieceKeyList = append(pieceKeyList, EncodeECPieceKey(objectID, segmentIndex, uint32(redundancyIndex)))
segmentIndex++
}
return pieceKeyList
}
138 changes: 138 additions & 0 deletions service/manager/gc_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package manager

import (
"context"
"strings"
"time"

storagetypes "github.com/bnb-chain/greenfield/x/storage/types"

"github.com/bnb-chain/greenfield-storage-provider/model/piecestore"
"github.com/bnb-chain/greenfield-storage-provider/pkg/log"
metatypes "github.com/bnb-chain/greenfield-storage-provider/service/metadata/types"
)

const (
defaultGCBlockSpanPerLoop = 100
defaultGCBlockSpanBeforeLatestBlock = 60
)

// GCWorker is responsible for releasing the space occupied by the deleted object in the piece-store.
// TODO: Will be refactored into task-node in the future.
type GCWorker struct {
manager *Manager
currentGCBlock uint64 // TODO: load gc point from db
}

// Start is a non-blocking function that starts a goroutine execution logic internally.
func (w *GCWorker) Start() {
go w.startGC()
log.Info("start gc worker")

}

// Stop is responsible for stop gc.
func (w *GCWorker) Stop() {
log.Info("stop gc worker")
}

// startGC starts an execution logic internally.
func (w *GCWorker) startGC() {
var (
gcLoopNumber uint64
gcObjectNumberOneLoop uint64
height uint64
err error
startBlock uint64
endBlock uint64
currentLatestBlock uint64
storageParams *storagetypes.Params
response *metatypes.ListDeletedObjectsByBlockNumberRangeResponse
)

for {
if gcLoopNumber%100 == 0 {
height, err = w.manager.chain.GetCurrentHeight(context.Background())
if err != nil {
log.Errorw("failed to query current chain height", "error", err)
time.Sleep(1 * time.Second)
continue
}
currentLatestBlock = height
log.Infow("succeed to fetch block height", "height", height)

storageParams, err = w.manager.chain.QueryStorageParams(context.Background())
if err != nil {
log.Errorw("failed to query storage params", "error", err)
time.Sleep(1 * time.Second)
continue
}
log.Infow("succeed to fetch storage params", "storage_params", storageParams)
}
gcLoopNumber++
gcObjectNumberOneLoop = 0

startBlock = w.currentGCBlock
endBlock = w.currentGCBlock + defaultGCBlockSpanPerLoop
if startBlock+defaultGCBlockSpanBeforeLatestBlock > currentLatestBlock {
log.Infow("skip gc and try again later",
"start_block", startBlock, "latest_block", currentLatestBlock)
time.Sleep(10 * time.Second)
continue
}
if endBlock+defaultGCBlockSpanBeforeLatestBlock > currentLatestBlock {
endBlock = currentLatestBlock - defaultGCBlockSpanBeforeLatestBlock
}

response, err = w.manager.metadata.ListDeletedObjectsByBlockNumberRange(context.Background(),
&metatypes.ListDeletedObjectsByBlockNumberRangeRequest{
StartBlockNumber: int64(startBlock),
EndBlockNumber: int64(endBlock),
IsFullList: true,
})
if err != nil {
log.Warnw("failed to query deleted objects",
"start_block", startBlock, "end_block", endBlock, "error", err)
time.Sleep(1 * time.Second)
continue
}
for _, object := range response.GetObjects() {
gcObjectNumberOneLoop++
// TODO: refine gc workflow by enrich metadata index.
w.gcSegmentPiece(object.GetObjectInfo(), storageParams)
w.gcECPiece(object.GetObjectInfo(), storageParams)
log.Infow("succeed to gc object piece store", "object_info", object.GetObjectInfo())
}

log.Infow("succeed to gc one loop",
"start_block", startBlock, "end_block", endBlock,
"gc_object_number", gcObjectNumberOneLoop, "loop_number", gcLoopNumber)
w.currentGCBlock = uint64(response.EndBlockNumber) + 1
}
}

// gcSegmentPiece is used to gc segment piece.
func (w *GCWorker) gcSegmentPiece(objectInfo *storagetypes.ObjectInfo, storageParams *storagetypes.Params) {
keyList := piecestore.GenerateObjectSegmentKeyList(objectInfo.Id.Uint64(),
objectInfo.GetPayloadSize(), storageParams.GetMaxSegmentSize())
for _, key := range keyList {
w.manager.pieceStore.DeletePiece(key)
}
}

// gcECPiece is used to gc ec piece.
func (w *GCWorker) gcECPiece(objectInfo *storagetypes.ObjectInfo, storageParams *storagetypes.Params) {
if objectInfo.GetRedundancyType() != storagetypes.REDUNDANCY_REPLICA_TYPE {
return
}
for redundancyIndex, address := range objectInfo.GetSecondarySpAddresses() {
if strings.Compare(w.manager.config.SpOperatorAddress, address) == 0 {
keyList := piecestore.GenerateObjectECKeyList(
objectInfo.Id.Uint64(), objectInfo.GetPayloadSize(),
storageParams.GetMaxSegmentSize(), uint64(redundancyIndex))
for _, key := range keyList {
w.manager.pieceStore.DeletePiece(key)
}
}
}
}
39 changes: 28 additions & 11 deletions service/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
gnfd "github.com/bnb-chain/greenfield-storage-provider/pkg/greenfield"
"github.com/bnb-chain/greenfield-storage-provider/pkg/lifecycle"
"github.com/bnb-chain/greenfield-storage-provider/pkg/log"
metadataclient "github.com/bnb-chain/greenfield-storage-provider/service/metadata/client"
psclient "github.com/bnb-chain/greenfield-storage-provider/store/piecestore/client"
"github.com/bnb-chain/greenfield-storage-provider/store/sqldb"
)

Expand All @@ -21,14 +23,17 @@ var (
)

// Manager module is responsible for implementing internal management functions.
// Currently, it supports periodic update of sp info list and storage params information in spdb.
// TODO::support gc and configuration management, etc.
// Currently, it supports periodic update of sp info list and storage params information in sp-db.
// TODO: support gc and configuration management, etc.
type Manager struct {
config *ManagerConfig
running atomic.Value
stopCh chan struct{}
chain *gnfd.Greenfield
spDB sqldb.SPDB
config *ManagerConfig
running atomic.Value
stopCh chan struct{}
chain *gnfd.Greenfield
spDB sqldb.SPDB
metadata *metadataclient.MetadataClient
pieceStore *psclient.StoreClient
gcWorker *GCWorker
}

// NewManagerService returns an instance of manager
Expand All @@ -39,15 +44,24 @@ func NewManagerService(cfg *ManagerConfig) (*Manager, error) {
)

manager = &Manager{
config: cfg,
stopCh: make(chan struct{}),
config: cfg,
stopCh: make(chan struct{}),
gcWorker: &GCWorker{},
}
if manager.chain, err = gnfd.NewGreenfield(cfg.ChainConfig); err != nil {
log.Errorw("failed to create chain client", "error", err)
return nil, err
}
if manager.spDB, err = sqldb.NewSpDB(cfg.SpDBConfig); err != nil {
log.Errorw("failed to create spdb client", "error", err)
log.Errorw("failed to create sp-db client", "error", err)
return nil, err
}
if manager.metadata, err = metadataclient.NewMetadataClient(cfg.MetadataGrpcAddress); err != nil {
log.Errorw("failed to create metadata client", "error", err)
return nil, err
}
if manager.pieceStore, err = psclient.NewStoreClient(cfg.PieceStoreConfig); err != nil {
log.Errorw("failed to create piece store client", "error", err)
return nil, err
}

Expand All @@ -65,7 +79,9 @@ func (m *Manager) Start(ctx context.Context) error {
return errors.New("manager has already started")
}

// start background task
m.gcWorker.manager = m
m.gcWorker.Start()

go m.eventLoop()
return nil
}
Expand Down Expand Up @@ -121,6 +137,7 @@ func (m *Manager) Stop(ctx context.Context) error {
if m.running.Swap(false) == false {
return errors.New("manager has already stop")
}
m.gcWorker.Stop()
close(m.stopCh)
return nil
}
9 changes: 6 additions & 3 deletions service/manager/manager_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package manager
import (
gnfd "github.com/bnb-chain/greenfield-storage-provider/pkg/greenfield"
"github.com/bnb-chain/greenfield-storage-provider/store/config"
"github.com/bnb-chain/greenfield-storage-provider/store/piecestore/storage"
)

// ManagerConfig defines manager service config
type ManagerConfig struct {
SpOperatorAddress string
ChainConfig *gnfd.GreenfieldChainConfig
SpDBConfig *config.SQLDBConfig
SpOperatorAddress string
ChainConfig *gnfd.GreenfieldChainConfig
SpDBConfig *config.SQLDBConfig
MetadataGrpcAddress string
PieceStoreConfig *storage.PieceStoreConfig
}
21 changes: 17 additions & 4 deletions store/piecestore/client/piece_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ type StoreClient struct {
}

const (
getPieceMethodName = "getPiece"
putPieceMethodName = "putPiece"
getPieceMethodName = "getPiece"
putPieceMethodName = "putPiece"
deletePieceMethodName = "deletePiece"
)

func NewStoreClient(pieceConfig *storage.PieceStoreConfig) (*StoreClient, error) {
Expand All @@ -38,7 +39,7 @@ func NewStoreClient(pieceConfig *storage.PieceStoreConfig) (*StoreClient, error)
return &StoreClient{ps: ps}, nil
}

// GetPiece gets piece data from piece store
// GetPiece gets piece data from piece store.
func (client *StoreClient) GetPiece(ctx context.Context, key string, offset, limit int64) ([]byte, error) {
startTime := time.Now()
defer func() {
Expand All @@ -61,7 +62,7 @@ func (client *StoreClient) GetPiece(ctx context.Context, key string, offset, lim
return buf.Bytes(), nil
}

// PutPiece puts piece to piece store
// PutPiece puts piece to piece store.
func (client *StoreClient) PutPiece(key string, value []byte) error {
startTime := time.Now()
defer func() {
Expand All @@ -72,3 +73,15 @@ func (client *StoreClient) PutPiece(key string, value []byte) error {

return client.ps.Put(context.Background(), key, bytes.NewReader(value))
}

// DeletePiece deletes piece from piece store.
func (client *StoreClient) DeletePiece(key string) error {
startTime := time.Now()
defer func() {
observer := metrics.PieceStoreTimeHistogram.WithLabelValues(deletePieceMethodName)
observer.Observe(time.Since(startTime).Seconds())
metrics.PieceStoreRequestTotal.WithLabelValues(deletePieceMethodName)
}()

return client.ps.Delete(context.Background(), key)
}