Skip to content

Commit

Permalink
Save and load block data using IPFS (#374)
Browse files Browse the repository at this point in the history
* change the blockstore interface

* update NewBlockstore to accept blockstore.Blockstore

* fix typo causing rpc error

* load block data using the IPFS store

* clean up the rest of the testing utility code

* linter

* handle errors and fix todos
  • Loading branch information
evan-forbes committed Jun 23, 2021
1 parent 9808fc9 commit 8da1644
Show file tree
Hide file tree
Showing 23 changed files with 279 additions and 130 deletions.
13 changes: 10 additions & 3 deletions blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v0

import (
"context"
"fmt"
"reflect"
"time"
Expand Down Expand Up @@ -178,7 +179,10 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest,
src p2p.Peer) (queued bool) {

block := bcR.store.LoadBlock(msg.Height)
block, err := bcR.store.LoadBlock(context.TODO(), msg.Height)
if err != nil {
panic(err)
}
if block != nil {
bl, err := block.ToProto()
if err != nil {
Expand Down Expand Up @@ -418,11 +422,14 @@ FOR_LOOP:
bcR.pool.PopRequest()

// TODO: batch saves so we dont persist to disk every block
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
err := bcR.store.SaveBlock(context.TODO(), first, firstParts, second.LastCommit)
if err != nil {
// an error is only returned if something with the local IPFS blockstore is seriously wrong
panic(err)
}

// TODO: same thing for app - but we would need a way to get the hash
// without persisting the state.
var err error
state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first)
if err != nil {
// TODO This is bad, are we zombie?
Expand Down
20 changes: 14 additions & 6 deletions blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package v0

import (
"context"
"crypto/sha256"
"fmt"
"os"
"sort"
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -69,10 +69,9 @@ func newBlockchainReactor(
panic(fmt.Errorf("error start app: %w", err))
}

blockDB := memdb.NewDB()
stateDB := memdb.NewDB()
stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(blockDB, mdutils.Mock())
blockStore := store.MockBlockStore(nil)

state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
if err != nil {
Expand Down Expand Up @@ -100,7 +99,10 @@ func newBlockchainReactor(
lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)
if blockHeight > 1 {
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
lastBlock := blockStore.LoadBlock(blockHeight - 1)
lastBlock, err := blockStore.LoadBlock(context.TODO(), blockHeight-1)
if err != nil {
panic(err)
}

vote, err := types.MakeVote(
lastBlock.Header.Height,
Expand All @@ -127,7 +129,10 @@ func newBlockchainReactor(
panic(fmt.Errorf("error apply block: %w", err))
}

blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
err := blockStore.SaveBlock(context.TODO(), thisBlock, thisParts, lastCommit)
if err != nil {
panic(err)
}
}

bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
Expand Down Expand Up @@ -184,7 +189,10 @@ func TestNoBlockResponse(t *testing.T) {
assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height())

for _, tt := range tests {
block := reactorPairs[1].reactor.store.LoadBlock(tt.height)
block, err := reactorPairs[1].reactor.store.LoadBlock(context.TODO(), tt.height)
if err != nil {
panic(err)
}
if tt.existent {
assert.True(t, block != nil)
} else {
Expand Down
9 changes: 6 additions & 3 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/ipfs/go-blockservice"
offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-merkledag"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -57,8 +59,9 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
app.InitChain(abci.RequestInitChain{Validators: vals})

blockDB := memdb.NewDB()
dag := mdutils.Mock()
blockStore := store.NewBlockStore(blockDB, dag)
bs := ipfs.MockBlockStore()
dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
blockStore := store.NewBlockStore(blockDB, bs, log.TestingLogger())

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
Expand Down
17 changes: 10 additions & 7 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
"time"

"github.com/go-kit/kit/log/term"
"github.com/ipfs/go-blockservice"
offline "github.com/ipfs/go-ipfs-exchange-offline"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -356,18 +359,17 @@ func subscribeToVoter(cs *State, addr []byte) <-chan tmpubsub.Message {

func newState(state sm.State, pv types.PrivValidator, app abci.Application, ipfsDagAPI format.DAGService) *State {
config := cfg.ResetTestRoot("consensus_state_test")
return newStateWithConfig(config, state, pv, app, ipfsDagAPI)
return newStateWithConfig(config, state, pv, app)
}

func newStateWithConfig(
thisConfig *cfg.Config,
state sm.State,
pv types.PrivValidator,
app abci.Application,
ipfsDagAPI format.DAGService,
) *State {
blockDB := memdb.NewDB()
return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB, ipfsDagAPI)
return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB)
}

func newStateWithConfigAndBlockStore(
Expand All @@ -376,10 +378,11 @@ func newStateWithConfigAndBlockStore(
pv types.PrivValidator,
app abci.Application,
blockDB dbm.DB,
dag format.DAGService,
) *State {
// Get BlockStore
blockStore := store.NewBlockStore(blockDB, dag)
bs := ipfs.MockBlockStore()
dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
blockStore := store.NewBlockStore(blockDB, bs, log.TestingLogger())

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
Expand Down Expand Up @@ -708,7 +711,7 @@ func randConsensusNet(
vals := types.TM2PB.ValidatorUpdates(state.Validators)
app.InitChain(abci.RequestInitChain{Validators: vals})

css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB, mdutils.Mock())
css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB)
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
}
Expand Down Expand Up @@ -771,7 +774,7 @@ func randConsensusNetWithPeers(
app.InitChain(abci.RequestInitChain{Validators: vals})
// sm.SaveState(stateDB,state) //height 1's validatorsInfo already saved in LoadStateFromDBOrGenesisDoc above

css[i] = newStateWithConfig(thisConfig, state, privVal, app, mdutils.Mock())
css[i] = newStateWithConfig(thisConfig, state, privVal, app)
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
}
Expand Down
11 changes: 5 additions & 6 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -30,7 +29,7 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {

config.Consensus.CreateEmptyBlocks = false
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock())
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())
assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
Expand All @@ -50,7 +49,7 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {

config.Consensus.CreateEmptyBlocksInterval = ensureTimeout
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock())
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())

assertMempool(cs.txNotifier).EnableTxsAvailable()

Expand All @@ -68,7 +67,7 @@ func TestMempoolProgressInHigherRound(t *testing.T) {

config.Consensus.CreateEmptyBlocks = false
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock())
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())

assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
Expand Down Expand Up @@ -118,7 +117,7 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
blockDB := memdb.NewDB()
stateStore := sm.NewStore(blockDB)

cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB, mdutils.Mock())
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB)
err := stateStore.Save(state)
require.NoError(t, err)
newBlockHeaderCh := subscribe(cs.eventBus, types.EventQueryNewBlockHeader)
Expand All @@ -144,7 +143,7 @@ func TestMempoolRmBadTx(t *testing.T) {
blockDB := memdb.NewDB()

stateStore := sm.NewStore(blockDB)
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB, mdutils.Mock())
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB)
err := stateStore.Save(state)
require.NoError(t, err)

Expand Down
4 changes: 1 addition & 3 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,8 @@ func TestReactorWithEvidence(t *testing.T) {
// duplicate code from:
// css[i] = newStateWithConfig(thisConfig, state, privVals[i], app)

blockDB := memdb.NewDB()
dag := mdutils.Mock()
blockStore := store.NewBlockStore(blockDB, dag)

blockStore := store.MockBlockStore(nil)
// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
Expand Down
11 changes: 8 additions & 3 deletions consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,10 @@ func (h *Handshaker) replayBlocks(
}
for i := firstBlock; i <= finalBlock; i++ {
h.logger.Info("Applying block", "height", i)
block := h.store.LoadBlock(i)
block, err := h.store.LoadBlock(context.TODO(), i)
if err != nil {
return nil, err
}
// Extra check to ensure the app was not changed in a way it shouldn't have.
if len(appHash) > 0 {
assertAppHashEqualsOneFromBlock(appHash, block)
Expand Down Expand Up @@ -492,15 +495,17 @@ func (h *Handshaker) replayBlocks(

// ApplyBlock on the proxyApp with the last block.
func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) {
block := h.store.LoadBlock(height)
block, err := h.store.LoadBlock(context.TODO(), height)
if err != nil {
return sm.State{}, err
}
meta := h.store.LoadBlockMeta(height)

// Use stubs for both mempool and evidence pool since no transactions nor
// evidence are needed here - block already exists.
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{})
blockExec.SetEventBus(h.eventBus)

var err error
state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block)
if err != nil {
return sm.State{}, err
Expand Down
2 changes: 1 addition & 1 deletion consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
tmos.Exit(err.Error())
}
dag := mdutils.Mock()
blockStore := store.NewBlockStore(blockStoreDB, dag)
blockStore := store.MockBlockStore(blockStoreDB)

// Get State
stateDB, err := badgerdb.NewDB("state", config.DBDir())
Expand Down
30 changes: 19 additions & 11 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi
privValidator,
kvstore.NewApplication(),
blockDB,
mdutils.Mock(),
)
cs.SetLogger(logger)

Expand Down Expand Up @@ -174,7 +173,6 @@ LOOP:
privValidator,
kvstore.NewApplication(),
blockDB,
mdutils.Mock(),
)
cs.SetLogger(logger)

Expand Down Expand Up @@ -548,7 +546,9 @@ func TestSimulateValidatorsChange(t *testing.T) {
sim.Chain = make([]*types.Block, 0)
sim.Commits = make([]*types.Commit, 0)
for i := 1; i <= numBlocks; i++ {
sim.Chain = append(sim.Chain, css[0].blockStore.LoadBlock(int64(i)))
blck, err := css[0].blockStore.LoadBlock(context.TODO(), int64(i))
require.NoError(t, err)
sim.Chain = append(sim.Chain, blck)
sim.Commits = append(sim.Commits, css[0].blockStore.LoadBlockCommit(int64(i)))
}
}
Expand Down Expand Up @@ -1195,13 +1195,15 @@ func newMockBlockStore(config *cfg.Config, params tmproto.ConsensusParams) *mock
return &mockBlockStore{config, params, nil, nil, 0, mdutils.Mock()}
}

func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) }
func (bs *mockBlockStore) Base() int64 { return bs.base }
func (bs *mockBlockStore) Size() int64 { return bs.Height() - bs.Base() + 1 }
func (bs *mockBlockStore) LoadBaseMeta() *types.BlockMeta { return bs.LoadBlockMeta(bs.base) }
func (bs *mockBlockStore) LoadBlock(height int64) *types.Block { return bs.chain[height-1] }
func (bs *mockBlockStore) LoadBlockByHash(hash []byte) *types.Block {
return bs.chain[int64(len(bs.chain))-1]
func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) }
func (bs *mockBlockStore) Base() int64 { return bs.base }
func (bs *mockBlockStore) Size() int64 { return bs.Height() - bs.Base() + 1 }
func (bs *mockBlockStore) LoadBaseMeta() *types.BlockMeta { return bs.LoadBlockMeta(bs.base) }
func (bs *mockBlockStore) LoadBlock(ctx context.Context, height int64) (*types.Block, error) {
return bs.chain[height-1], nil
}
func (bs *mockBlockStore) LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) {
return bs.chain[int64(len(bs.chain))-1], nil
}
func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
block := bs.chain[height-1]
Expand All @@ -1211,7 +1213,13 @@ func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
}
}
func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil }
func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
func (bs *mockBlockStore) SaveBlock(
ctx context.Context,
block *types.Block,
blockParts *types.PartSet,
seenCommit *types.Commit,
) error {
return nil
}
func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit {
return bs.commits[height-1]
Expand Down
9 changes: 6 additions & 3 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
}

