Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R]enable directbroadcast flag to decrease the block propagation time #99

Merged
merged 1 commit into from
Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
utils.KeyStoreDirFlag,
utils.ExternalSignerFlag,
utils.NoUSBFlag,
utils.DirectBroadcastFlag,
utils.SmartCardDaemonPathFlag,
utils.OverrideIstanbulFlag,
utils.OverrideMuirGlacierFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.AncientFlag,
utils.KeyStoreDirFlag,
utils.NoUSBFlag,
utils.DirectBroadcastFlag,
utils.SmartCardDaemonPathFlag,
utils.NetworkIdFlag,
utils.GoerliFlag,
Expand Down
14 changes: 12 additions & 2 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ var (
Usage: "Data directory for the databases and keystore",
Value: DirectoryString(node.DefaultDataDir()),
}
DirectBroadcastFlag = cli.BoolFlag{
Name: "directbroadcast",
Usage: "Enable directly broadcast mined block to all peers",
}
AncientFlag = DirectoryFlag{
Name: "datadir.ancient",
Usage: "Data directory for ancient chain segments (default = inside chaindata)",
Expand Down Expand Up @@ -787,13 +791,13 @@ var (
Value: "",
}

InitNetworkIps= cli.StringFlag{
InitNetworkIps = cli.StringFlag{
Name: "init.ips",
Usage: "the ips of each node in the network, example '192.168.0.1,192.168.0.2'",
Value: "",
}

InitNetworkPort= cli.IntFlag{
InitNetworkPort = cli.IntFlag{
Name: "init.p2p-port",
Usage: "the p2p port of the nodes in the network",
Value: 30311,
Expand Down Expand Up @@ -1243,6 +1247,9 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
if ctx.GlobalIsSet(NoUSBFlag.Name) {
cfg.NoUSB = ctx.GlobalBool(NoUSBFlag.Name)
}
if ctx.GlobalIsSet(DirectBroadcastFlag.Name) {
cfg.DirectBroadcast = ctx.GlobalBool(DirectBroadcastFlag.Name)
}
if ctx.GlobalIsSet(InsecureUnlockAllowedFlag.Name) {
cfg.InsecureUnlockAllowed = ctx.GlobalBool(InsecureUnlockAllowedFlag.Name)
}
Expand Down Expand Up @@ -1519,6 +1526,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
if ctx.GlobalIsSet(GCModeFlag.Name) {
cfg.NoPruning = ctx.GlobalString(GCModeFlag.Name) == "archive"
}
if ctx.GlobalIsSet(DirectBroadcastFlag.Name) {
cfg.DirectBroadcast = ctx.GlobalBool(DirectBroadcastFlag.Name)
}
if ctx.GlobalIsSet(CacheNoPrefetchFlag.Name) {
cfg.NoPrefetch = ctx.GlobalBool(CacheNoPrefetchFlag.Name)
}
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if checkpoint == nil {
checkpoint = params.TrustedCheckpoints[genesisHash]
}
if eth.protocolManager, err = NewProtocolManager(chainConfig, checkpoint, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
if eth.protocolManager, err = NewProtocolManager(chainConfig, checkpoint, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist, config.DirectBroadcast); err != nil {
return nil, err
}

Expand Down
5 changes: 3 additions & 2 deletions eth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ type Config struct {
// for nodes to connect to.
DiscoveryURLs []string

NoPruning bool // Whether to disable pruning and flush everything to disk
NoPrefetch bool // Whether to disable prefetching and only load state on demand
NoPruning bool // Whether to disable pruning and flush everything to disk
NoPrefetch bool // Whether to disable prefetching and only load state on demand
DirectBroadcast bool

// Whitelist of required block number -> hash values to accept
Whitelist map[uint64]common.Hash `toml:"-"`
Expand Down
33 changes: 20 additions & 13 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ type ProtocolManager struct {
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node

fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
directBroadcast bool

checkpointNumber uint64 // Block number for the sync progress validator to cross reference
checkpointHash common.Hash // Block hash for the sync progress validator to cross reference
Expand Down Expand Up @@ -100,18 +101,19 @@ type ProtocolManager struct {

// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the Ethereum network.
func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash, directBroadcast bool) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkID: networkID,
forkFilter: forkid.NewFilter(blockchain),
eventMux: mux,
txpool: txpool,
blockchain: blockchain,
peers: newPeerSet(),
whitelist: whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
directBroadcast: directBroadcast,
networkID: networkID,
forkFilter: forkid.NewFilter(blockchain),
eventMux: mux,
txpool: txpool,
blockchain: blockchain,
peers: newPeerSet(),
whitelist: whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}

if mode == downloader.FullSync {
Expand Down Expand Up @@ -821,7 +823,12 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
return
}
// Send the block to a subset of our peers
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
var transfer []*peer
if pm.directBroadcast {
transfer = peers[:int(len(peers))]
} else {
transfer = peers[:int(math.Sqrt(float64(len(peers))))]
}
for _, peer := range transfer {
peer.AsyncSendNewBlock(block, td)
}
Expand Down
6 changes: 3 additions & 3 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
pm, err := NewProtocolManager(config, cht, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, ethash.NewFaker(), blockchain, db, 1, nil)
pm, err := NewProtocolManager(config, cht, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, ethash.NewFaker(), blockchain, db, 1, nil, false)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
Expand Down Expand Up @@ -582,7 +582,7 @@ func testBroadcastBlock(t *testing.T, totalPeers, broadcastExpected int) {
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, evmux, &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, pow, blockchain, db, 1, nil)
pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, evmux, &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, pow, blockchain, db, 1, nil, false)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
Expand Down Expand Up @@ -643,7 +643,7 @@ func TestBroadcastMalformedBlock(t *testing.T) {
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), engine, blockchain, db, 1, nil)
pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), engine, blockchain, db, 1, nil, false)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion eth/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
if _, err := blockchain.InsertChain(chain); err != nil {
panic(err)
}
pm, err := NewProtocolManager(gspec.Config, nil, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx, pool: make(map[common.Hash]*types.Transaction)}, engine, blockchain, db, 1, nil)
pm, err := NewProtocolManager(gspec.Config, nil, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx, pool: make(map[common.Hash]*types.Transaction)}, engine, blockchain, db, 1, nil, false)
if err != nil {
return nil, nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions eth/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ func TestForkIDSplit(t *testing.T) {
blocksNoFork, _ = core.GenerateChain(configNoFork, genesisNoFork, engine, dbNoFork, 2, nil)
blocksProFork, _ = core.GenerateChain(configProFork, genesisProFork, engine, dbProFork, 2, nil)

ethNoFork, _ = NewProtocolManager(configNoFork, nil, downloader.FullSync, 1, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, engine, chainNoFork, dbNoFork, 1, nil)
ethProFork, _ = NewProtocolManager(configProFork, nil, downloader.FullSync, 1, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, engine, chainProFork, dbProFork, 1, nil)
ethNoFork, _ = NewProtocolManager(configNoFork, nil, downloader.FullSync, 1, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, engine, chainNoFork, dbNoFork, 1, nil, false)
ethProFork, _ = NewProtocolManager(configProFork, nil, downloader.FullSync, 1, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, engine, chainProFork, dbProFork, 1, nil, false)
)
ethNoFork.Start(1000)
ethProFork.Start(1000)
Expand Down
3 changes: 3 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type Config struct {
// NoUSB disables hardware wallet monitoring and connectivity.
NoUSB bool `toml:",omitempty"`

// DirectBroadcast enable directly broadcast mined block to all peers
DirectBroadcast bool `toml:",omitempty"`

// SmartCardDaemonPath is the path to the smartcard daemon's socket
SmartCardDaemonPath string `toml:",omitempty"`

Expand Down