Skip to content

Commit

Permalink
run Ipfs node OnStart
Browse files Browse the repository at this point in the history
  • Loading branch information
liamsi committed Jan 23, 2021
1 parent 60537ed commit 8ce5bb2
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 30 deletions.
37 changes: 24 additions & 13 deletions cmd/tendermint/commands/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package commands
import (
"fmt"
"os"
"path/filepath"

ipfscfg "github.com/ipfs/go-ipfs-config"
"github.com/ipfs/go-ipfs/plugin/loader"
Expand Down Expand Up @@ -104,14 +105,14 @@ func initFilesWithConfig(config *cfg.Config) error {
logger.Info("Generated genesis file", "path", genFile)
}

if err := InitIpfs(config); err != nil {
if err := initIpfs(config); err != nil {
return err
}

return nil
}

func InitIpfs(config *cfg.Config) error { // add counter part in ResetAllCmd
func initIpfs(config *cfg.Config) error { // add counter part in ResetAllCmd
// init IPFS config with params from config.IPFS
// and store in config.IPFS.ConfigRootPath
repoRoot := config.IPFSRepoRoot()
Expand All @@ -125,7 +126,7 @@ func InitIpfs(config *cfg.Config) error { // add counter part in ResetAllCmd
return err
}

logger.Info("initializing IPFS node at:", repoRoot)
logger.Info("initializing IPFS node", "ipfs-path", repoRoot)

if err := tmos.EnsureDir(repoRoot, 0700); err != nil {
return err
Expand All @@ -137,27 +138,37 @@ func InitIpfs(config *cfg.Config) error { // add counter part in ResetAllCmd
}

applyFromTmConfig(conf, config.IPFS)
plugins, err := loader.NewPluginLoader(repoRoot)
if err != nil {
return err
}
if err := plugins.Initialize(); err != nil {
return err
}

if err := plugins.Inject(); err != nil {
if err := setupPlugins(repoRoot); err != nil {
return err
}

if err := fsrepo.Init(repoRoot, conf); err != nil {
return err
}
} else {
logger.Info("IPFS was already initialized in %v", config.IPFS.ConfigRootPath)
logger.Info("IPFS was already initialized", "ipfs-path", repoRoot)
}
return nil
}

func setupPlugins(path string) error {
// Load plugins. This will skip the repo if not available.
plugins, err := loader.NewPluginLoader(filepath.Join(path, "plugins"))
if err != nil {
return fmt.Errorf("error loading plugins: %s", err)
}

if err := plugins.Initialize(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}

if err := plugins.Inject(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}

return nil
}

func applyFromTmConfig(ipfsConf *ipfscfg.Config, tmConf *cfg.IPFSConfig) {
ipfsConf.Addresses.API = ipfscfg.Strings{tmConf.API}
ipfsConf.Addresses.Gateway = ipfscfg.Strings{tmConf.Gateway}
Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func TestConfig() *Config {
Consensus: TestConsensusConfig(),
TxIndex: TestTxIndexConfig(),
Instrumentation: TestInstrumentationConfig(),
IPFS: TetsIpfsConfig(),
}
}

Expand Down Expand Up @@ -1029,6 +1030,10 @@ func DefaultInstrumentationConfig() *InstrumentationConfig {
}
}

func TetsIpfsConfig() *IPFSConfig {
return DefaultIPFSConfig()
}

// TestInstrumentationConfig returns a default configuration for metrics
// reporting.
func TestInstrumentationConfig() *InstrumentationConfig {
Expand Down
64 changes: 57 additions & 7 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (
"net"
"net/http"
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
"path/filepath"
"strings"
"time"

ipfscore "github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/plugin/loader"
"github.com/ipfs/go-ipfs/repo/fsrepo"
"github.com/lazyledger/lazyledger-core/p2p/ipld/plugin/nodes"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
Expand Down Expand Up @@ -107,6 +110,7 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
DefaultDBProvider,
DefaultMetricsProvider(config.Instrumentation),
logger,
EmbedIpfsNode(true),
)
}

Expand Down Expand Up @@ -170,6 +174,22 @@ func StateProvider(stateProvider statesync.StateProvider) Option {
}
}

// IpfsPluginsWereLoaded indicates that all IPFS plugin were already loaded.
// Setting up plugins will skipped when creating the IPFS node.
func IpfsPluginsWereLoaded(wereAlreadyLoaded bool) Option {
return func(n *Node) {
n.areIfsPluginsAlreadyLoaded = wereAlreadyLoaded
}
}

// IpfsPluginsWereLoaded indicates that all IPFS plugin were already loaded.
// Setting up plugins will skipped when creating the IPFS node.
func EmbedIpfsNode(embed bool) Option {
return func(n *Node) {
n.embedIpfsNode = embed
}
}

//------------------------------------------------------------------------------

// Node is the highest level interface to a full Tendermint node.
Expand Down Expand Up @@ -211,7 +231,9 @@ type Node struct {
indexerService *txindex.IndexerService
prometheusSrv *http.Server
// we store a ref to the full IpfsNode (instead of ipfs' CoreAPI) so we can Close() it OnStop()
ipfsNode *ipfscore.IpfsNode // ipfs node
embedIpfsNode bool // whether the node should start an IPFS node on startup
ipfsNode *ipfscore.IpfsNode // ipfs node
areIfsPluginsAlreadyLoaded bool // avoid injecting plugins twice in tests etc
}

func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
Expand Down Expand Up @@ -929,10 +951,11 @@ func (n *Node) OnStart() error {
return fmt.Errorf("failed to start state sync: %w", err)
}
}

