Skip to content

Commit

Permalink
feat(p2p/ipld): PutBlock synchrounous DHT providing
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed May 31, 2021
1 parent c20d576 commit 6efdf30
Show file tree
Hide file tree
Showing 16 changed files with 181 additions and 84 deletions.
4 changes: 3 additions & 1 deletion consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
abcicli "github.com/lazyledger/lazyledger-core/abci/client"
abci "github.com/lazyledger/lazyledger-core/abci/types"
"github.com/lazyledger/lazyledger-core/evidence"
"github.com/lazyledger/lazyledger-core/ipfs"
"github.com/lazyledger/lazyledger-core/libs/db/memdb"
"github.com/lazyledger/lazyledger-core/libs/log"
"github.com/lazyledger/lazyledger-core/libs/service"
Expand Down Expand Up @@ -78,7 +79,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {

// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, mdutils.Mock(), evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore,
mempool, mdutils.Mock(), ipfs.MockRouting(), evpool)
cs.SetLogger(cs.Logger)
// set private validator
pv := privVals[i]
Expand Down
3 changes: 2 additions & 1 deletion consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
abci "github.com/lazyledger/lazyledger-core/abci/types"
cfg "github.com/lazyledger/lazyledger-core/config"
cstypes "github.com/lazyledger/lazyledger-core/consensus/types"
"github.com/lazyledger/lazyledger-core/ipfs"
tmbytes "github.com/lazyledger/lazyledger-core/libs/bytes"
dbm "github.com/lazyledger/lazyledger-core/libs/db"
"github.com/lazyledger/lazyledger-core/libs/db/memdb"
Expand Down Expand Up @@ -399,7 +400,7 @@ func newStateWithConfigAndBlockStore(
}

blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, mdutils.Mock(), evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, mdutils.Mock(), ipfs.MockRouting(), evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

Expand Down
4 changes: 3 additions & 1 deletion consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
cstypes "github.com/lazyledger/lazyledger-core/consensus/types"
cryptoenc "github.com/lazyledger/lazyledger-core/crypto/encoding"
"github.com/lazyledger/lazyledger-core/crypto/tmhash"
"github.com/lazyledger/lazyledger-core/ipfs"
"github.com/lazyledger/lazyledger-core/libs/bits"
"github.com/lazyledger/lazyledger-core/libs/bytes"
"github.com/lazyledger/lazyledger-core/libs/db/memdb"
Expand Down Expand Up @@ -183,7 +184,8 @@ func TestReactorWithEvidence(t *testing.T) {

// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, mdutils.Mock(), evpool2)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore,
mempool, mdutils.Mock(), ipfs.MockRouting(), evpool2)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

Expand Down
5 changes: 3 additions & 2 deletions consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
mdutils "github.com/ipfs/go-merkledag/test"

cfg "github.com/lazyledger/lazyledger-core/config"
"github.com/lazyledger/lazyledger-core/ipfs"
"github.com/lazyledger/lazyledger-core/libs/db/badgerdb"
"github.com/lazyledger/lazyledger-core/libs/log"
tmos "github.com/lazyledger/lazyledger-core/libs/os"
Expand Down Expand Up @@ -131,7 +132,7 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error
pb.cs.Wait()

newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, mdutils.Mock(), pb.cs.evpool)
pb.cs.blockStore, pb.cs.txNotifier, mdutils.Mock(), ipfs.MockRouting(), pb.cs.evpool)
newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay()

Expand Down Expand Up @@ -331,7 +332,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)

consensusState := NewState(csConfig, state.Copy(), blockExec,
blockStore, mempool, mdutils.Mock(), evpool)
blockStore, mempool, mdutils.Mock(), ipfs.MockRouting(), evpool)

consensusState.SetEventBus(eventBus)
return consensusState
Expand Down
10 changes: 7 additions & 3 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"github.com/gogo/protobuf/proto"
format "github.com/ipfs/go-ipld-format"
"github.com/libp2p/go-libp2p-core/routing"