// cancel ctx for previous proposal block to ensure block putting/providing does not queues up
if cs.proposalCancel != nil { //nolint:staticcheck
if cs.proposalCancel != nil {
// FIXME(ismail): below commented out cancel tries to prevent block putting
// and providing no to queue up endlessly.
// But in a real network proposers should have enough time in between.
Expand All @@ -1139,7 +1139,7 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
// the provide timeout could still be larger than just the time between
// two consecutive proposals.
//
// cs.proposalCancel()
cs.proposalCancel()
}
cs.proposalCtx, cs.proposalCancel = context.WithCancel(context.TODO())
go func(ctx context.Context) {
Expand Down Expand Up @@ -1585,7 +1585,10 @@ func (cs *State) finalizeCommit(height int64) {
// but may differ from the LastCommit included in the next block
precommits := cs.Votes.Precommits(cs.CommitRound)
seenCommit := precommits.MakeCommit()
cs.blockStore.SaveBlock(block, blockParts, seenCommit)
err := cs.blockStore.SaveBlock(context.TODO(), block, blockParts, seenCommit)
if err != nil {
panic(err)
}
} else {
// Happens during replay if we already saved the block but didn't commit
cs.Logger.Info("Calling finalizeCommit on already stored block", "height", block.Height)
Expand Down
2 changes: 1 addition & 1 deletion consensus/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func walGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
t.Error(err)
}
dag := mdutils.Mock()
blockStore := store.NewBlockStore(blockStoreDB, dag)
blockStore := store.MockBlockStore(blockStoreDB)

proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app))
proxyApp.SetLogger(logger.With("module", "proxy"))
Expand Down
Loading

0 comments on commit 8da1644

Please sign in to comment.