Skip to content

Commit

Permalink
feat:add upload download add bandwidth limit
Browse files Browse the repository at this point in the history
  • Loading branch information
constwz committed May 10, 2023
2 parents 695bd15 + e9dbddc commit 56ac4f7
Show file tree
Hide file tree
Showing 33 changed files with 1,318 additions and 341 deletions.
89 changes: 59 additions & 30 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/bnb-chain/greenfield-storage-provider/pkg/p2p"
"github.com/bnb-chain/greenfield-storage-provider/pkg/pprof"
"github.com/bnb-chain/greenfield-storage-provider/service/blocksyncer"
"github.com/bnb-chain/greenfield-storage-provider/service/metadata"
"github.com/bnb-chain/greenfield-storage-provider/service/signer"
"github.com/bnb-chain/greenfield-storage-provider/service/stopserving"
"github.com/bnb-chain/greenfield-storage-provider/store/config"
Expand All @@ -25,22 +26,25 @@ import (

// StorageProviderConfig defines the configuration of storage provider
type StorageProviderConfig struct {
Service []string
SpOperatorAddress string
Endpoint map[string]string
ListenAddress map[string]string
SpDBConfig *config.SQLDBConfig
PieceStoreConfig *storage.PieceStoreConfig
ChainConfig *gnfd.GreenfieldChainConfig
SignerCfg *signer.SignerConfig
BlockSyncerCfg *blocksyncer.Config
P2PCfg *p2p.NodeConfig
LogCfg *LogConfig
MetricsCfg *metrics.MetricsConfig
PProfCfg *pprof.PProfConfig
RateLimiter *localhttp.RateLimiterConfig
BandwidthLimiter *localhttp.BandWidthLimiterConfig
DiscontinueCfg *stopserving.DiscontinueConfig
Service []string
SpOperatorAddress string
Endpoint map[string]string
ListenAddress map[string]string
SpDBConfig *config.SQLDBConfig
BsDBConfig *config.SQLDBConfig
BsDBSwitchedConfig *config.SQLDBConfig
PieceStoreConfig *storage.PieceStoreConfig
ChainConfig *gnfd.GreenfieldChainConfig
SignerCfg *signer.SignerConfig
BlockSyncerCfg *blocksyncer.Config
P2PCfg *p2p.NodeConfig
LogCfg *LogConfig
MetricsCfg *metrics.MetricsConfig
PProfCfg *pprof.PProfConfig
RateLimiter *localhttp.RateLimiterConfig
DiscontinueCfg *stopserving.DiscontinueConfig
MetadataCfg *metadata.MetadataConfig
BandwidthLimiter *localhttp.BandwidthLimiterConfig
}

// JSONMarshal marshal the StorageProviderConfig to json format
Expand Down Expand Up @@ -93,19 +97,22 @@ var DefaultStorageProviderConfig = &StorageProviderConfig{
model.P2PService: model.P2PGRPCAddress,
model.AuthService: model.AuthGRPCAddress,
},
SpOperatorAddress: hex.EncodeToString([]byte(model.SpOperatorAddress)),
SpDBConfig: DefaultSQLDBConfig,
PieceStoreConfig: DefaultPieceStoreConfig,
ChainConfig: DefaultGreenfieldChainConfig,
SignerCfg: signer.DefaultSignerChainConfig,
BlockSyncerCfg: DefaultBlockSyncerConfig,
P2PCfg: DefaultP2PConfig,
LogCfg: DefaultLogConfig,
MetricsCfg: DefaultMetricsConfig,
PProfCfg: DefaultPProfConfig,
RateLimiter: DefaultRateLimiterConfig,
BandwidthLimiter: DefaultBandwidthLimiterConfig,
DiscontinueCfg: stopserving.DefaultDiscontinueConfig,
SpOperatorAddress: hex.EncodeToString([]byte(model.SpOperatorAddress)),
SpDBConfig: DefaultSQLDBConfig,
BsDBConfig: DefaultBsDBConfig,
BsDBSwitchedConfig: DefaultBsDBSwitchedConfig,
PieceStoreConfig: DefaultPieceStoreConfig,
ChainConfig: DefaultGreenfieldChainConfig,
SignerCfg: signer.DefaultSignerChainConfig,
BlockSyncerCfg: DefaultBlockSyncerConfig,
P2PCfg: DefaultP2PConfig,
LogCfg: DefaultLogConfig,
MetricsCfg: DefaultMetricsConfig,
PProfCfg: DefaultPProfConfig,
RateLimiter: DefaultRateLimiterConfig,
DiscontinueCfg: stopserving.DefaultDiscontinueConfig,
MetadataCfg: DefaultMetadataConfig,
BandwidthLimiter: DefaultBandwidthLimiterConfig,
}

