Skip to content

Commit

Permalink
Merge pull request hyperledger#5 from shivdeep-singh-ibm/temp_draft_b…
Browse files Browse the repository at this point in the history
…ft_blockpuller

add blockfetcher to consensus
  • Loading branch information
yacovm authored Jun 28, 2022
2 parents 53b511f + e5b553c commit 54c4c8d
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 0 deletions.
33 changes: 33 additions & 0 deletions orderer/common/cluster/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (

type BlockSource interface {
PullBlock(seq uint64) *common.Block
HeightsByEndpoints() (map[string]uint64, error)
UpdateEndpoints(endpoints []EndpointCriteria)
Close()
}
Expand Down Expand Up @@ -525,3 +526,35 @@ func (bf *BlockFetcher) PullBlock(seq uint64) *common.Block {
retriesLeft--
}
}

// HeightsByEndpoints returns the block heights by endpoints of orderers
func (bf BlockFetcher) HeightsByEndpoints() (map[string]uint64, error) {
return bf.BlockSourceFactory(bf.Config).HeightsByEndpoints()
}

// UpdateEndpoints assigns the new endpoints.
func (p *BlockFetcher) UpdateEndpoints(endpoints []EndpointCriteria) {
p.Logger.Debugf("Updating endpoints: %v", endpoints)
p.Config.Endpoints = endpoints
}

// Close closes the blocksource of blockfetcher.
func (bf BlockFetcher) Close() {
if bf.currentBlockSource != nil {
bf.currentBlockSource.Close()
}
}

// Clone returns a copy of this BlockFetcher initialized
// for the given channel
func (p *BlockFetcher) Clone() *BlockFetcher {
// Clone by value
copy := *p

// Reset internal state
copy.currentEndpoint = EndpointCriteria{}
copy.currentSeq = 0
copy.currentBlockSource = nil
copy.suspects = suspectSet{}
return &copy
}
22 changes: 22 additions & 0 deletions orderer/common/cluster/mocks/block_source.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 74 additions & 0 deletions orderer/common/follower/block_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package follower

