diff --git a/cmd/tendermint/commands/light.go b/cmd/tendermint/commands/light.go index a1501e2b10..61f654ac09 100644 --- a/cmd/tendermint/commands/light.go +++ b/cmd/tendermint/commands/light.go @@ -184,6 +184,7 @@ func runProxy(cmd *cobra.Command, args []string) error { case daSampling: cfg := ipfs.DefaultConfig() cfg.RootDir = dir + cfg.ServeAPI = true // TODO(ismail): share badger instance apiProvider := ipfs.Embedded(true, cfg, logger) var dag coreiface.APIDagService diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 6876dbebba..0436b8bfef 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -16,6 +16,7 @@ import ( abcicli "github.com/lazyledger/lazyledger-core/abci/client" abci "github.com/lazyledger/lazyledger-core/abci/types" "github.com/lazyledger/lazyledger-core/evidence" + "github.com/lazyledger/lazyledger-core/ipfs" "github.com/lazyledger/lazyledger-core/libs/db/memdb" "github.com/lazyledger/lazyledger-core/libs/log" "github.com/lazyledger/lazyledger-core/libs/service" @@ -79,7 +80,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) - cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, dag, evpool) + cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, + mempool, dag, ipfs.MockRouting(), evpool) cs.SetLogger(cs.Logger) // set private validator pv := privVals[i] diff --git a/consensus/common_test.go b/consensus/common_test.go index 6120de4e85..e3372552fd 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -24,6 +24,7 @@ import ( abci "github.com/lazyledger/lazyledger-core/abci/types" cfg "github.com/lazyledger/lazyledger-core/config" cstypes "github.com/lazyledger/lazyledger-core/consensus/types" + "github.com/lazyledger/lazyledger-core/ipfs" tmbytes "github.com/lazyledger/lazyledger-core/libs/bytes" dbm "github.com/lazyledger/lazyledger-core/libs/db" "github.com/lazyledger/lazyledger-core/libs/db/memdb" @@ -402,7 +403,7 @@ func newStateWithConfigAndBlockStore( } blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) - cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, dag, evpool) + cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, dag, ipfs.MockRouting(), evpool) cs.SetLogger(log.TestingLogger().With("module", "consensus")) cs.SetPrivValidator(pv) diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 25b090835a..2ccffd67b8 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -23,6 +23,7 @@ import ( cstypes "github.com/lazyledger/lazyledger-core/consensus/types" cryptoenc "github.com/lazyledger/lazyledger-core/crypto/encoding" "github.com/lazyledger/lazyledger-core/crypto/tmhash" + "github.com/lazyledger/lazyledger-core/ipfs" "github.com/lazyledger/lazyledger-core/libs/bits" "github.com/lazyledger/lazyledger-core/libs/bytes" "github.com/lazyledger/lazyledger-core/libs/db/memdb" @@ -184,7 +185,8 @@ func TestReactorWithEvidence(t *testing.T) { // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) - cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, dag, evpool2) + cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, + mempool, dag, ipfs.MockRouting(), evpool2) cs.SetLogger(log.TestingLogger().With("module", "consensus")) cs.SetPrivValidator(pv) diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 6e2bb44ee1..e70f611170 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -12,6 +12,7 @@ import ( mdutils "github.com/ipfs/go-merkledag/test" cfg "github.com/lazyledger/lazyledger-core/config" + "github.com/lazyledger/lazyledger-core/ipfs" "github.com/lazyledger/lazyledger-core/libs/db/badgerdb" "github.com/lazyledger/lazyledger-core/libs/log" tmos "github.com/lazyledger/lazyledger-core/libs/os" @@ -130,7 +131,7 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error pb.cs.Wait() newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, - pb.cs.blockStore, pb.cs.txNotifier, mdutils.Mock(), pb.cs.evpool) + pb.cs.blockStore, pb.cs.txNotifier, mdutils.Mock(), ipfs.MockRouting(), pb.cs.evpool) newCS.SetEventBus(pb.cs.eventBus) newCS.startForReplay() @@ -331,8 +332,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) consensusState := NewState(csConfig, state.Copy(), blockExec, - blockStore, mempool, dag, evpool) - + blockStore, mempool, dag, ipfs.MockRouting(), evpool) consensusState.SetEventBus(eventBus) return consensusState } diff --git a/consensus/state.go b/consensus/state.go index e83d5cb192..b42626b3b0 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -13,6 +13,8 @@ import ( "github.com/gogo/protobuf/proto" format "github.com/ipfs/go-ipld-format" + "github.com/libp2p/go-libp2p-core/routing" + cfg "github.com/lazyledger/lazyledger-core/config" cstypes "github.com/lazyledger/lazyledger-core/consensus/types" "github.com/lazyledger/lazyledger-core/crypto" @@ -94,7 +96,8 @@ type State struct { // store blocks and commits blockStore sm.BlockStore - dag format.DAGService + dag format.DAGService + croute routing.ContentRouting // create and execute blocks blockExec *sm.BlockExecutor @@ -151,6 +154,10 @@ type State struct { // for reporting metrics metrics *Metrics + + // context of the recent proposed block + proposalCtx context.Context + proposalCancel context.CancelFunc } // StateOption sets an optional parameter on the State. @@ -164,6 +171,7 @@ func NewState( blockStore sm.BlockStore, txNotifier txNotifier, dag format.DAGService, + croute routing.ContentRouting, evpool evidencePool, options ...StateOption, ) *State { @@ -172,6 +180,7 @@ func NewState( blockExec: blockExec, blockStore: blockStore, dag: dag, + croute: croute, txNotifier: txNotifier, peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), @@ -1112,23 +1121,24 @@ func (cs *State) defaultDecideProposal(height int64, round int32) { cs.Logger.Error("enterPropose: Error signing proposal", "height", height, "round", round, "err", err) } - // TODO(ismail): capture this in the Consensus ADR - // post data to ipfs - // TODO(evan): don't hard code context and timeout - // - // longer timeouts result in block proposers failing to propose blocks in time. - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1500) - defer cancel() - cs.Logger.Info("Putting Block to ipfs", "height", block.Height) - // TODO: post data to IPFS in a goroutine - err = ipld.PutBlock(ctx, cs.dag, block) - if err != nil { - // If PutBlock fails we will be the only node that has the data - // this means something is seriously wrong and we can not recover - // from that automatically. - panic(fmt.Sprintf("failure to post block data to IPFS: %s", err.Error())) + // cancel ctx for previous proposal block to ensure block putting/providing does not queues up + if cs.proposalCancel != nil { + cs.proposalCancel() } - cs.Logger.Info("Finished putting block to ipfs", "height", block.Height) + cs.proposalCtx, cs.proposalCancel = context.WithCancel(context.TODO()) + go func(ctx context.Context) { + cs.Logger.Info("Putting Block to IPFS", "height", block.Height) + err = ipld.PutBlock(ctx, cs.dag, block, cs.croute, cs.Logger) + if err != nil { + if errors.Is(err, context.Canceled) { + cs.Logger.Error("Putting Block didn't finish in time and was terminated", "height", block.Height) + return + } + cs.Logger.Error("Failed to put Block to IPFS", "err", err, "height", block.Height) + return + } + cs.Logger.Info("Finished putting block to IPFS", "height", block.Height) + }(cs.proposalCtx) } // Returns true if the proposal block is complete && diff --git a/consensus/wal_test.go b/consensus/wal_test.go index 3dc0170219..8cc83bd054 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -8,26 +8,25 @@ import ( "io" mrand "math/rand" "path/filepath" - - // "sync" "testing" "time" mdutils "github.com/ipfs/go-merkledag/test" - "github.com/lazyledger/lazyledger-core/abci/example/kvstore" - cfg "github.com/lazyledger/lazyledger-core/config" - "github.com/lazyledger/lazyledger-core/libs/db/memdb" - "github.com/lazyledger/lazyledger-core/privval" - "github.com/lazyledger/lazyledger-core/proxy" - sm "github.com/lazyledger/lazyledger-core/state" - "github.com/lazyledger/lazyledger-core/store" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/lazyledger/lazyledger-core/abci/example/kvstore" + cfg "github.com/lazyledger/lazyledger-core/config" "github.com/lazyledger/lazyledger-core/consensus/types" "github.com/lazyledger/lazyledger-core/crypto/merkle" + "github.com/lazyledger/lazyledger-core/ipfs" "github.com/lazyledger/lazyledger-core/libs/autofile" + "github.com/lazyledger/lazyledger-core/libs/db/memdb" "github.com/lazyledger/lazyledger-core/libs/log" + "github.com/lazyledger/lazyledger-core/privval" + "github.com/lazyledger/lazyledger-core/proxy" + sm "github.com/lazyledger/lazyledger-core/state" + "github.com/lazyledger/lazyledger-core/store" tmtypes "github.com/lazyledger/lazyledger-core/types" tmtime "github.com/lazyledger/lazyledger-core/types/time" ) @@ -339,7 +338,8 @@ func walGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { evpool := sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) require.NoError(t, err) - consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, dag, evpool) + consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, + mempool, dag, ipfs.MockRouting(), evpool) consensusState.SetLogger(logger) consensusState.SetEventBus(eventBus) if privValidator != nil && privValidator != (*privval.FilePV)(nil) { diff --git a/go.mod b/go.mod index 28cafdfbcb..74d1386e22 100644 --- a/go.mod +++ b/go.mod @@ -17,11 +17,16 @@ require ( github.com/gorilla/websocket v1.4.2 github.com/gtank/merlin v0.1.1 github.com/hdevalence/ed25519consensus v0.0.0-20201207055737-7fde80a9d5ff + github.com/ipfs/go-bitswap v0.3.3 // indirect github.com/ipfs/go-block-format v0.0.2 + github.com/ipfs/go-blockservice v0.1.4 // indirect github.com/ipfs/go-cid v0.0.7 + github.com/ipfs/go-datastore v0.4.5 // indirect github.com/ipfs/go-ipfs v0.8.0 github.com/ipfs/go-ipfs-api v0.2.0 + github.com/ipfs/go-ipfs-blockstore v0.1.4 // indirect github.com/ipfs/go-ipfs-config v0.11.0 + github.com/ipfs/go-ipfs-routing v0.1.0 // indirect github.com/ipfs/go-ipld-format v0.2.0 github.com/ipfs/go-merkledag v0.3.2 github.com/ipfs/go-path v0.0.9 // indirect @@ -30,6 +35,10 @@ require ( github.com/lazyledger/nmt v0.5.0 github.com/lazyledger/rsmt2d v0.2.0 github.com/libp2p/go-buffer-pool v0.0.2 + github.com/libp2p/go-libp2p v0.12.0 // indirect + github.com/libp2p/go-libp2p-core v0.7.0 // indirect + github.com/libp2p/go-libp2p-kad-dht v0.11.1 // indirect + github.com/libp2p/go-libp2p-kbucket v0.4.7 // indirect github.com/minio/highwayhash v1.0.1 github.com/multiformats/go-multiaddr v0.3.1 github.com/multiformats/go-multihash v0.0.14 diff --git a/ipfs/defaults.go b/ipfs/defaults.go index 7e3cd53555..a442b0bf53 100644 --- a/ipfs/defaults.go +++ b/ipfs/defaults.go @@ -125,10 +125,16 @@ func DefaultFullNodeConfig() (*ipfscfg.Config, error) { // In kDHT all records have TTL, thus we have to regularly(Interval) reprovide/reannounce stored CID to the // network. Otherwise information that the node stores something will be lost. Should be in tact with kDHT // record cleaning configuration + // TODO(Wondertan) In case StrategicProviding is true, we have to implement reproviding manually. Reprovider: ipfscfg.Reprovider{ Interval: "12h", Strategy: "all", }, + // List of all experimental IPFS features + Experimental: ipfscfg.Experiments{ + // Disables BitSwap providing and reproviding in favour of manual providing. + StrategicProviding: true, + }, Routing: ipfscfg.Routing{ // Full node must be available from the WAN thus 'dhtserver' // Depending on the node type/use-case different modes should be used. @@ -152,9 +158,6 @@ func DefaultFullNodeConfig() (*ipfscfg.Config, error) { // Unused fields: // we currently don't use PubSub Pubsub: ipfscfg.PubsubConfig{}, - // List of all experimental IPFS features - // We currently don't use any of them - Experimental: ipfscfg.Experiments{}, // bi-directional agreement between peers to hold connections with Peering: ipfscfg.Peering{}, // local network discovery is useful, but there is no practical reason to have two FullNode in one LAN diff --git a/ipfs/embedded.go b/ipfs/embedded.go index d318f3200f..febe66d80d 100644 --- a/ipfs/embedded.go +++ b/ipfs/embedded.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "os" "sync" @@ -26,7 +25,7 @@ import ( // Embedded is the provider that embeds IPFS node within the same process. // It also returns closable for graceful node shutdown. func Embedded(init bool, cfg *Config, logger log.Logger) APIProvider { - return func() (coreiface.APIDagService, io.Closer, error) { + return func() (coreiface.APIDagService, *core.IpfsNode, error) { path := cfg.Path() defer os.Setenv(ipfscfg.EnvDir, path) diff --git a/ipfs/mock.go b/ipfs/mock.go index e7e7282c62..62843d78af 100644 --- a/ipfs/mock.go +++ b/ipfs/mock.go @@ -1,26 +1,54 @@ package ipfs import ( - "io" + "context" + nilrouting "github.com/ipfs/go-ipfs-routing/none" + "github.com/ipfs/go-ipfs/core" + coremock "github.com/ipfs/go-ipfs/core/mock" ipld "github.com/ipfs/go-ipld-format" - mdutils "github.com/ipfs/go-merkledag/test" coreiface "github.com/ipfs/interface-go-ipfs-core" + "github.com/libp2p/go-libp2p-core/routing" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + + "github.com/lazyledger/lazyledger-core/ipfs/plugin" ) // Mock provides simple mock IPFS API useful for testing func Mock() APIProvider { - return func() (coreiface.APIDagService, io.Closer, error) { - dom := dagOnlyMock{mdutils.Mock()} + return func() (coreiface.APIDagService, *core.IpfsNode, error) { + plugin.EnableNMT() + + nd, err := MockNode() + if err != nil { + return nil, nil, err + } - return dom, dom, nil + return dagOnlyMock{nd.DAG}, nd, nil } } +func MockNode() (*core.IpfsNode, error) { + ctx := context.TODO() + nd, err := core.NewNode(ctx, &core.BuildCfg{ + Online: true, + Host: coremock.MockHostOption(mocknet.New(ctx)), + }) + if err != nil { + return nil, err + } + nd.Routing = MockRouting() + return nd, err +} + +func MockRouting() routing.Routing { + croute, _ := nilrouting.ConstructNilRouting(context.TODO(), nil, nil, nil) + return croute +} + type dagOnlyMock struct { ipld.DAGService } func (dom dagOnlyMock) Dag() coreiface.APIDagService { return dom } -func (dagOnlyMock) Close() error { return nil } func (dom dagOnlyMock) Pinning() ipld.NodeAdder { return dom } diff --git a/ipfs/provider.go b/ipfs/provider.go index d7575526ae..cdda230262 100644 --- a/ipfs/provider.go +++ b/ipfs/provider.go @@ -1,10 +1,9 @@ package ipfs import ( - "io" - + "github.com/ipfs/go-ipfs/core" coreiface "github.com/ipfs/interface-go-ipfs-core" ) // APIProvider allows customizable IPFS core APIs. -type APIProvider func() (coreiface.APIDagService, io.Closer, error) +type APIProvider func() (coreiface.APIDagService, *core.IpfsNode, error) diff --git a/node/node.go b/node/node.go index 00ed08bf98..0ed5fee9be 100644 --- a/node/node.go +++ b/node/node.go @@ -13,7 +13,7 @@ import ( "time" ipld "github.com/ipfs/go-ipld-format" - iface "github.com/ipfs/interface-go-ipfs-core" + "github.com/libp2p/go-libp2p-core/routing" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/cors" @@ -219,26 +219,6 @@ type Node struct { ipfsClose io.Closer } -func initDBs( - config *cfg.Config, - dbProvider DBProvider, - dag iface.APIDagService, -) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { - var blockStoreDB dbm.DB - blockStoreDB, err = dbProvider(&DBContext{"blockstore", config}) - if err != nil { - return - } - blockStore = store.NewBlockStore(blockStoreDB, dag) - - stateDB, err = dbProvider(&DBContext{"state", config}) - if err != nil { - return - } - - return -} - func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) { proxyApp := proxy.NewAppConns(clientCreator) proxyApp.SetLogger(logger.With("module", "proxy")) @@ -401,6 +381,7 @@ func createConsensusReactor( waitSync bool, eventBus *types.EventBus, dag ipld.DAGService, + croute routing.ContentRouting, consensusLogger log.Logger) (*cs.Reactor, *cs.State) { consensusState := cs.NewState( @@ -410,6 +391,7 @@ func createConsensusReactor( blockStore, mempool, dag, + croute, evidencePool, cs.StateMetrics(csMetrics), ) @@ -643,12 +625,7 @@ func NewNode(config *cfg.Config, logger log.Logger, options ...Option) (*Node, error) { - dag, ipfsclose, err := ipfsProvider() - if err != nil { - return nil, err - } - - blockStore, stateDB, err := initDBs(config, dbProvider, dag) + stateDB, err := dbProvider(&DBContext{"state", config}) if err != nil { return nil, err } @@ -703,6 +680,18 @@ func NewNode(config *cfg.Config, stateSync = false } + dag, ipfsNode, err := ipfsProvider() + if err != nil { + return nil, err + } + + blockStoreDB, err := dbProvider(&DBContext{"blockstore", config}) + if err != nil { + return nil, err + } + + blockStore := store.NewBlockStore(blockStoreDB, dag) + // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, // and replays any blocks as necessary to sync tendermint with the app. consensusLogger := logger.With("module", "consensus") @@ -762,7 +751,7 @@ func NewNode(config *cfg.Config, } consensusReactor, consensusState := createConsensusReactor( config, state, blockExec, blockStore, mempool, evidencePool, - privValidator, csMetrics, stateSync || fastSync, eventBus, dag, consensusLogger, + privValidator, csMetrics, stateSync || fastSync, eventBus, dag, ipfsNode.Routing, consensusLogger, ) // Set up state sync reactor, and schedule a sync if requested. @@ -863,7 +852,7 @@ func NewNode(config *cfg.Config, txIndexer: txIndexer, indexerService: indexerService, eventBus: eventBus, - ipfsClose: ipfsclose, + ipfsClose: ipfsNode, } node.BaseService = *service.NewBaseService(logger, "Node", node) diff --git a/p2p/ipld/net_test.go b/p2p/ipld/net_test.go new file mode 100644 index 0000000000..4e20652bf2 --- /dev/null +++ b/p2p/ipld/net_test.go @@ -0,0 +1,135 @@ +package ipld + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/go-bitswap" + "github.com/ipfs/go-bitswap/network" + "github.com/ipfs/go-blockservice" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + blockstore "github.com/ipfs/go-ipfs-blockstore" + ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" + "github.com/lazyledger/rsmt2d" + dht "github.com/libp2p/go-libp2p-kad-dht" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/lazyledger/lazyledger-core/ipfs/plugin" + "github.com/lazyledger/lazyledger-core/libs/log" + "github.com/lazyledger/lazyledger-core/types" + "github.com/lazyledger/lazyledger-core/types/consts" +) + +func TestDiscovery(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + dhts := dhtNet(ctx, t, 2) + dht1, dht2 := dhts[0], dhts[0] + + data := generateRandomBlockData(64, consts.MsgShareSize-2) + b := &types.Block{ + Data: data, + LastCommit: &types.Commit{}, + } + b.Hash() + + id, err := plugin.CidFromNamespacedSha256(b.DataAvailabilityHeader.RowsRoots[0].Bytes()) + require.NoError(t, err) + + err = dht1.Provide(ctx, id, false) + require.NoError(t, err) + + prvs, err := dht2.FindProviders(ctx, id) + require.NoError(t, err) + assert.Equal(t, dht1.PeerID(), prvs[0].ID, "peer not found") +} + +func TestWriteDiscoveryReadData(t *testing.T) { + logger := log.TestingLogger() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + dags, dhts := dagNet(ctx, t, 5) + blocks := make([]*types.Block, len(dags)) + for i, dag := range dags { + data := generateRandomBlockData(64, consts.MsgShareSize-2) + b := &types.Block{ + Data: data, + LastCommit: &types.Commit{}, + } + b.Hash() + blocks[i] = b + + err := PutBlock(ctx, dag, blocks[i], dhts[i], logger) + require.NoError(t, err) + } + + for i, dag := range dags { + if i == len(dags)-1 { + i = 0 + } + + exp := blocks[i+1] + actual, err := RetrieveBlockData(ctx, &exp.DataAvailabilityHeader, dag, rsmt2d.NewRSGF8Codec()) + assert.NoError(t, err) + assert.EqualValues(t, exp.Data.Txs, actual.Txs, "blocks are not equal") + } +} + +func dagNet(ctx context.Context, t *testing.T, num int) ([]ipld.DAGService, []*dht.IpfsDHT) { + net := mocknet.New(ctx) + _, medium := dagNode(ctx, t, net) + dags, dhts := make([]ipld.DAGService, num), make([]*dht.IpfsDHT, num) + for i := range dags { + dags[i], dhts[i] = dagNode(ctx, t, net) + } + bootstrap(ctx, t, net, medium, dhts...) + return dags, dhts +} + +func dhtNet(ctx context.Context, t *testing.T, num int) []*dht.IpfsDHT { + net := mocknet.New(ctx) + medium := dhtNode(ctx, t, net) + dhts := make([]*dht.IpfsDHT, num) + for i := range dhts { + dhts[i] = dhtNode(ctx, t, net) + } + bootstrap(ctx, t, net, medium, dhts...) + return dhts +} + +func dagNode(ctx context.Context, t *testing.T, net mocknet.Mocknet) (ipld.DAGService, *dht.IpfsDHT) { + dstore := dssync.MutexWrap(ds.NewMapDatastore()) + bstore := blockstore.NewBlockstore(dstore) + routing := dhtNode(ctx, t, net) + bs := bitswap.New(ctx, network.NewFromIpfsHost(routing.Host(), routing), bstore, bitswap.ProvideEnabled(false)) + return merkledag.NewDAGService(blockservice.New(bstore, bs)), routing +} + +func dhtNode(ctx context.Context, t *testing.T, net mocknet.Mocknet) *dht.IpfsDHT { + host, err := net.GenPeer() + require.NoError(t, err) + dstore := dssync.MutexWrap(ds.NewMapDatastore()) + routing, err := dht.New(ctx, host, dht.Datastore(dstore), dht.Mode(dht.ModeServer), dht.BootstrapPeers()) + require.NoError(t, err) + return routing +} + +func bootstrap(ctx context.Context, t *testing.T, net mocknet.Mocknet, bstrapper *dht.IpfsDHT, peers ...*dht.IpfsDHT) { + err := net.LinkAll() + require.NoError(t, err) + for _, p := range peers { + _, err := net.ConnectPeers(bstrapper.PeerID(), p.PeerID()) + require.NoError(t, err) + err = bstrapper.Bootstrap(ctx) + require.NoError(t, err) + } + err = bstrapper.Bootstrap(ctx) + require.NoError(t, err) +} diff --git a/p2p/ipld/read_test.go b/p2p/ipld/read_test.go index 04819f0db1..690caf321c 100644 --- a/p2p/ipld/read_test.go +++ b/p2p/ipld/read_test.go @@ -19,7 +19,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/lazyledger/lazyledger-core/ipfs" "github.com/lazyledger/lazyledger-core/ipfs/plugin" + "github.com/lazyledger/lazyledger-core/libs/log" "github.com/lazyledger/lazyledger-core/p2p/ipld/wrapper" "github.com/lazyledger/lazyledger-core/types" "github.com/lazyledger/lazyledger-core/types/consts" @@ -114,6 +116,7 @@ func TestBlockRecovery(t *testing.T) { } func TestRetrieveBlockData(t *testing.T) { + logger := log.TestingLogger() type test struct { name string squareSize int @@ -138,6 +141,7 @@ func TestRetrieveBlockData(t *testing.T) { t.Run(fmt.Sprintf("%s size %d", tc.name, tc.squareSize), func(t *testing.T) { ctx := context.Background() dag := mdutils.Mock() + croute := ipfs.MockRouting() blockData := generateRandomBlockData(tc.squareSize*tc.squareSize, consts.MsgShareSize-2) block := &types.Block{ @@ -147,7 +151,7 @@ func TestRetrieveBlockData(t *testing.T) { // if an error is exected, don't put the block if !tc.expectErr { - err := PutBlock(ctx, dag, block) + err := PutBlock(ctx, dag, block, croute, logger) require.NoError(t, err) } diff --git a/p2p/ipld/validate.go b/p2p/ipld/validate.go index 2c33694a08..94cdab066f 100644 --- a/p2p/ipld/validate.go +++ b/p2p/ipld/validate.go @@ -15,7 +15,7 @@ import ( // ValidationTimeout specifies timeout for DA validation during which data have to be found on the network, // otherwise ErrValidationFailed is thrown. // TODO: github.com/lazyledger/lazyledger-core/issues/280 -const ValidationTimeout = 1 * time.Minute +const ValidationTimeout = 10 * time.Minute // ErrValidationFailed is returned whenever DA validation fails var ErrValidationFailed = errors.New("validation failed") diff --git a/p2p/ipld/validate_test.go b/p2p/ipld/validate_test.go index 76dde32b25..c7e231e732 100644 --- a/p2p/ipld/validate_test.go +++ b/p2p/ipld/validate_test.go @@ -10,6 +10,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/lazyledger/lazyledger-core/ipfs" + "github.com/lazyledger/lazyledger-core/libs/log" "github.com/lazyledger/lazyledger-core/types" "github.com/lazyledger/lazyledger-core/types/consts" ) @@ -34,7 +36,7 @@ func TestValidateAvailability(t *testing.T) { block.Hash() dag := mdutils.Mock() - err := PutBlock(ctx, dag, block) + err := PutBlock(ctx, dag, block, ipfs.MockRouting(), log.TestingLogger()) require.NoError(t, err) calls := 0 diff --git a/p2p/ipld/write.go b/p2p/ipld/write.go index 16ac4ee7b7..44fa5dc11b 100644 --- a/p2p/ipld/write.go +++ b/p2p/ipld/write.go @@ -2,25 +2,35 @@ package ipld import ( "context" - "errors" "fmt" "math" + "sync/atomic" + "time" + "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" "github.com/lazyledger/nmt" "github.com/lazyledger/rsmt2d" + "github.com/libp2p/go-libp2p-core/routing" + kbucket "github.com/libp2p/go-libp2p-kbucket" + "github.com/lazyledger/lazyledger-core/ipfs/plugin" + "github.com/lazyledger/lazyledger-core/libs/log" + "github.com/lazyledger/lazyledger-core/libs/sync" "github.com/lazyledger/lazyledger-core/p2p/ipld/wrapper" "github.com/lazyledger/lazyledger-core/types" ) // PutBlock posts and pins erasured block data to IPFS using the provided // ipld.NodeAdder. Note: the erasured data is currently recomputed -func PutBlock(ctx context.Context, adder ipld.NodeAdder, block *types.Block) error { - if adder == nil { - return errors.New("no ipfs node adder provided") - } - +// TODO this craves for refactor +func PutBlock( + ctx context.Context, + adder ipld.NodeAdder, + block *types.Block, + croute routing.ContentRouting, + logger log.Logger, +) error { // recompute the shares namespacedShares, _ := block.Data.ComputeShares() shares := namespacedShares.RawShares() @@ -42,12 +52,115 @@ func PutBlock(ctx context.Context, adder ipld.NodeAdder, block *types.Block) err if err != nil { return fmt.Errorf("failure to recompute the extended data square: %w", err) } + // get row and col roots to be provided + // this also triggers adding data to DAG + prov := newProvider(ctx, croute, int32(squareSize*4), logger.With("height", block.Height)) + for _, root := range eds.RowRoots() { + prov.Provide(plugin.MustCidFromNamespacedSha256(root)) + } + for _, root := range eds.ColumnRoots() { + prov.Provide(plugin.MustCidFromNamespacedSha256(root)) + } + // commit the batch to ipfs + err = batchAdder.Commit() + if err != nil { + return err + } + // wait until we provided all the roots if requested + <-prov.Done() + return prov.Err() +} - // thanks to the batchAdder.Visit func we added to the nmt wrapper, - // generating the roots will start adding the data to IPFS - eds.RowRoots() - eds.ColumnRoots() +var provideWorkers = 32 - // commit the batch to ipfs - return batchAdder.Commit() +type provider struct { + ctx context.Context + done chan struct{} + + err error + errLk sync.RWMutex + + jobs chan cid.Cid + total int32 + + croute routing.ContentRouting + log log.Logger + startTime time.Time +} + +func newProvider(ctx context.Context, croute routing.ContentRouting, toProvide int32, logger log.Logger) *provider { + p := &provider{ + ctx: ctx, + done: make(chan struct{}), + jobs: make(chan cid.Cid, provideWorkers), + total: toProvide, + croute: croute, + log: logger, + } + for range make([]bool, provideWorkers) { + go p.worker() + } + logger.Info("Started Providing to DHT") + p.startTime = time.Now() + return p +} + +func (p *provider) Provide(id cid.Cid) { + select { + case p.jobs <- id: + case <-p.ctx.Done(): + } +} + +func (p *provider) Done() <-chan struct{} { + return p.done +} + +func (p *provider) Err() error { + p.errLk.RLock() + defer p.errLk.RUnlock() + if p.err != nil { + return p.err + } + return p.ctx.Err() +} + +func (p *provider) worker() { + for { + select { + case id := <-p.jobs: + err := p.croute.Provide(p.ctx, id, true) + // Omit ErrLookupFailure to decrease test log spamming as + // this simply indicates we haven't connected to other DHT nodes yet. + if err != nil && err != kbucket.ErrLookupFailure { + if p.Err() == nil { + p.errLk.Lock() + p.err = err + p.errLk.Unlock() + } + + p.log.Error("failed to provide to DHT", "err", err.Error()) + } + + p.provided() + case <-p.ctx.Done(): + for { + select { + case <-p.jobs: // drain chan + p.provided() // ensure done is closed + default: + return + } + } + case <-p.done: + return + } + } +} + +func (p *provider) provided() { + if atomic.AddInt32(&p.total, -1) == 0 { + p.log.Info("Finished providing to DHT", "took", time.Since(p.startTime).String()) + close(p.done) + } } diff --git a/p2p/ipld/write_test.go b/p2p/ipld/write_test.go index 5a93f62992..d5e37faf6a 100644 --- a/p2p/ipld/write_test.go +++ b/p2p/ipld/write_test.go @@ -6,30 +6,25 @@ import ( "testing" "time" - "github.com/ipfs/go-ipfs/core/coreapi" - coremock "github.com/ipfs/go-ipfs/core/mock" + mdutils "github.com/ipfs/go-merkledag/test" "github.com/lazyledger/nmt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" abci "github.com/lazyledger/lazyledger-core/abci/types" "github.com/lazyledger/lazyledger-core/crypto/tmhash" + "github.com/lazyledger/lazyledger-core/ipfs" "github.com/lazyledger/lazyledger-core/ipfs/plugin" + "github.com/lazyledger/lazyledger-core/libs/log" tmproto "github.com/lazyledger/lazyledger-core/proto/tendermint/types" "github.com/lazyledger/lazyledger-core/types" "github.com/lazyledger/lazyledger-core/types/consts" ) func TestPutBlock(t *testing.T) { - ipfsNode, err := coremock.NewMockNode() - if err != nil { - t.Error(err) - } - - ipfsAPI, err := coreapi.NewCoreAPI(ipfsNode) - if err != nil { - t.Error(err) - } + logger := log.TestingLogger() + dag := mdutils.Mock() + croute := ipfs.MockRouting() maxOriginalSquareSize := consts.MaxSquareSize / 2 maxShareCount := maxOriginalSquareSize * maxOriginalSquareSize @@ -52,7 +47,7 @@ func TestPutBlock(t *testing.T) { block := &types.Block{Data: tc.blockData} t.Run(tc.name, func(t *testing.T) { - err = PutBlock(ctx, ipfsAPI.Dag(), block) + err := PutBlock(ctx, dag, block, croute, logger) if tc.expectErr { require.Error(t, err) require.Contains(t, err.Error(), tc.errString) @@ -73,7 +68,7 @@ func TestPutBlock(t *testing.T) { } // retrieve the data from IPFS - _, err = ipfsAPI.Dag().Get(timeoutCtx, cid) + _, err = dag.Get(timeoutCtx, cid) if err != nil { t.Errorf("Root not found: %s", cid.String()) } @@ -114,15 +109,10 @@ func toMessageSlice(msgs [][]byte) []*tmproto.Message { } func TestDataAvailabilityHeaderRewriteBug(t *testing.T) { - ipfsNode, err := coremock.NewMockNode() - if err != nil { - t.Error(err) - } + logger := log.TestingLogger() + dag := mdutils.Mock() + croute := ipfs.MockRouting() - ipfsAPI, err := coreapi.NewCoreAPI(ipfsNode) - if err != nil { - t.Error(err) - } txs := types.Txs{} l := len(txs) bzs := make([][]byte, l) @@ -158,7 +148,7 @@ func TestDataAvailabilityHeaderRewriteBug(t *testing.T) { hash1 := block.DataAvailabilityHeader.Hash() ctx := context.TODO() - err = PutBlock(ctx, ipfsAPI.Dag(), block) + err = PutBlock(ctx, dag, block, croute, logger) if err != nil { t.Fatal(err) }