Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the ipfs dag api object in Blockstore #356

Merged
merged 26 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
aabfd03
add the ipfs object to the blockstore and remove if from the state ob…
evan-forbes May 26, 2021
5c63080
fix linter's complaints
evan-forbes May 26, 2021
f0a8a34
increase TestReactorSelectiveBroadcast sleep times
evan-forbes May 26, 2021
e32ce9d
increase ensureTimeout to 4 seconds
evan-forbes May 26, 2021
0c06595
only use the dag api object instead of the entire ipfs api object
evan-forbes May 26, 2021
0517ac2
increase TestNodeSetPrivValIPC timeout to 400ms
evan-forbes May 26, 2021
16a9230
Merge branch 'master' into evan/move-ipfs-to-blockstore
evan-forbes May 26, 2021
ac48a99
increase time waited for TestReactorsGossipNoCommittedEvidence again
evan-forbes May 26, 2021
87fae07
increase TestNodeSetPrivValIPC timeout again
evan-forbes May 26, 2021
eed5e89
timeout increase
evan-forbes May 26, 2021
8d5debe
cleanup remainging mocks
evan-forbes May 26, 2021
6174063
try insane timeout for TestNodeSetPrivValIPC
evan-forbes May 26, 2021
fbfab50
increase the failing precommit timeout
evan-forbes May 26, 2021
2bb033b
more cleanup
evan-forbes May 26, 2021
210e690
remove the unused ipfsAPI from the node
evan-forbes May 27, 2021
e914706
try a test node that doesn't use the full mocked ipfs node
evan-forbes May 27, 2021
cb645cb
implement and use a dag only api provider
evan-forbes May 27, 2021
00d3af0
revert crazy timeout
evan-forbes May 27, 2021
2e5a7e4
simplify dag only mock
evan-forbes May 27, 2021
c11b9fe
remove accidental file
evan-forbes May 27, 2021
e55d925
try to make TestReactorsGossipNoCommittedEvidence less flaky
evan-forbes May 27, 2021
b2469cc
use ipld alias instead of format
evan-forbes May 27, 2021
8a4026c
remove access to the IPFS dag from the blockstore and add it back to …
evan-forbes May 27, 2021
1bddff2
change api provider to only use the dag instead of the core api object
evan-forbes May 27, 2021
71bcef7
change alias to ipld in node package
evan-forbes May 27, 2021
0ce8cec
increase timeouts for TestWALTruncate and timeoutWaitGroup for CI
evan-forbes May 27, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -71,7 +72,7 @@ func newBlockchainReactor(
blockDB := memdb.NewDB()
stateDB := memdb.NewDB()
stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(blockDB)
blockStore := store.NewBlockStore(blockDB, mdutils.Mock())

state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ func TestConsensusConfig() *ConsensusConfig {
cfg.TimeoutProposeDelta = 20 * time.Millisecond
cfg.TimeoutPrevote = 80 * time.Millisecond
cfg.TimeoutPrevoteDelta = 20 * time.Millisecond
cfg.TimeoutPrecommit = 80 * time.Millisecond
cfg.TimeoutPrecommit = 160 * time.Millisecond
cfg.TimeoutPrecommitDelta = 20 * time.Millisecond
// NOTE: when modifying, make sure to update time_iota_ms (testGenesisFmt) in toml.go
cfg.TimeoutCommit = 80 * time.Millisecond
Expand Down
4 changes: 2 additions & 2 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
app.InitChain(abci.RequestInitChain{Validators: vals})

blockDB := memdb.NewDB()
blockStore := store.NewBlockStore(blockDB)
blockStore := store.NewBlockStore(blockDB, mdutils.Mock())

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
Expand All @@ -78,7 +78,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {

// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, mdutils.Mock(), evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(cs.Logger)
// set private validator
pv := privVals[i]
Expand Down
21 changes: 12 additions & 9 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/go-kit/kit/log/term"
format "github.com/ipfs/go-ipld-format"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -52,7 +53,7 @@ type cleanupFunc func()
var (
config *cfg.Config // NOTE: must be reset for each _test.go file
consensusReplayConfig *cfg.Config
ensureTimeout = 2 * time.Second
ensureTimeout = 4 * time.Second
)

func ensureDir(dir string, mode os.FileMode) {
Expand Down Expand Up @@ -352,19 +353,20 @@ func subscribeToVoter(cs *State, addr []byte) <-chan tmpubsub.Message {
//-------------------------------------------------------------------------------
// consensus states

func newState(state sm.State, pv types.PrivValidator, app abci.Application) *State {
func newState(state sm.State, pv types.PrivValidator, app abci.Application, ipfsDagAPI format.DAGService) *State {
config := cfg.ResetTestRoot("consensus_state_test")
return newStateWithConfig(config, state, pv, app)
return newStateWithConfig(config, state, pv, app, ipfsDagAPI)
}

func newStateWithConfig(
thisConfig *cfg.Config,
state sm.State,
pv types.PrivValidator,
app abci.Application,
ipfsDagAPI format.DAGService,
) *State {
blockDB := memdb.NewDB()
return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB)
return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB, ipfsDagAPI)
}

func newStateWithConfigAndBlockStore(
Expand All @@ -373,9 +375,10 @@ func newStateWithConfigAndBlockStore(
pv types.PrivValidator,
app abci.Application,
blockDB dbm.DB,
ipfsDagAPI format.DAGService,
) *State {
// Get BlockStore
blockStore := store.NewBlockStore(blockDB)
blockStore := store.NewBlockStore(blockDB, ipfsDagAPI)

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
Expand All @@ -399,7 +402,7 @@ func newStateWithConfigAndBlockStore(
}

blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, mdutils.Mock(), evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

Expand Down Expand Up @@ -431,7 +434,7 @@ func randState(nValidators int) (*State, []*validatorStub) {

vss := make([]*validatorStub, nValidators)

cs := newState(state, privVals[0], counter.NewApplication(true))
cs := newState(state, privVals[0], counter.NewApplication(true), mdutils.Mock())

for i := 0; i < nValidators; i++ {
vss[i] = newValidatorStub(privVals[i], int32(i))
Expand Down Expand Up @@ -704,7 +707,7 @@ func randConsensusNet(
vals := types.TM2PB.ValidatorUpdates(state.Validators)
app.InitChain(abci.RequestInitChain{Validators: vals})

css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB)
css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB, mdutils.Mock())
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
}
Expand Down Expand Up @@ -767,7 +770,7 @@ func randConsensusNetWithPeers(
app.InitChain(abci.RequestInitChain{Validators: vals})
// sm.SaveState(stateDB,state) //height 1's validatorsInfo already saved in LoadStateFromDBOrGenesisDoc above

css[i] = newStateWithConfig(thisConfig, state, privVal, app)
css[i] = newStateWithConfig(thisConfig, state, privVal, app, mdutils.Mock())
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
}
Expand Down
11 changes: 6 additions & 5 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -29,7 +30,7 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {

config.Consensus.CreateEmptyBlocks = false
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock())
assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
Expand All @@ -49,7 +50,7 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {

config.Consensus.CreateEmptyBlocksInterval = ensureTimeout
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock())

assertMempool(cs.txNotifier).EnableTxsAvailable()

Expand All @@ -67,7 +68,7 @@ func TestMempoolProgressInHigherRound(t *testing.T) {

config.Consensus.CreateEmptyBlocks = false
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock())

assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
Expand Down Expand Up @@ -117,7 +118,7 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
blockDB := memdb.NewDB()
stateStore := sm.NewStore(blockDB)

cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB)
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB, mdutils.Mock())
err := stateStore.Save(state)
require.NoError(t, err)
newBlockHeaderCh := subscribe(cs.eventBus, types.EventQueryNewBlockHeader)
Expand All @@ -143,7 +144,7 @@ func TestMempoolRmBadTx(t *testing.T) {
blockDB := memdb.NewDB()

stateStore := sm.NewStore(blockDB)
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB)
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB, mdutils.Mock())
err := stateStore.Save(state)
require.NoError(t, err)

Expand Down
6 changes: 3 additions & 3 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestReactorWithEvidence(t *testing.T) {
// css[i] = newStateWithConfig(thisConfig, state, privVals[i], app)

blockDB := memdb.NewDB()
blockStore := store.NewBlockStore(blockDB)
blockStore := store.NewBlockStore(blockDB, mdutils.Mock())

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestReactorWithEvidence(t *testing.T) {

// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, mdutils.Mock(), evpool2)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

Expand Down Expand Up @@ -670,7 +670,7 @@ func timeoutWaitGroup(t *testing.T, n int, f func(int), css []*State) {

// we're running many nodes in-process, possibly in in a virtual machine,
// and spewing debug messages - making a block could take a while,
timeout := time.Minute * 4
timeout := time.Minute * 6

select {
case <-done:
Expand Down
7 changes: 3 additions & 4 deletions consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"strings"

mdutils "github.com/ipfs/go-merkledag/test"

cfg "github.com/lazyledger/lazyledger-core/config"
"github.com/lazyledger/lazyledger-core/libs/db/badgerdb"
"github.com/lazyledger/lazyledger-core/libs/log"
Expand Down Expand Up @@ -131,7 +130,7 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error
pb.cs.Wait()

newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, mdutils.Mock(), pb.cs.evpool)
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool)
newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay()

Expand Down Expand Up @@ -290,7 +289,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
if err != nil {
tmos.Exit(err.Error())
}
blockStore := store.NewBlockStore(blockStoreDB)
blockStore := store.NewBlockStore(blockStoreDB, mdutils.Mock())

// Get State
stateDB, err := badgerdb.NewDB("state", config.DBDir())
Expand Down Expand Up @@ -331,7 +330,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)

consensusState := NewState(csConfig, state.Copy(), blockExec,
blockStore, mempool, mdutils.Mock(), evpool)
blockStore, mempool, evpool)

consensusState.SetEventBus(eventBus)
return consensusState
Expand Down
24 changes: 15 additions & 9 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/gogo/protobuf/proto"
format "github.com/ipfs/go-ipld-format"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -78,6 +79,7 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi
privValidator,
kvstore.NewApplication(),
blockDB,
mdutils.Mock(),
)
cs.SetLogger(logger)

Expand Down Expand Up @@ -130,9 +132,7 @@ func TestWALCrash(t *testing.T) {
heightToStop int64
}{
{"empty block",
func(stateDB dbm.DB, cs *State, ctx context.Context) {
cs.dag = mdutils.Mock()
},
func(stateDB dbm.DB, cs *State, ctx context.Context) {},
1},
{"many non-empty blocks",
func(stateDB dbm.DB, cs *State, ctx context.Context) {
Expand Down Expand Up @@ -174,6 +174,7 @@ LOOP:
privValidator,
kvstore.NewApplication(),
blockDB,
mdutils.Mock(),
)
cs.SetLogger(logger)

Expand Down Expand Up @@ -1181,16 +1182,17 @@ func stateAndStore(
// mock block store

type mockBlockStore struct {
config *cfg.Config
params tmproto.ConsensusParams
chain []*types.Block
commits []*types.Commit
base int64
config *cfg.Config
params tmproto.ConsensusParams
chain []*types.Block
commits []*types.Commit
base int64
ipfsDagAPI format.DAGService
}

// TODO: NewBlockStore(db.NewMemDB) ...
func newMockBlockStore(config *cfg.Config, params tmproto.ConsensusParams) *mockBlockStore {
return &mockBlockStore{config, params, nil, nil, 0}
return &mockBlockStore{config, params, nil, nil, 0, mdutils.Mock()}
}

func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) }
Expand Down Expand Up @@ -1229,6 +1231,10 @@ func (bs *mockBlockStore) PruneBlocks(height int64) (uint64, error) {
return pruned, nil
}

func (bs *mockBlockStore) IpfsDagAPI() format.DAGService {
return bs.ipfsDagAPI
}

//---------------------------------------
// Test handshake/init chain

Expand Down
7 changes: 1 addition & 6 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/gogo/protobuf/proto"
format "github.com/ipfs/go-ipld-format"
cfg "github.com/lazyledger/lazyledger-core/config"
cstypes "github.com/lazyledger/lazyledger-core/consensus/types"
"github.com/lazyledger/lazyledger-core/crypto"
Expand Down Expand Up @@ -94,8 +93,6 @@ type State struct {
// store blocks and commits
blockStore sm.BlockStore

dag format.DAGService
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls, keep DAGService in the State how it was. DAGService handles networking as well and making State rely on dag service provided by blockstore is architecturally not a good idea.

Copy link
Member

@Wondertan Wondertan May 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conceptually, I argue that we should change "move the ipfs dag api object to Blockstore" to "use the ipfs dag api object in Blockstore"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is now true, yes.


// create and execute blocks
blockExec *sm.BlockExecutor

Expand Down Expand Up @@ -163,15 +160,13 @@ func NewState(
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
txNotifier txNotifier,
dag format.DAGService,
evpool evidencePool,
options ...StateOption,
) *State {
cs := &State{
config: config,
blockExec: blockExec,
blockStore: blockStore,
dag: dag,
txNotifier: txNotifier,
peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize),
Expand Down Expand Up @@ -1121,7 +1116,7 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
defer cancel()
cs.Logger.Info("Putting Block to ipfs", "height", block.Height)
// TODO: post data to IPFS in a goroutine
err = ipld.PutBlock(ctx, cs.dag, block)
err = ipld.PutBlock(ctx, cs.blockStore.IpfsDagAPI(), block)
if err != nil {
// If PutBlock fails we will be the only node that has the data
// this means something is seriously wrong and we can not recover
Expand Down
7 changes: 4 additions & 3 deletions consensus/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -639,7 +640,7 @@ func TestStateLockPOLRelock(t *testing.T) {
signAddVotes(cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4)

// before we timeout to the new round set the new proposal
cs2 := newState(cs1.state, vs2, counter.NewApplication(true))
cs2 := newState(cs1.state, vs2, counter.NewApplication(true), mdutils.Mock())
prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1)
if prop == nil || propBlock == nil {
t.Fatal("Failed to create proposal block with vs2")
Expand Down Expand Up @@ -825,7 +826,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {
signAddVotes(cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4)

// before we timeout to the new round set the new proposal
cs2 := newState(cs1.state, vs2, counter.NewApplication(true))
cs2 := newState(cs1.state, vs2, counter.NewApplication(true), mdutils.Mock())
prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1)
if prop == nil || propBlock == nil {
t.Fatal("Failed to create proposal block with vs2")
Expand Down Expand Up @@ -869,7 +870,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {
signAddVotes(cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4)

// before we timeout to the new round set the new proposal
cs3 := newState(cs1.state, vs3, counter.NewApplication(true))
cs3 := newState(cs1.state, vs3, counter.NewApplication(true), mdutils.Mock())
prop, propBlock = decideProposal(cs3, vs3, vs3.Height, vs3.Round+1)
if prop == nil || propBlock == nil {
t.Fatal("Failed to create proposal block with vs2")
Expand Down
Loading