import (
"encoding/pem"
"time"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/bccsp"
Expand All @@ -18,6 +19,19 @@ import (
"github.com/pkg/errors"
)

// TODO: keep these constants at one place tp
// to prevent code duplication. These would be needed in the
// consensus blockpuller.
const (
// compute endpoint shuffle timeout from FetchTimeout
// endpoint shuffle timeout should be shuffleTimeoutMultiplier times FetchTimeout
shuffleTimeoutMultiplier = 10
// timeout expressed as a percentage of FetchtimeOut
// if PullBlock returns before (shuffleTimeoutPercentage% of FetchTimeOut of blockfetcher)
// the source is shuffled.
shuffleTimeoutPercentage = int64(50)
)

//go:generate counterfeiter -o mocks/channel_puller.go -fake-name ChannelPuller . ChannelPuller

// ChannelPuller pulls blocks for a channel
Expand Down Expand Up @@ -112,6 +126,66 @@ func (creator *BlockPullerCreator) BlockPuller(configBlock *common.Block, stopCh
return bp, nil
}

// BlockFetcher creates a block fetcher on demand, taking the endpoints from the config block.
func (creator *BlockPullerCreator) BlockFetcher(configBlock *common.Block, stopChannel chan struct{}) (ChannelPuller, error) {
// Extract the TLS CA certs and endpoints from the join-block
endpoints, err := cluster.EndpointconfigFromConfigBlock(configBlock, creator.bccsp)
if err != nil {
return nil, errors.WithMessage(err, "error extracting endpoints from config block")
}

fc := cluster.FetcherConfig{
Channel: creator.channelID,
Signer: creator.signer,
TLSCert: creator.der.Bytes,
Dialer: creator.stdDialer,
Endpoints: endpoints,
FetchTimeout: creator.clusterConfig.ReplicationPullTimeout,
}

maxByzantineNodes := uint64(len(endpoints)-1) / 3

// timeout for time based shuffling
shuffleTimeout := shuffleTimeoutMultiplier * creator.clusterConfig.ReplicationPullTimeout
shuffleTimeoutThrehold := shuffleTimeoutPercentage

bf := &cluster.BlockFetcher{
MaxPullBlockRetries: 0,
AttestationSourceFactory: func(c cluster.FetcherConfig) cluster.AttestationSource {
return &cluster.AttestationPuller{
Config: fc,
Logger: flogging.MustGetLogger("orderer.common.cluster.attestationpuller").With("channel", creator.channelID),
}
},
BlockSourceFactory: func(c cluster.FetcherConfig) cluster.BlockSource {
// fill blockpuller fields from config
return &cluster.BlockPuller{
VerifyBlockSequence: creator.VerifyBlockSequence,
Logger: flogging.MustGetLogger("orderer.common.cluster.puller").With("channel", creator.channelID),
RetryTimeout: creator.clusterConfig.ReplicationRetryTimeout,
MaxTotalBufferBytes: creator.clusterConfig.ReplicationBufferSize,
MaxPullBlockRetries: uint64(creator.clusterConfig.ReplicationMaxRetries),
FetchTimeout: creator.clusterConfig.ReplicationPullTimeout,
Endpoints: c.Endpoints,
Signer: c.Signer,
TLSCert: creator.der.Bytes,
Channel: c.Channel,
Dialer: c.Dialer,
StopChannel: stopChannel,
}
},
Config: fc,
Logger: flogging.MustGetLogger("orderer.common.cluster.puller").With("channel", creator.channelID),
ShuffleTimeout: shuffleTimeout,
LastShuffledAt: time.Now(),
MaxByzantineNodes: maxByzantineNodes,
ShuffleTimeoutThrehold: shuffleTimeoutThrehold,
TimeNow: time.Now,
}

return bf, nil
}

// UpdateVerifierFromConfigBlock creates a new block signature verifier from the config block and updates the internal
// link to said verifier.
func (creator *BlockPullerCreator) UpdateVerifierFromConfigBlock(configBlock *common.Block) error {
Expand Down
117 changes: 117 additions & 0 deletions orderer/consensus/etcdraft/blockpuller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ package etcdraft

import (
"encoding/pem"
"time"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/bccsp"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/orderer/common/cluster"
Expand All @@ -18,6 +20,16 @@ import (
"github.com/pkg/errors"
)

const (
// compute endpoint shuffle timeout from FetchTimeout
// endpoint shuffle timeout should be shuffleTimeoutMultiplier times FetchTimeout
shuffleTimeoutMultiplier = 10
// timeout expressed as a percentage of FetchtimeOut
// if PullBlock returns before (shuffleTimeoutPercentage% of FetchTimeOut of blockfetcher)
// the source is shuffled.
shuffleTimeoutPercentage = int64(50)
)

// LedgerBlockPuller pulls blocks upon demand, or fetches them from the ledger
type LedgerBlockPuller struct {
BlockPuller
Expand Down Expand Up @@ -107,3 +119,108 @@ func NewBlockPuller(support consensus.ConsenterSupport,
BlockPuller: bp,
}, nil
}

// NewBlockFetcher creates a new block fetcher
func NewBlockFetcher(support consensus.ConsenterSupport,
baseDialer *cluster.PredicateDialer,
clusterConfig localconfig.Cluster,
bccsp bccsp.BCCSP,
) (BlockPuller, error) {
verifyBlockSequence := func(blocks []*common.Block, _ string) error {
return cluster.VerifyBlocks(blocks, support)
}

stdDialer := &cluster.StandardDialer{
Config: baseDialer.Config,
}
stdDialer.Config.AsyncConnect = false
stdDialer.Config.SecOpts.VerifyCertificate = nil

// Extract the TLS CA certs and endpoints from the configuration,
endpoints, err := EndpointconfigFromSupport(support, bccsp)
if err != nil {
return nil, err
}

der, _ := pem.Decode(stdDialer.Config.SecOpts.Certificate)
if der == nil {
return nil, errors.Errorf("client certificate isn't in PEM format: %v",
string(stdDialer.Config.SecOpts.Certificate))
}

fc := cluster.FetcherConfig{
Channel: support.ChannelID(),
Signer: support,
TLSCert: der.Bytes,
Dialer: stdDialer,
Endpoints: endpoints,
FetchTimeout: clusterConfig.ReplicationPullTimeout,
}

// To tolerate byzantine behaviour of `f` faulty nodes, we need a total of `3f + 1` nodes.
// f = maxByzantineNodes, total = len(endpoints)
maxByzantineNodes := uint64(len(endpoints)-1) / 3

// timeout for time based shuffling
// 10 times the fetchtimeout
shuffleTimeout := shuffleTimeoutMultiplier * clusterConfig.ReplicationPullTimeout
shuffleTimeoutThrehold := shuffleTimeoutPercentage

bf := cluster.BlockFetcher{
MaxPullBlockRetries: 0,
AttestationSourceFactory: func(c cluster.FetcherConfig) cluster.AttestationSource {
return &cluster.AttestationPuller{
Config: fc,
Logger: flogging.MustGetLogger("orderer.common.cluster.attestationpuller").With("channel", support.ChannelID()),
}
},
BlockSourceFactory: func(c cluster.FetcherConfig) cluster.BlockSource {
return &cluster.BlockPuller{VerifyBlockSequence: verifyBlockSequence,
Logger: flogging.MustGetLogger("orderer.common.cluster.puller").With("channel", support.ChannelID()),
RetryTimeout: clusterConfig.ReplicationRetryTimeout,
MaxTotalBufferBytes: clusterConfig.ReplicationBufferSize,
FetchTimeout: clusterConfig.ReplicationPullTimeout,
Endpoints: c.Endpoints,
Signer: c.Signer,
TLSCert: der.Bytes,
Channel: c.Channel,
Dialer: stdDialer,
StopChannel: make(chan struct{}),
}
},
Config: fc,
Logger: flogging.MustGetLogger("orderer.common.cluster.puller").With("channel", support.ChannelID()),
ShuffleTimeout: shuffleTimeout,
LastShuffledAt: time.Now(),
MaxByzantineNodes: maxByzantineNodes,
ConfirmByzantineBehavior: func(attestations []*orderer.BlockAttestation) bool {
if attestations == nil {
return true
}
isAttestationForged := func(a *orderer.BlockAttestation) bool {
err := verifyBlockSequence([]*common.Block{{Header: a.Header, Metadata: a.Metadata}}, support.ChannelID())
return err != nil
}
// the source is suspected for malicious behaviour if the attestations list has atleast one valid attestation
// check for the validity of each attestation, so that a forged attestation may not mislead us to suspect the source,
// i.e. funtcion should not return a false positive.
for _, a := range attestations {
if a != nil {
if isAttestationForged(a) {
continue
}
return true
}
}
return false
},
ShuffleTimeoutThrehold: shuffleTimeoutThrehold,
TimeNow: time.Now,
}

return &LedgerBlockPuller{
Height: support.Height,
BlockRetriever: support,
BlockPuller: &bf,
}, nil
}

0 comments on commit 54c4c8d

Please sign in to comment.