Skip to content

Commit

Permalink
make APIProvider Online/Offline and use offline DAG for blockstore
Browse files Browse the repository at this point in the history
  • Loading branch information
evan-forbes committed Jun 3, 2021
1 parent 4908fef commit 8619f27
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 73 deletions.
7 changes: 5 additions & 2 deletions cmd/tendermint/commands/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,12 @@ func runProxy(cmd *cobra.Command, args []string) error {
cfg := ipfs.DefaultConfig()
cfg.RootDir = dir
// TODO(ismail): share badger instance
apiProvider := ipfs.Embedded(true, cfg, logger)
apiProvider, err := ipfs.Embedded(true, cfg, logger)
if err != nil {
return err
}
var dag coreiface.APIDagService
dag, ipfsCloser, err = apiProvider()
dag, ipfsCloser, err = apiProvider(true)
if err != nil {
return fmt.Errorf("could not start ipfs API: %w", err)
}
Expand Down
11 changes: 6 additions & 5 deletions cmd/tendermint/commands/run_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,12 @@ func NewRunNodeCmd(nodeProvider nm.Provider) *cobra.Command {
return err
}

n, err := nodeProvider(
config,
ipfs.Embedded(initIPFS, config.IPFS, logger),
logger,
)
provider, err := ipfs.Embedded(initIPFS, config.IPFS, logger)
if err != nil {
return err
}

n, err := nodeProvider(config, provider, logger)
if err != nil {
return fmt.Errorf("failed to create node: %w", err)
}
Expand Down
120 changes: 61 additions & 59 deletions ipfs/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"sync"

Expand All @@ -16,7 +15,6 @@ import (
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/repo/fsrepo"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"

Expand All @@ -25,70 +23,74 @@ import (

// Embedded is the provider that embeds IPFS node within the same process.
// It also returns closable for graceful node shutdown.
func Embedded(init bool, cfg *Config, logger log.Logger) APIProvider {
return func() (coreiface.APIDagService, io.Closer, error) {
path := cfg.Path()
defer os.Setenv(ipfscfg.EnvDir, path)
func Embedded(init bool, cfg *Config, logger log.Logger) (APIProvider, error) {
path := cfg.Path()
defer os.Setenv(ipfscfg.EnvDir, path)

// NOTE: no need to validate the path before
if err := plugins(path); err != nil {
return nil, nil, err
}
// Init Repo if requested
if init {
if err := InitRepo(path, logger); err != nil {
return nil, nil, err
}
}
// Open the repo
repo, err := fsrepo.Open(path)
if err != nil {
var nrerr fsrepo.NoRepoError
if errors.As(err, &nrerr) {
return nil, nil, fmt.Errorf("no IPFS repo found in %s.\nplease use flag: --ipfs-init", nrerr.Path)
}
return nil, nil, err
}
// Construct the node
nodeOptions := &core.BuildCfg{
Online: true,
// This option sets the node to be a full DHT node (both fetching and storing DHT Records)
Routing: libp2p.DHTOption,
// This option sets the node to be a client DHT node (only fetching records)
// Routing: libp2p.DHTClientOption,
Repo: repo,
}
// Internally, ipfs decorates the context with a
// context.WithCancel. Which is then used for lifecycle management.
// We do not make use of this context and rely on calling
// Close() on the node instead
ctx := context.Background()
// It is essential that we create a fresh instance of ipfs node on
// each start as internally the node gets only stopped once per instance.
// At least in ipfs 0.7.0; see:
// https://github.com/lazyledger/go-ipfs/blob/dd295e45608560d2ada7d7c8a30f1eef3f4019bb/core/builder.go#L48-L57
node, err := core.NewNode(ctx, nodeOptions)
if err != nil {
_ = repo.Close()
return nil, nil, err
// NOTE: no need to validate the path before
if err := plugins(path); err != nil {
return nil, err
}
// Init Repo if requested
if init {
if err := InitRepo(path, logger); err != nil {
return nil, err
}
// Serve API if requested
if cfg.ServeAPI {
if err := serveAPI(path, repo, node); err != nil {
_ = node.Close()
return nil, nil, err
}
}
// Open the repo
repo, err := fsrepo.Open(path)
if err != nil {
var nrerr fsrepo.NoRepoError
if errors.As(err, &nrerr) {
return nil, fmt.Errorf("no IPFS repo found in %s.\nplease use flag: --ipfs-init", nrerr.Path)
}
// Wrap Node and create CoreAPI
api, err := coreapi.NewCoreAPI(node)
if err != nil {
return nil, err
}
// Construct the node
nodeOptions := &core.BuildCfg{
Online: true,
// This option sets the node to be a full DHT node (both fetching and storing DHT Records)
Routing: libp2p.DHTOption,
// This option sets the node to be a client DHT node (only fetching records)
// Routing: libp2p.DHTClientOption,
Repo: repo,
}
// Internally, ipfs decorates the context with a
// context.WithCancel. Which is then used for lifecycle management.
// We do not make use of this context and rely on calling
// Close() on the node instead
ctx := context.Background()
// It is essential that we create a fresh instance of ipfs node on
// each start as internally the node gets only stopped once per instance.
// At least in ipfs 0.7.0; see:
// https://github.com/lazyledger/go-ipfs/blob/dd295e45608560d2ada7d7c8a30f1eef3f4019bb/core/builder.go#L48-L57
node, err := core.NewNode(ctx, nodeOptions)
if err != nil {
_ = repo.Close()
return nil, err
}
// Serve API if requested
if cfg.ServeAPI {
if err := serveAPI(path, repo, node); err != nil {
_ = node.Close()
return nil, nil, fmt.Errorf("failed to create an instance of the IPFS core API: %w", err)
return nil, err
}
}
// Wrap Node and create CoreAPI
api, err := coreapi.NewCoreAPI(node)
if err != nil {
_ = node.Close()
return nil, fmt.Errorf("failed to create an instance of the IPFS core API: %w", err)
}

logger.Info("Successfully created embedded IPFS node", "ipfs-repo", path)
return api.Dag(), node, nil
provider, err := NewOnlineOfflineProvider(api, node)
if err != nil {
_ = repo.Close()
return nil, fmt.Errorf("failed to create an online offline provider: %w", err)
}

logger.Info("Successfully created embedded IPFS node", "ipfs-repo", path)
return provider, nil
}

// serveAPI creates and HTTP server for IPFS API.
Expand Down
2 changes: 1 addition & 1 deletion ipfs/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

// Mock provides simple mock IPFS API useful for testing
func Mock() APIProvider {
return func() (coreiface.APIDagService, io.Closer, error) {
return func(bool) (coreiface.APIDagService, io.Closer, error) {
dom := dagOnlyMock{mdutils.Mock()}

return dom, dom, nil
Expand Down
39 changes: 38 additions & 1 deletion ipfs/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,44 @@ import (
"io"

coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
)

// APIProvider allows customizable IPFS core APIs.
type APIProvider func() (coreiface.APIDagService, io.Closer, error)
type APIProvider func(online bool) (coreiface.APIDagService, io.Closer, error)

type onlineOfflineProvider struct {
online, offline coreiface.APIDagService
closer io.Closer
}

func NewOnlineOfflineProvider(online coreiface.CoreAPI, closer io.Closer) (APIProvider, error) {

offline, err := online.WithOptions(OfflineOption)
if err != nil {
return nil, err
}

provider := onlineOfflineProvider{
online: online.Dag(),
offline: offline.Dag(),
}

return provider.Provide, nil
}

func (o onlineOfflineProvider) Provide(online bool) (coreiface.APIDagService, io.Closer, error) {
switch online {
case true:
return o.online, o.closer, nil
case false:
return o.offline, o.closer, nil
}
// the compiler does't know this code is unreachable
return nil, nil, nil
}

func OfflineOption(settings *options.ApiSettings) error {
settings.Offline = true
return nil
}
13 changes: 10 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,12 +643,12 @@ func NewNode(config *cfg.Config,
logger log.Logger,
options ...Option) (*Node, error) {

dag, ipfsclose, err := ipfsProvider()
offlineDAG, ipfsclose, err := ipfsProvider(false)
if err != nil {
return nil, err
}

blockStore, stateDB, err := initDBs(config, dbProvider, dag)
blockStore, stateDB, err := initDBs(config, dbProvider, offlineDAG)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -760,9 +760,16 @@ func NewNode(config *cfg.Config,
} else if fastSync {
csMetrics.FastSyncing.Set(1)
}

// the ipfscloser is already declared, so we don't do anything with it here
onlineDAG, _, err := ipfsProvider(true)
if err != nil {
return nil, err
}

consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evidencePool,
privValidator, csMetrics, stateSync || fastSync, eventBus, dag, consensusLogger,
privValidator, csMetrics, stateSync || fastSync, eventBus, onlineDAG, consensusLogger,
)

// Set up state sync reactor, and schedule a sync if requested.
Expand Down
11 changes: 9 additions & 2 deletions test/e2e/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,20 @@ func startNode(cfg *Config) error {
if err != nil {
return err
}
n, err := node.NewNode(tmcfg,

provider, err := ipfs.Embedded(true, ipfs.DefaultConfig(), nodeLogger)
if err != nil {
return err
}

n, err := node.NewNode(
tmcfg,
pval,
*nodeKey,
proxy.NewLocalClientCreator(app),
node.DefaultGenesisDocProviderFunc(tmcfg),
node.DefaultDBProvider,
ipfs.Embedded(true, ipfs.DefaultConfig(), nodeLogger),
provider,
node.DefaultMetricsProvider(tmcfg.Instrumentation),
nodeLogger,
)
Expand Down

0 comments on commit 8619f27

Please sign in to comment.