cfg "github.com/lazyledger/lazyledger-core/config"
cstypes "github.com/lazyledger/lazyledger-core/consensus/types"
"github.com/lazyledger/lazyledger-core/crypto"
Expand Down Expand Up @@ -94,7 +96,8 @@ type State struct {
// store blocks and commits
blockStore sm.BlockStore

dag format.DAGService
dag format.DAGService
croute routing.ContentRouting

// create and execute blocks
blockExec *sm.BlockExecutor
Expand Down Expand Up @@ -164,6 +167,7 @@ func NewState(
blockStore sm.BlockStore,
txNotifier txNotifier,
dag format.DAGService,
croute routing.ContentRouting,
evpool evidencePool,
options ...StateOption,
) *State {
Expand All @@ -172,6 +176,7 @@ func NewState(
blockExec: blockExec,
blockStore: blockStore,
dag: dag,
croute: croute,
txNotifier: txNotifier,
peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize),
Expand Down Expand Up @@ -1120,8 +1125,7 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1500)
defer cancel()
cs.Logger.Info("Putting Block to ipfs", "height", block.Height)
// TODO: post data to IPFS in a goroutine
err = ipld.PutBlock(ctx, cs.dag, block)
err = ipld.PutBlock(ctx, cs.dag, block, cs.croute)
if err != nil {
// If PutBlock fails we will be the only node that has the data
// this means something is seriously wrong and we can not recover
Expand Down
20 changes: 10 additions & 10 deletions consensus/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,25 @@ import (
"io"
mrand "math/rand"
"path/filepath"

// "sync"
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/lazyledger/lazyledger-core/abci/example/kvstore"
cfg "github.com/lazyledger/lazyledger-core/config"
"github.com/lazyledger/lazyledger-core/libs/db/memdb"
"github.com/lazyledger/lazyledger-core/privval"
"github.com/lazyledger/lazyledger-core/proxy"
sm "github.com/lazyledger/lazyledger-core/state"
"github.com/lazyledger/lazyledger-core/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/lazyledger/lazyledger-core/abci/example/kvstore"
cfg "github.com/lazyledger/lazyledger-core/config"
"github.com/lazyledger/lazyledger-core/consensus/types"
"github.com/lazyledger/lazyledger-core/crypto/merkle"
"github.com/lazyledger/lazyledger-core/ipfs"
"github.com/lazyledger/lazyledger-core/libs/autofile"
"github.com/lazyledger/lazyledger-core/libs/db/memdb"
"github.com/lazyledger/lazyledger-core/libs/log"
"github.com/lazyledger/lazyledger-core/privval"
"github.com/lazyledger/lazyledger-core/proxy"
sm "github.com/lazyledger/lazyledger-core/state"
"github.com/lazyledger/lazyledger-core/store"
tmtypes "github.com/lazyledger/lazyledger-core/types"
tmtime "github.com/lazyledger/lazyledger-core/types/time"
)
Expand Down Expand Up @@ -339,7 +338,8 @@ func walGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
require.NoError(t, err)
consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, mdutils.Mock(), evpool)
consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore,
mempool, mdutils.Mock(), ipfs.MockRouting(), evpool)
consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus)
if privValidator != nil && privValidator != (*privval.FilePV)(nil) {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/ipfs/go-ipfs-api v0.2.0
github.com/ipfs/go-ipfs-blockstore v0.1.4 // indirect
github.com/ipfs/go-ipfs-config v0.11.0
github.com/ipfs/go-ipfs-routing v0.1.0 // indirect
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-merkledag v0.3.2 // indirect
github.com/ipfs/go-path v0.0.9 // indirect
Expand All @@ -35,6 +36,7 @@ require (
github.com/lazyledger/rsmt2d v0.2.0
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p v0.12.0 // indirect
github.com/libp2p/go-libp2p-core v0.7.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.11.1 // indirect
github.com/minio/highwayhash v1.0.1
github.com/multiformats/go-multiaddr v0.3.1
Expand Down
3 changes: 1 addition & 2 deletions ipfs/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"sync"

Expand All @@ -26,7 +25,7 @@ import (
// Embedded is the provider that embeds IPFS node within the same process.
// It also returns closable for graceful node shutdown.
func Embedded(init bool, cfg *Config, logger log.Logger) APIProvider {
return func() (coreiface.CoreAPI, io.Closer, error) {
return func() (coreiface.CoreAPI, *core.IpfsNode, error) {
path := cfg.Path()
defer os.Setenv(ipfscfg.EnvDir, path)

Expand Down
28 changes: 25 additions & 3 deletions ipfs/mock.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
package ipfs

import (
"io"
"context"

nilrouting "github.com/ipfs/go-ipfs-routing/none"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/coreapi"
coremock "github.com/ipfs/go-ipfs/core/mock"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/libp2p/go-libp2p-core/routing"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"

"github.com/lazyledger/lazyledger-core/ipfs/plugin"
)

// Mock provides simple mock IPFS API useful for testing
func Mock() APIProvider {
return func() (coreiface.CoreAPI, io.Closer, error) {
return func() (coreiface.CoreAPI, *core.IpfsNode, error) {
plugin.EnableNMT()

nd, err := coremock.NewMockNode()
nd, err := MockNode()
if err != nil {
return nil, nil, err
}
Expand All @@ -28,3 +32,21 @@ func Mock() APIProvider {
return api, nd, nil
}
}

func MockNode() (*core.IpfsNode, error) {
ctx := context.TODO()
nd, err := core.NewNode(ctx, &core.BuildCfg{
Online: true,
Host: coremock.MockHostOption(mocknet.New(ctx)),
})
if err != nil {
return nil, err
}
nd.Routing = MockRouting()
return nd, err
}

func MockRouting() routing.Routing {
croute, _ := nilrouting.ConstructNilRouting(context.TODO(), nil, nil, nil)
return croute
}
5 changes: 2 additions & 3 deletions ipfs/provider.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package ipfs

import (
"io"

"github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/interface-go-ipfs-core"
)

// APIProvider allows customizable IPFS core APIs.
type APIProvider func() (coreiface.CoreAPI, io.Closer, error)
type APIProvider func() (coreiface.CoreAPI, *core.IpfsNode, error)
11 changes: 7 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

format "github.com/ipfs/go-ipld-format"
ipface "github.com/ipfs/interface-go-ipfs-core"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
Expand Down Expand Up @@ -397,6 +398,7 @@ func createConsensusReactor(config *cfg.Config,
waitSync bool,
eventBus *types.EventBus,
dag format.DAGService,
croute routing.ContentRouting,
consensusLogger log.Logger) (*cs.Reactor, *cs.State) {

consensusState := cs.NewState(
Expand All @@ -406,6 +408,7 @@ func createConsensusReactor(config *cfg.Config,
blockStore,
mempool,
dag,
croute,
evidencePool,
cs.StateMetrics(csMetrics),
)
Expand Down Expand Up @@ -738,7 +741,7 @@ func NewNode(config *cfg.Config,
sm.BlockExecutorWithMetrics(smMetrics),
)

ipfs, ipfsclose, err := ipfsProvider()
ipfsAPI, ipfsNode, err := ipfsProvider()
if err != nil {
return nil, err
}
Expand All @@ -758,7 +761,7 @@ func NewNode(config *cfg.Config,
}
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evidencePool,
privValidator, csMetrics, stateSync || fastSync, eventBus, ipfs.Dag(), consensusLogger,
privValidator, csMetrics, stateSync || fastSync, eventBus, ipfsAPI.Dag(), ipfsNode.Routing, consensusLogger,
)

// Set up state sync reactor, and schedule a sync if requested.
Expand Down Expand Up @@ -859,8 +862,8 @@ func NewNode(config *cfg.Config,
txIndexer: txIndexer,
indexerService: indexerService,
eventBus: eventBus,
ipfsAPI: ipfs,
ipfsClose: ipfsclose,
ipfsAPI: ipfsAPI,
ipfsClose: ipfsNode,
}
node.BaseService = *service.NewBaseService(logger, "Node", node)

Expand Down
Loading

0 comments on commit 6efdf30

Please sign in to comment.