n.ipfsNode, err = createIpfsNode(n.config)
if err != nil {
return fmt.Errorf("failed to create IPFS node: %w", err)
if n.embedIpfsNode {
n.ipfsNode, err = createIpfsNode(n.config, n.areIfsPluginsAlreadyLoaded, n.Logger)
if err != nil {
return fmt.Errorf("failed to create IPFS node: %w", err)
}
}

return nil
Expand Down Expand Up @@ -1412,12 +1435,18 @@ func createAndStartPrivValidatorSocketClient(
return pvscWithRetries, nil
}

func createIpfsNode(config *cfg.Config) (*ipfscore.IpfsNode, error) {
func createIpfsNode(config *cfg.Config, arePluginsAlreadyLoaded bool, logger log.Logger) (*ipfscore.IpfsNode, error) {
repoRoot := config.IPFSRepoRoot()
logger.Info("creating node in repo", "ipfs-root", repoRoot)
if !fsrepo.IsInitialized(repoRoot) {
// TODO: sentinel err
return nil, fmt.Errorf("ipfs repo root: %v not intitialized", repoRoot)
}
if !arePluginsAlreadyLoaded {
if err := setupPlugins(repoRoot, logger); err != nil {
return nil, err
}
}
// Open the repo
repo, err := fsrepo.Open(repoRoot)
if err != nil {
Expand All @@ -1441,10 +1470,31 @@ func createIpfsNode(config *cfg.Config) (*ipfscore.IpfsNode, error) {
}
// run as daemon:
node.IsDaemon = true

return node, nil
}

func setupPlugins(path string, logger log.Logger) error {
// Load plugins. This will skip the repo if not available.
plugins, err := loader.NewPluginLoader(filepath.Join(path, "plugins"))
if err != nil {
return fmt.Errorf("error loading plugins: %s", err)
}
plugins.Load(&nodes.LazyLedgerPlugin{})
if err != nil {
return fmt.Errorf("error loading lazyledger plugin: %s", err)
}

if err := plugins.Initialize(); err != nil {
return fmt.Errorf("error initializing plugins: plugins.Initialize(): %s", err)
}

if err := plugins.Inject(); err != nil {
logger.Error("error initializing plugins: could not Inject()", "err", err)
}

return nil
}

// splitAndTrimEmpty slices s into all subslices separated by sep and returns a
// slice of the string s with all leading and trailing Unicode code points
// contained in cutset removed. If sep is empty, SplitAndTrim splits after each
Expand Down
2 changes: 2 additions & 0 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestNodeStartStop(t *testing.T) {

// create & start node
n, err := DefaultNewNode(config, log.TestingLogger())
n.embedIpfsNode = false // TODO: or init ipfs upfront
require.NoError(t, err)
err = n.Start()
require.NoError(t, err)
Expand Down Expand Up @@ -103,6 +104,7 @@ func TestNodeDelayedStart(t *testing.T) {

// create & start node
n, err := DefaultNewNode(config, log.TestingLogger())
n.embedIpfsNode = false // TODO: or init ipfs upfront
n.GenesisDoc().GenesisTime = now.Add(2 * time.Second)
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions rpc/client/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func ExampleHTTP_simple() {
// Start a tendermint node (and kvstore) in the background to test against
app := kvstore.NewApplication()
node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig)
node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig, rpctest.DoNotLoadIpfsPlugins)
defer rpctest.StopTendermint(node)

// Create our RPC client
Expand Down Expand Up @@ -68,7 +68,7 @@ func ExampleHTTP_simple() {
func ExampleHTTP_batching() {
// Start a tendermint node (and kvstore) in the background to test against
app := kvstore.NewApplication()
node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig)
node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig, rpctest.DoNotLoadIpfsPlugins)

// Create our RPC client
rpcAddr := rpctest.GetConfig().RPC.ListenAddress
Expand Down
78 changes: 70 additions & 8 deletions rpc/test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@ package rpctest
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"

ipfscfg "github.com/ipfs/go-ipfs-config"
"github.com/ipfs/go-ipfs/plugin/loader"
"github.com/ipfs/go-ipfs/repo/fsrepo"
"github.com/ipfs/interface-go-ipfs-core/options"
abci "github.com/lazyledger/lazyledger-core/abci/types"
cmd "github.com/lazyledger/lazyledger-core/cmd/tendermint/commands"
"github.com/lazyledger/lazyledger-core/libs/log"
tmos "github.com/lazyledger/lazyledger-core/libs/os"
"github.com/lazyledger/lazyledger-core/p2p/ipld/plugin/nodes"

cfg "github.com/lazyledger/lazyledger-core/config"
tmnet "github.com/lazyledger/lazyledger-core/libs/net"
Expand All @@ -26,14 +32,16 @@ import (
// Options helps with specifying some parameters for our RPC testing for greater
// control.
type Options struct {
suppressStdout bool
recreateConfig bool
suppressStdout bool
recreateConfig bool
loadIpfsPlugins bool
}

var globalConfig *cfg.Config
var defaultOptions = Options{
suppressStdout: false,
recreateConfig: false,
suppressStdout: false,
recreateConfig: false,
loadIpfsPlugins: true,
}

func waitForRPC() {
Expand Down Expand Up @@ -151,7 +159,6 @@ func StopTendermint(node *nm.Node) {

// NewTendermint creates a new tendermint server and sleeps forever
func NewTendermint(app abci.Application, opts *Options) *nm.Node {
// TODO init ipfs
// Create & start node
config := GetConfig(opts.recreateConfig)
var logger log.Logger
Expand All @@ -174,21 +181,71 @@ func NewTendermint(app abci.Application, opts *Options) *nm.Node {
}

config.IPFS = cfg.DefaultIPFSConfig()
err = cmd.InitIpfs(config)
err = initIpfs(config, opts.loadIpfsPlugins, logger)
if err != nil {
panic(err)
}
node, err := nm.NewNode(config, pv, nodeKey, papp,
nm.DefaultGenesisDocProviderFunc(config),
nm.DefaultDBProvider,
nm.DefaultMetricsProvider(config.Instrumentation),
logger)
logger,
nm.IpfsPluginsWereLoaded(true),
)
if err != nil {
panic(err)
}
return node
}

func initIpfs(config *cfg.Config, loadPlugins bool, log log.Logger) error { // add counter part in ResetAllCmd
// init IPFS config with params from config.IPFS
// and store in config.IPFS.ConfigRootPath
repoRoot := config.IPFSRepoRoot()
if !fsrepo.IsInitialized(repoRoot) {
var conf *ipfscfg.Config

identity, err := ipfscfg.CreateIdentity(ioutil.Discard, []options.KeyGenerateOption{
options.Key.Type(options.Ed25519Key),
})
if err != nil {
return err
}

if err := tmos.EnsureDir(repoRoot, 0700); err != nil {
return err
}
if loadPlugins {
plugins, err := loader.NewPluginLoader(filepath.Join(repoRoot, "plugins"))
if err != nil {
return fmt.Errorf("error loading plugins: %s", err)
}
if err := plugins.Load(&nodes.LazyLedgerPlugin{}); err != nil {
return err
}

if err := plugins.Initialize(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}

if err := plugins.Inject(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}
}
conf, err = ipfscfg.InitWithIdentity(identity)
if err != nil {
return fmt.Errorf("InitWithIdentity(): %w", err)
}

if err := fsrepo.Init(repoRoot, conf); err != nil {
return err
}
} else {
log.Info("ipfs repo already initialized", "repo-root", repoRoot)
}
return nil
}

// SuppressStdout is an option that tries to make sure the RPC test Tendermint
// node doesn't log anything to stdout.
func SuppressStdout(o *Options) {
Expand All @@ -200,3 +257,8 @@ func SuppressStdout(o *Options) {
func RecreateConfig(o *Options) {
o.recreateConfig = true
}

// DoNotLoadIpfsPlugins
func DoNotLoadIpfsPlugins(o *Options) {
o.loadIpfsPlugins = false
}

0 comments on commit 8ce5bb2

Please sign in to comment.