// DefaultSQLDBConfig defines the default configuration of SQL DB
Expand All @@ -116,6 +123,22 @@ var DefaultSQLDBConfig = &storeconfig.SQLDBConfig{
Database: "storage_provider_db",
}

// DefaultBsDBConfig defines the default configuration of Bs DB
var DefaultBsDBConfig = &storeconfig.SQLDBConfig{
User: "root",
Passwd: "test_pwd",
Address: "localhost:3306",
Database: "block_syncer",
}

// DefaultBsDBSwitchedConfig defines the default configuration for the switched Bs DB.
var DefaultBsDBSwitchedConfig = &storeconfig.SQLDBConfig{
User: "root",
Passwd: "test_pwd",
Address: "localhost:3306",
Database: "block_syncer_backup",
}

// DefaultPieceStoreConfig defines the default configuration of piece store
var DefaultPieceStoreConfig = &storage.PieceStoreConfig{
Shards: 0,
Expand Down Expand Up @@ -148,6 +171,12 @@ var DefaultMetricsConfig = &metrics.MetricsConfig{
HTTPAddress: model.MetricsHTTPAddress,
}

// DefaultMetadataConfig defines the default configuration of Metadata service
var DefaultMetadataConfig = &metadata.MetadataConfig{
IsMasterDB: true,
BsDBSwitchCheckIntervalSec: 3600,
}

type LogConfig struct {
Level string
Path string
Expand Down Expand Up @@ -181,7 +210,7 @@ var DefaultRateLimiterConfig = &localhttp.RateLimiterConfig{
},
}

var DefaultBandwidthLimiterConfig = &localhttp.BandWidthLimiterConfig{
var DefaultBandwidthLimiterConfig = &localhttp.BandwidthLimiterConfig{
Enable: false,
}

Expand Down
23 changes: 23 additions & 0 deletions config/config_template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ Passwd = "test_pwd"
Address = "localhost:3306"
Database = "storage_provider_db"

[BsDBConfig]
User = "root"
Passwd = "root"
Address = "localhost:3306"
Database = "sp_0"

[BsDBSwitchedConfig]
User = "root"
Passwd = "root"
Address = "localhost:3306"
Database = "sp_0"

[PieceStoreConfig]
Shards = 0

Expand Down Expand Up @@ -92,5 +104,16 @@ On = false
RateLimit = 1
RatePeriod = "S"

[BandwidthLimiter]
Enable = false
R = 100
B = 1000

[StopServingCfg]
BucketKeepAliveDays = 7

[MetadataCfg]
IsMasterDB = true
BsDBSwitchCheckIntervalSec = 3600


19 changes: 14 additions & 5 deletions config/subconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ func (cfg *StorageProviderConfig) MakeGatewayConfig() (*gateway.GatewayConfig, e
APILimits: apiLimitsMap,
HTTPLimitCfg: cfg.RateLimiter.HTTPLimitCfg,
}
gCfg.BandWidthLimitCfg = &localhttp.BandWidthLimiterConfig{
Enable: true,
R: 0.1,
B: 1,
gCfg.BandwidthLimitCfg = &localhttp.BandwidthLimiterConfig{
Enable: false,
R: 100,
B: 1000,
}
}
return gCfg, nil
Expand Down Expand Up @@ -227,7 +227,10 @@ func (cfg *StorageProviderConfig) MakeTaskNodeConfig() (*tasknode.TaskNodeConfig
// MakeMetadataServiceConfig make meta data service config from StorageProviderConfig
func (cfg *StorageProviderConfig) MakeMetadataServiceConfig() (*metadata.MetadataConfig, error) {
mCfg := &metadata.MetadataConfig{
SpDBConfig: cfg.SpDBConfig,
BsDBConfig: cfg.BsDBConfig,
BsDBSwitchedConfig: cfg.BsDBSwitchedConfig,
BsDBSwitchCheckIntervalSec: cfg.MetadataCfg.BsDBSwitchCheckIntervalSec,
IsMasterDB: cfg.MetadataCfg.IsMasterDB,
}
if _, ok := cfg.ListenAddress[model.MetadataService]; ok {
mCfg.GRPCAddress = cfg.ListenAddress[model.MetadataService]
Expand All @@ -243,6 +246,12 @@ func (cfg *StorageProviderConfig) MakeManagerServiceConfig() (*manager.ManagerCo
SpOperatorAddress: cfg.SpOperatorAddress,
ChainConfig: cfg.ChainConfig,
SpDBConfig: cfg.SpDBConfig,
PieceStoreConfig: cfg.PieceStoreConfig,
}
if _, ok := cfg.Endpoint[model.MetadataService]; ok {
managerConfig.MetadataGrpcAddress = cfg.Endpoint[model.MetadataService]
} else {
return nil, fmt.Errorf("missing metadata server gRPC address configuration for manager service")
}
return managerConfig, nil
}
Expand Down
8 changes: 8 additions & 0 deletions model/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ const (
BsDBAddress = "BS_DB_ADDRESS"
// BsDBDataBase defines env variable name for block syncer db database
BsDBDataBase = "BS_DB_DATABASE"
// BsDBSwitchedUser defines env variable name for switched block syncer db user name
BsDBSwitchedUser = "BS_DB_SWITCHED_USER"
// BsDBSwitchedPasswd defines env variable name for switched block syncer db user passwd
BsDBSwitchedPasswd = "BS_DB_SWITCHED_PASSWORD"
// BsDBSwitchedAddress defines env variable name for switched block syncer db address
BsDBSwitchedAddress = "BS_DB_SWITCHED_ADDRESS"
// BsDBSwitchedDataBase defines env variable name for switched block syncer db database
BsDBSwitchedDataBase = "BS_DB_SWITCHED_DATABASE"

// SpOperatorAddress defines env variable name for sp operator address
SpOperatorAddress = "greenfield-storage-provider"
Expand Down
32 changes: 32 additions & 0 deletions model/piecestore/piece_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,35 @@ func ComputeSegmentCount(size uint64, spiltSize uint64) uint32 {
}
return segmentCount
}

// GenerateObjectSegmentKeyList generate object's segment piece key list.
func GenerateObjectSegmentKeyList(objectID, objectSize, segmentSize uint64) []string {
var (
segmentCount uint32
segmentIndex uint32
pieceKeyList = make([]string, 0)
)

segmentCount = ComputeSegmentCount(objectSize, segmentSize)
for segmentIndex < segmentCount {
pieceKeyList = append(pieceKeyList, EncodeSegmentPieceKey(objectID, segmentIndex))
segmentIndex++
}
return pieceKeyList
}

// GenerateObjectECKeyList generate object's ec piece key list.
func GenerateObjectECKeyList(objectID, objectSize, segmentSize, redundancyIndex uint64) []string {
var (
segmentCount uint32
segmentIndex uint32
pieceKeyList = make([]string, 0)
)

segmentCount = ComputeSegmentCount(objectSize, segmentSize)
for segmentIndex < segmentCount {
pieceKeyList = append(pieceKeyList, EncodeECPieceKey(objectID, segmentIndex, uint32(redundancyIndex)))
segmentIndex++
}
return pieceKeyList
}
19 changes: 10 additions & 9 deletions pkg/middleware/http/tokenBucket.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
package http

import (
"sync"

"github.com/bnb-chain/greenfield-storage-provider/pkg/log"
"golang.org/x/time/rate"
"sync"
)

type BandWidthLimiterConfig struct {
Enable bool
R rate.Limit
B int
type BandwidthLimiterConfig struct {
Enable bool //Enable Whether to enable bandwidth limiting
R rate.Limit //R The speed at which tokens are generated R per second
B int //B The size of the token bucket
}

type BandWidthLimiter struct {
type BandwidthLimiter struct {
Limiter *rate.Limiter
}

var LimiterOnce sync.Once
var BandWidthLimit *BandWidthLimiter
var BandwidthLimit *BandwidthLimiter

func NewBandWidthLimiter(r rate.Limit, b int) {
func NewBandwidthLimiter(r rate.Limit, b int) {
log.Infof("config r: %v, b:%d", r, b)

LimiterOnce.Do(func() {
BandWidthLimit = &BandWidthLimiter{
BandwidthLimit = &BandwidthLimiter{
Limiter: rate.NewLimiter(r, b),
}
})
Expand Down
8 changes: 4 additions & 4 deletions pkg/p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ type Node struct {
stopCh chan struct{}
}

// NewNode return an instance of Node
// NewNode return an instance of Node.
func NewNode(config *NodeConfig, SPAddr string, signer *signerclient.SignerClient) (*Node, error) {
if config.PingPeriod < PingPeriodMin {
config.PingPeriod = PingPeriodMin
}
privKey, localAddr, bootstrapIDs, bootstrapAddrs, err := config.ParseConfing()
privKey, localAddr, bootstrapIDs, bootstrapAddrs, err := config.ParseConfig()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func (n *Node) eventLoop() {
// broadcast sends request to all p2p nodes
func (n *Node) broadcast(pc protocol.ID, data proto.Message) {
for _, peerID := range n.node.Peerstore().PeersWithAddrs() {
log.Debugw("broadcast msg to peer", "peer_id", peerID, "protocol", pc)
// log.Debugw("broadcast msg to peer", "peer_id", peerID, "protocol", pc)
if strings.Compare(n.node.ID().String(), peerID.String()) == 0 {
continue
}
Expand All @@ -222,7 +222,7 @@ func (n *Node) sendToPeer(peerID peer.ID, pc protocol.ID, data proto.Message) er
n.peers.DeletePeer(peerID)
return err
}
log.Debugw("success to send msg", "peer_id", peerID, "protocol", pc, "msg", data.String())
// log.Debugw("succeed to send msg", "peer_id", peerID, "protocol", pc, "msg", data.String())
s.Close()
return err
}
4 changes: 2 additions & 2 deletions pkg/p2p/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (cfg *NodeConfig) overrideConfigFromEnv() {
}
}

// ParseConfing parsers the configuration into a format that go-libp2p can use
func (cfg *NodeConfig) ParseConfing() (privKey crypto.PrivKey, hostAddr ma.Multiaddr, bootstrapIDs []peer.ID, bootstrapAddrs []ma.Multiaddr, err error) {
// ParseConfig parsers the configuration into a format that go-libp2p can use
func (cfg *NodeConfig) ParseConfig() (privKey crypto.PrivKey, hostAddr ma.Multiaddr, bootstrapIDs []peer.ID, bootstrapAddrs []ma.Multiaddr, err error) {
cfg.overrideConfigFromEnv()
if len(cfg.P2PPrivateKey) > 0 {
priKeyBytes, err := hex.DecodeString(cfg.P2PPrivateKey)
Expand Down
12 changes: 6 additions & 6 deletions pkg/p2p/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (n *Node) onPing(s network.Stream) {
log.Warnw("failed to response ping", "peer_id", peerID, "error", err)
return
}
log.Debugw("success to response ping", "peer_id", peerID)
// log.Debugw("succeed to response ping", "peer_id", peerID)
}()
ping := &types.Ping{}
buf, err := io.ReadAll(s)
Expand All @@ -46,7 +46,7 @@ func (n *Node) onPing(s network.Stream) {
log.Errorw("failed to unmarshal ping msg", "error", err)
return
}
log.Debugf("%s received ping request from %s. Message: %s", s.Conn().LocalPeer(), s.Conn().RemotePeer(), ping.String())
// log.Debugf("%s received ping request from %s. Message: %s", s.Conn().LocalPeer(), s.Conn().RemotePeer(), ping.String())

err = types.VerifySignature(ping.GetSpOperatorAddress(), ping.GetSignBytes(), ping.GetSignature())
if err != nil {
Expand All @@ -66,7 +66,7 @@ func (n *Node) onPing(s network.Stream) {
nodeInfo.MultiAddr = append(nodeInfo.MultiAddr, addr.String())
}
pong.Nodes = append(pong.Nodes, nodeInfo)
log.Debugw("send node to remote", "node_id", pID.String(), "remote_node", s.Conn().RemotePeer())
// log.Debugw("send node to remote", "node_id", pID.String(), "remote_node", s.Conn().RemotePeer())
}
pong.SpOperatorAddress = n.SpOperatorAddress
pong, err = n.signer.SignPongMsg(context.Background(), pong)
Expand All @@ -89,7 +89,7 @@ func (n *Node) onPong(s network.Stream) {
log.Warnw("failed to receive pong", "peer_id", peerID, "error", err)
return
}
log.Debugw("success to receive pong", "peer_id", peerID)
// log.Debugw("succeed to receive pong", "peer_id", peerID)
}()
pong := &types.Pong{}
buf, err := io.ReadAll(s)
Expand All @@ -104,7 +104,7 @@ func (n *Node) onPong(s network.Stream) {
log.Errorw("failed to unmarshal ping msg", "error", err)
return
}
log.Debugf("%s received pong request from %s.", s.Conn().LocalPeer(), s.Conn().RemotePeer())
// log.Debugf("%s received pong request from %s.", s.Conn().LocalPeer(), s.Conn().RemotePeer())

err = types.VerifySignature(pong.GetSpOperatorAddress(), pong.GetSignBytes(), pong.GetSignature())
if err != nil {
Expand All @@ -127,6 +127,6 @@ func (n *Node) onPong(s network.Stream) {
addrs = append(addrs, addr)
}
n.node.Peerstore().AddAddrs(pID, addrs, peerstore.PermanentAddrTTL)
log.Debugw("receive node from remote and permanent", "remote_node", s.Conn().RemotePeer(), "node_id", pID)
// log.Debugw("receive node from remote and permanent", "remote_node", s.Conn().RemotePeer(), "node_id", pID)
}
}
Loading

0 comments on commit 56ac4f7

Please sign in to comment.