From 25a8ce9ad26cf2eee6e5bacf51af44130c7d3ece Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 30 Mar 2023 19:11:25 +0200 Subject: [PATCH 1/3] feat: cmd/ipfs: Allow passing custom BuildEnv to main --- cmd/ipfs/main.go | 106 ++++++++++++++++++++++----------------- cmd/ipfs/runmain_test.go | 2 +- 2 files changed, 60 insertions(+), 48 deletions(-) diff --git a/cmd/ipfs/main.go b/cmd/ipfs/main.go index 2d9673ecece..7beb4bed034 100644 --- a/cmd/ipfs/main.go +++ b/cmd/ipfs/main.go @@ -51,12 +51,20 @@ const ( heapProfile = "ipfs.memprof" ) -func loadPlugins(repoPath string) (*loader.PluginLoader, error) { +type PluginPreloader func(*loader.PluginLoader) error + +func LoadPlugins(repoPath string, preload PluginPreloader) (*loader.PluginLoader, error) { plugins, err := loader.NewPluginLoader(repoPath) if err != nil { return nil, fmt.Errorf("error loading plugins: %s", err) } + if preload != nil { + if err := preload(plugins); err != nil { + return nil, fmt.Errorf("error loading plugins (preload): %s", err) + } + } + if err := plugins.Initialize(); err != nil { return nil, fmt.Errorf("error initializing plugins: %s", err) } @@ -74,7 +82,7 @@ func loadPlugins(repoPath string) (*loader.PluginLoader, error) { // - output the response // - if anything fails, print error, maybe with help func main() { - os.Exit(mainRet()) + os.Exit(Start(BuildDefaultEnv)) } func printErr(err error) int { @@ -92,7 +100,54 @@ func newUUID(key string) logging.Metadata { } } -func mainRet() (exitCode int) { +func BuildDefaultEnv(ctx context.Context, req *cmds.Request) (cmds.Environment, error) { + return BuildEnv(ctx, req, nil) +} + +func BuildEnv(ctx context.Context, req *cmds.Request, pl PluginPreloader) (cmds.Environment, error) { + checkDebug(req) + repoPath, err := GetRepoPath(req) + if err != nil { + return nil, err + } + log.Debugf("config path is %s", repoPath) + + plugins, err := LoadPlugins(repoPath, pl) + if err != nil { + return nil, err + } + + // this sets up the function that will initialize the node + // this is so that we can construct the node lazily. + return &oldcmds.Context{ + ConfigRoot: repoPath, + ReqLog: &oldcmds.ReqLog{}, + Plugins: plugins, + ConstructNode: func() (n *core.IpfsNode, err error) { + if req == nil { + return nil, errors.New("constructing node without a request") + } + + r, err := fsrepo.Open(repoPath) + if err != nil { // repo is owned by the node + return nil, err + } + + // ok everything is good. set it on the invocation (for ownership) + // and return it. + n, err = core.NewNode(ctx, &core.BuildCfg{ + Repo: r, + }) + if err != nil { + return nil, err + } + + return n, nil + }, + }, nil +} + +func Start(buildEnv func(ctx context.Context, req *cmds.Request) (cmds.Environment, error)) (exitCode int) { rand.Seed(time.Now().UnixNano()) ctx := logging.ContextWithLoggable(context.Background(), newUUID("session")) @@ -146,49 +201,6 @@ func mainRet() (exitCode int) { // so we need to make sure it's stable os.Args[0] = "ipfs" - buildEnv := func(ctx context.Context, req *cmds.Request) (cmds.Environment, error) { - checkDebug(req) - repoPath, err := getRepoPath(req) - if err != nil { - return nil, err - } - log.Debugf("config path is %s", repoPath) - - plugins, err := loadPlugins(repoPath) - if err != nil { - return nil, err - } - - // this sets up the function that will initialize the node - // this is so that we can construct the node lazily. - return &oldcmds.Context{ - ConfigRoot: repoPath, - ReqLog: &oldcmds.ReqLog{}, - Plugins: plugins, - ConstructNode: func() (n *core.IpfsNode, err error) { - if req == nil { - return nil, errors.New("constructing node without a request") - } - - r, err := fsrepo.Open(repoPath) - if err != nil { // repo is owned by the node - return nil, err - } - - // ok everything is good. set it on the invocation (for ownership) - // and return it. - n, err = core.NewNode(ctx, &core.BuildCfg{ - Repo: r, - }) - if err != nil { - return nil, err - } - - return n, nil - }, - }, nil - } - err = cli.Run(ctx, Root, os.Args, os.Stdin, os.Stdout, os.Stderr, buildEnv, makeExecutor) if err != nil { return 1 @@ -338,7 +350,7 @@ func (twe tracingWrappedExecutor) Execute(req *cmds.Request, re cmds.ResponseEmi return err } -func getRepoPath(req *cmds.Request) (string, error) { +func GetRepoPath(req *cmds.Request) (string, error) { repoOpt, found := req.Options[corecmds.RepoDirOption].(string) if found && repoOpt != "" { return repoOpt, nil diff --git a/cmd/ipfs/runmain_test.go b/cmd/ipfs/runmain_test.go index d9187911207..dda0113602c 100644 --- a/cmd/ipfs/runmain_test.go +++ b/cmd/ipfs/runmain_test.go @@ -16,7 +16,7 @@ import ( func TestRunMain(t *testing.T) { args := flag.Args() os.Args = append([]string{os.Args[0]}, args...) - ret := mainRet() + ret := Start() p := os.Getenv("IPFS_COVER_RET_FILE") if len(p) != 0 { From 6cb7ad540e5832a6d4a18d495246dd124ec9aaa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 30 Mar 2023 19:25:56 +0200 Subject: [PATCH 2/3] feat: cmd/ipfs: Make it possible to depend on cmd/ipfs --- cmd/ipfs/{ => kubo}/add_migrations.go | 2 +- cmd/ipfs/{ => kubo}/daemon.go | 2 +- cmd/ipfs/{ => kubo}/daemon_linux.go | 2 +- cmd/ipfs/{ => kubo}/daemon_other.go | 2 +- cmd/ipfs/{ => kubo}/debug.go | 2 +- cmd/ipfs/{ => kubo}/dnsresolve_test.go | 2 +- cmd/ipfs/{ => kubo}/init.go | 2 +- cmd/ipfs/{ => kubo}/ipfs.go | 2 +- cmd/ipfs/{ => kubo}/pinmfs.go | 2 +- cmd/ipfs/{ => kubo}/pinmfs_test.go | 2 +- cmd/ipfs/kubo/start.go | 421 +++++++++++++++++++++++++ cmd/ipfs/main.go | 418 +----------------------- cmd/ipfs/runmain_test.go | 2 +- 13 files changed, 436 insertions(+), 425 deletions(-) rename cmd/ipfs/{ => kubo}/add_migrations.go (99%) rename cmd/ipfs/{ => kubo}/daemon.go (99%) rename cmd/ipfs/{ => kubo}/daemon_linux.go (95%) rename cmd/ipfs/{ => kubo}/daemon_other.go (86%) rename cmd/ipfs/{ => kubo}/debug.go (94%) rename cmd/ipfs/{ => kubo}/dnsresolve_test.go (99%) rename cmd/ipfs/{ => kubo}/init.go (99%) rename cmd/ipfs/{ => kubo}/ipfs.go (98%) rename cmd/ipfs/{ => kubo}/pinmfs.go (99%) rename cmd/ipfs/{ => kubo}/pinmfs_test.go (99%) create mode 100644 cmd/ipfs/kubo/start.go diff --git a/cmd/ipfs/add_migrations.go b/cmd/ipfs/kubo/add_migrations.go similarity index 99% rename from cmd/ipfs/add_migrations.go rename to cmd/ipfs/kubo/add_migrations.go index e0a5d710181..95cd801fc3d 100644 --- a/cmd/ipfs/add_migrations.go +++ b/cmd/ipfs/kubo/add_migrations.go @@ -1,4 +1,4 @@ -package main +package kubo import ( "context" diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/kubo/daemon.go similarity index 99% rename from cmd/ipfs/daemon.go rename to cmd/ipfs/kubo/daemon.go index ffcb30a34e9..f54eb3375cd 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/kubo/daemon.go @@ -1,4 +1,4 @@ -package main +package kubo import ( "errors" diff --git a/cmd/ipfs/daemon_linux.go b/cmd/ipfs/kubo/daemon_linux.go similarity index 95% rename from cmd/ipfs/daemon_linux.go rename to cmd/ipfs/kubo/daemon_linux.go index d06baf286b0..b612738a275 100644 --- a/cmd/ipfs/daemon_linux.go +++ b/cmd/ipfs/kubo/daemon_linux.go @@ -1,7 +1,7 @@ //go:build linux // +build linux -package main +package kubo import ( daemon "github.com/coreos/go-systemd/v22/daemon" diff --git a/cmd/ipfs/daemon_other.go b/cmd/ipfs/kubo/daemon_other.go similarity index 86% rename from cmd/ipfs/daemon_other.go rename to cmd/ipfs/kubo/daemon_other.go index cb96ce1b90c..c5b24053d94 100644 --- a/cmd/ipfs/daemon_other.go +++ b/cmd/ipfs/kubo/daemon_other.go @@ -1,7 +1,7 @@ //go:build !linux // +build !linux -package main +package kubo func notifyReady() {} diff --git a/cmd/ipfs/debug.go b/cmd/ipfs/kubo/debug.go similarity index 94% rename from cmd/ipfs/debug.go rename to cmd/ipfs/kubo/debug.go index f1b2683d188..ce07ca8e922 100644 --- a/cmd/ipfs/debug.go +++ b/cmd/ipfs/kubo/debug.go @@ -1,4 +1,4 @@ -package main +package kubo import ( "net/http" diff --git a/cmd/ipfs/dnsresolve_test.go b/cmd/ipfs/kubo/dnsresolve_test.go similarity index 99% rename from cmd/ipfs/dnsresolve_test.go rename to cmd/ipfs/kubo/dnsresolve_test.go index fcba5d69702..89ba80ef088 100644 --- a/cmd/ipfs/dnsresolve_test.go +++ b/cmd/ipfs/kubo/dnsresolve_test.go @@ -1,4 +1,4 @@ -package main +package kubo import ( "context" diff --git a/cmd/ipfs/init.go b/cmd/ipfs/kubo/init.go similarity index 99% rename from cmd/ipfs/init.go rename to cmd/ipfs/kubo/init.go index 3856f2ff4e6..12772b1be36 100644 --- a/cmd/ipfs/init.go +++ b/cmd/ipfs/kubo/init.go @@ -1,4 +1,4 @@ -package main +package kubo import ( "context" diff --git a/cmd/ipfs/ipfs.go b/cmd/ipfs/kubo/ipfs.go similarity index 98% rename from cmd/ipfs/ipfs.go rename to cmd/ipfs/kubo/ipfs.go index 24aea66c786..b5a18da75c8 100644 --- a/cmd/ipfs/ipfs.go +++ b/cmd/ipfs/kubo/ipfs.go @@ -1,4 +1,4 @@ -package main +package kubo import ( commands "github.com/ipfs/kubo/core/commands" diff --git a/cmd/ipfs/pinmfs.go b/cmd/ipfs/kubo/pinmfs.go similarity index 99% rename from cmd/ipfs/pinmfs.go rename to cmd/ipfs/kubo/pinmfs.go index c2c0cb8b7f7..227ca8f26c4 100644 --- a/cmd/ipfs/pinmfs.go +++ b/cmd/ipfs/kubo/pinmfs.go @@ -1,4 +1,4 @@ -package main +package kubo import ( "context" diff --git a/cmd/ipfs/pinmfs_test.go b/cmd/ipfs/kubo/pinmfs_test.go similarity index 99% rename from cmd/ipfs/pinmfs_test.go rename to cmd/ipfs/kubo/pinmfs_test.go index 7f1ac869672..da71d362cdb 100644 --- a/cmd/ipfs/pinmfs_test.go +++ b/cmd/ipfs/kubo/pinmfs_test.go @@ -1,4 +1,4 @@ -package main +package kubo import ( "context" diff --git a/cmd/ipfs/kubo/start.go b/cmd/ipfs/kubo/start.go new file mode 100644 index 00000000000..d14d5c98b2d --- /dev/null +++ b/cmd/ipfs/kubo/start.go @@ -0,0 +1,421 @@ +// cmd/ipfs implements the primary CLI binary for ipfs +package kubo + +import ( + "context" + "errors" + "fmt" + "math/rand" + "net" + "net/http" + "os" + "runtime/pprof" + "strings" + "time" + + "github.com/google/uuid" + u "github.com/ipfs/boxo/util" + cmds "github.com/ipfs/go-ipfs-cmds" + "github.com/ipfs/go-ipfs-cmds/cli" + cmdhttp "github.com/ipfs/go-ipfs-cmds/http" + logging "github.com/ipfs/go-log" + "github.com/ipfs/kubo/cmd/ipfs/util" + oldcmds "github.com/ipfs/kubo/commands" + "github.com/ipfs/kubo/core" + corecmds "github.com/ipfs/kubo/core/commands" + "github.com/ipfs/kubo/core/corehttp" + "github.com/ipfs/kubo/plugin/loader" + "github.com/ipfs/kubo/repo" + "github.com/ipfs/kubo/repo/fsrepo" + "github.com/ipfs/kubo/tracing" + ma "github.com/multiformats/go-multiaddr" + madns "github.com/multiformats/go-multiaddr-dns" + manet "github.com/multiformats/go-multiaddr/net" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +// log is the command logger +var log = logging.Logger("cmd/ipfs") +var tracer trace.Tracer + +// declared as a var for testing purposes +var dnsResolver = madns.DefaultResolver + +const ( + EnvEnableProfiling = "IPFS_PROF" + cpuProfile = "ipfs.cpuprof" + heapProfile = "ipfs.memprof" +) + +type PluginPreloader func(*loader.PluginLoader) error + +func LoadPlugins(repoPath string, preload PluginPreloader) (*loader.PluginLoader, error) { + plugins, err := loader.NewPluginLoader(repoPath) + if err != nil { + return nil, fmt.Errorf("error loading plugins: %s", err) + } + + if preload != nil { + if err := preload(plugins); err != nil { + return nil, fmt.Errorf("error loading plugins (preload): %s", err) + } + } + + if err := plugins.Initialize(); err != nil { + return nil, fmt.Errorf("error initializing plugins: %s", err) + } + + if err := plugins.Inject(); err != nil { + return nil, fmt.Errorf("error initializing plugins: %s", err) + } + return plugins, nil +} + +func printErr(err error) int { + fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error()) + return 1 +} + +func newUUID(key string) logging.Metadata { + ids := "#UUID-ERROR#" + if id, err := uuid.NewRandom(); err == nil { + ids = id.String() + } + return logging.Metadata{ + key: ids, + } +} + +func BuildDefaultEnv(ctx context.Context, req *cmds.Request) (cmds.Environment, error) { + return BuildEnv(ctx, req, nil) +} + +func BuildEnv(ctx context.Context, req *cmds.Request, pl PluginPreloader) (cmds.Environment, error) { + checkDebug(req) + repoPath, err := GetRepoPath(req) + if err != nil { + return nil, err + } + log.Debugf("config path is %s", repoPath) + + plugins, err := LoadPlugins(repoPath, pl) + if err != nil { + return nil, err + } + + // this sets up the function that will initialize the node + // this is so that we can construct the node lazily. + return &oldcmds.Context{ + ConfigRoot: repoPath, + ReqLog: &oldcmds.ReqLog{}, + Plugins: plugins, + ConstructNode: func() (n *core.IpfsNode, err error) { + if req == nil { + return nil, errors.New("constructing node without a request") + } + + r, err := fsrepo.Open(repoPath) + if err != nil { // repo is owned by the node + return nil, err + } + + // ok everything is good. set it on the invocation (for ownership) + // and return it. + n, err = core.NewNode(ctx, &core.BuildCfg{ + Repo: r, + }) + if err != nil { + return nil, err + } + + return n, nil + }, + }, nil +} + +func Start(buildEnv func(ctx context.Context, req *cmds.Request) (cmds.Environment, error)) (exitCode int) { + rand.Seed(time.Now().UnixNano()) + ctx := logging.ContextWithLoggable(context.Background(), newUUID("session")) + + tp, err := tracing.NewTracerProvider(ctx) + if err != nil { + return printErr(err) + } + defer func() { + if err := tp.Shutdown(ctx); err != nil { + exitCode = printErr(err) + } + }() + otel.SetTracerProvider(tp) + tracer = tp.Tracer("Kubo-cli") + + stopFunc, err := profileIfEnabled() + if err != nil { + return printErr(err) + } + defer stopFunc() // to be executed as late as possible + + intrh, ctx := util.SetupInterruptHandler(ctx) + defer intrh.Close() + + // Handle `ipfs version` or `ipfs help` + if len(os.Args) > 1 { + // Handle `ipfs --version' + if os.Args[1] == "--version" { + os.Args[1] = "version" + } + + // Handle `ipfs help` and `ipfs help ` + if os.Args[1] == "help" { + if len(os.Args) > 2 { + os.Args = append(os.Args[:1], os.Args[2:]...) + // Handle `ipfs help --help` + // append `--help`,when the command is not `ipfs help --help` + if os.Args[1] != "--help" { + os.Args = append(os.Args, "--help") + } + } else { + os.Args[1] = "--help" + } + } + } else if insideGUI() { // if no args were passed, and we're in a GUI environment + // launch the daemon instead of launching a ghost window + os.Args = append(os.Args, "daemon", "--init") + } + + // output depends on executable name passed in os.Args + // so we need to make sure it's stable + os.Args[0] = "ipfs" + + err = cli.Run(ctx, Root, os.Args, os.Stdin, os.Stdout, os.Stderr, buildEnv, makeExecutor) + if err != nil { + return 1 + } + + // everything went better than expected :) + return 0 +} + +func insideGUI() bool { + return util.InsideGUI() +} + +func checkDebug(req *cmds.Request) { + // check if user wants to debug. option OR env var. + debug, _ := req.Options["debug"].(bool) + if debug || os.Getenv("IPFS_LOGGING") == "debug" { + u.Debug = true + logging.SetDebugLogging() + } + if u.GetenvBool("DEBUG") { + u.Debug = true + } +} + +func apiAddrOption(req *cmds.Request) (ma.Multiaddr, error) { + apiAddrStr, apiSpecified := req.Options[corecmds.ApiOption].(string) + if !apiSpecified { + return nil, nil + } + return ma.NewMultiaddr(apiAddrStr) +} + +func makeExecutor(req *cmds.Request, env interface{}) (cmds.Executor, error) { + exe := tracingWrappedExecutor{cmds.NewExecutor(req.Root)} + cctx := env.(*oldcmds.Context) + + // Check if the command is disabled. + if req.Command.NoLocal && req.Command.NoRemote { + return nil, fmt.Errorf("command disabled: %v", req.Path) + } + + // Can we just run this locally? + if !req.Command.NoLocal { + if doesNotUseRepo, ok := corecmds.GetDoesNotUseRepo(req.Command.Extra); doesNotUseRepo && ok { + return exe, nil + } + } + + // Get the API option from the commandline. + apiAddr, err := apiAddrOption(req) + if err != nil { + return nil, err + } + + // Require that the command be run on the daemon when the API flag is + // passed (unless we're trying to _run_ the daemon). + daemonRequested := apiAddr != nil && req.Command != daemonCmd + + // Run this on the client if required. + if req.Command.NoRemote { + if daemonRequested { + // User requested that the command be run on the daemon but we can't. + // NOTE: We drop this check for the `ipfs daemon` command. + return nil, errors.New("api flag specified but command cannot be run on the daemon") + } + return exe, nil + } + + // Finally, look in the repo for an API file. + if apiAddr == nil { + var err error + apiAddr, err = fsrepo.APIAddr(cctx.ConfigRoot) + switch err { + case nil, repo.ErrApiNotRunning: + default: + return nil, err + } + } + + // Still no api specified? Run it on the client or fail. + if apiAddr == nil { + if req.Command.NoLocal { + return nil, fmt.Errorf("command must be run on the daemon: %v", req.Path) + } + return exe, nil + } + + // Resolve the API addr. + apiAddr, err = resolveAddr(req.Context, apiAddr) + if err != nil { + return nil, err + } + network, host, err := manet.DialArgs(apiAddr) + if err != nil { + return nil, err + } + + // Construct the executor. + opts := []cmdhttp.ClientOpt{ + cmdhttp.ClientWithAPIPrefix(corehttp.APIPath), + } + + // Fallback on a local executor if we (a) have a repo and (b) aren't + // forcing a daemon. + if !daemonRequested && fsrepo.IsInitialized(cctx.ConfigRoot) { + opts = append(opts, cmdhttp.ClientWithFallback(exe)) + } + + var tpt http.RoundTripper + switch network { + case "tcp", "tcp4", "tcp6": + tpt = http.DefaultTransport + case "unix": + path := host + host = "unix" + tpt = &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", path) + }, + } + default: + return nil, fmt.Errorf("unsupported API address: %s", apiAddr) + } + opts = append(opts, cmdhttp.ClientWithHTTPClient(&http.Client{ + Transport: otelhttp.NewTransport(tpt, + otelhttp.WithPropagators(tracing.Propagator()), + ), + })) + + return tracingWrappedExecutor{cmdhttp.NewClient(host, opts...)}, nil +} + +type tracingWrappedExecutor struct { + exec cmds.Executor +} + +func (twe tracingWrappedExecutor) Execute(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment) error { + ctx, span := tracer.Start(req.Context, "cmds."+strings.Join(req.Path, "."), trace.WithAttributes(attribute.StringSlice("Arguments", req.Arguments))) + defer span.End() + req.Context = ctx + + err := twe.exec.Execute(req, re, env) + if err != nil { + span.SetStatus(codes.Error, err.Error()) + } + return err +} + +func GetRepoPath(req *cmds.Request) (string, error) { + repoOpt, found := req.Options[corecmds.RepoDirOption].(string) + if found && repoOpt != "" { + return repoOpt, nil + } + + repoPath, err := fsrepo.BestKnownPath() + if err != nil { + return "", err + } + return repoPath, nil +} + +// startProfiling begins CPU profiling and returns a `stop` function to be +// executed as late as possible. The stop function captures the memprofile. +func startProfiling() (func(), error) { + // start CPU profiling as early as possible + ofi, err := os.Create(cpuProfile) + if err != nil { + return nil, err + } + err = pprof.StartCPUProfile(ofi) + if err != nil { + ofi.Close() + return nil, err + } + go func() { + for range time.NewTicker(time.Second * 30).C { + err := writeHeapProfileToFile() + if err != nil { + log.Error(err) + } + } + }() + + stopProfiling := func() { + pprof.StopCPUProfile() + ofi.Close() // captured by the closure + } + return stopProfiling, nil +} + +func writeHeapProfileToFile() error { + mprof, err := os.Create(heapProfile) + if err != nil { + return err + } + defer mprof.Close() // _after_ writing the heap profile + return pprof.WriteHeapProfile(mprof) +} + +func profileIfEnabled() (func(), error) { + // FIXME this is a temporary hack so profiling of asynchronous operations + // works as intended. + if os.Getenv(EnvEnableProfiling) != "" { + stopProfilingFunc, err := startProfiling() // TODO maybe change this to its own option... profiling makes it slower. + if err != nil { + return nil, err + } + return stopProfilingFunc, nil + } + return func() {}, nil +} + +func resolveAddr(ctx context.Context, addr ma.Multiaddr) (ma.Multiaddr, error) { + ctx, cancelFunc := context.WithTimeout(ctx, 10*time.Second) + defer cancelFunc() + + addrs, err := dnsResolver.Resolve(ctx, addr) + if err != nil { + return nil, err + } + + if len(addrs) == 0 { + return nil, errors.New("non-resolvable API endpoint") + } + + return addrs[0], nil +} diff --git a/cmd/ipfs/main.go b/cmd/ipfs/main.go index 7beb4bed034..68785f432bd 100644 --- a/cmd/ipfs/main.go +++ b/cmd/ipfs/main.go @@ -1,80 +1,11 @@ -// cmd/ipfs implements the primary CLI binary for ipfs package main import ( - "context" - "errors" - "fmt" - "math/rand" - "net" - "net/http" "os" - "runtime/pprof" - "strings" - "time" - "github.com/google/uuid" - u "github.com/ipfs/boxo/util" - cmds "github.com/ipfs/go-ipfs-cmds" - "github.com/ipfs/go-ipfs-cmds/cli" - cmdhttp "github.com/ipfs/go-ipfs-cmds/http" - logging "github.com/ipfs/go-log" - "github.com/ipfs/kubo/cmd/ipfs/util" - oldcmds "github.com/ipfs/kubo/commands" - "github.com/ipfs/kubo/core" - corecmds "github.com/ipfs/kubo/core/commands" - "github.com/ipfs/kubo/core/corehttp" - "github.com/ipfs/kubo/plugin/loader" - "github.com/ipfs/kubo/repo" - "github.com/ipfs/kubo/repo/fsrepo" - "github.com/ipfs/kubo/tracing" - ma "github.com/multiformats/go-multiaddr" - madns "github.com/multiformats/go-multiaddr-dns" - manet "github.com/multiformats/go-multiaddr/net" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" + "github.com/ipfs/kubo/cmd/ipfs/kubo" ) -// log is the command logger -var log = logging.Logger("cmd/ipfs") -var tracer trace.Tracer - -// declared as a var for testing purposes -var dnsResolver = madns.DefaultResolver - -const ( - EnvEnableProfiling = "IPFS_PROF" - cpuProfile = "ipfs.cpuprof" - heapProfile = "ipfs.memprof" -) - -type PluginPreloader func(*loader.PluginLoader) error - -func LoadPlugins(repoPath string, preload PluginPreloader) (*loader.PluginLoader, error) { - plugins, err := loader.NewPluginLoader(repoPath) - if err != nil { - return nil, fmt.Errorf("error loading plugins: %s", err) - } - - if preload != nil { - if err := preload(plugins); err != nil { - return nil, fmt.Errorf("error loading plugins (preload): %s", err) - } - } - - if err := plugins.Initialize(); err != nil { - return nil, fmt.Errorf("error initializing plugins: %s", err) - } - - if err := plugins.Inject(); err != nil { - return nil, fmt.Errorf("error initializing plugins: %s", err) - } - return plugins, nil -} - // main roadmap: // - parse the commandline to get a cmdInvocation // - if user requests help, print it and exit. @@ -82,350 +13,9 @@ func LoadPlugins(repoPath string, preload PluginPreloader) (*loader.PluginLoader // - output the response // - if anything fails, print error, maybe with help func main() { - os.Exit(Start(BuildDefaultEnv)) -} - -func printErr(err error) int { - fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error()) - return 1 -} - -func newUUID(key string) logging.Metadata { - ids := "#UUID-ERROR#" - if id, err := uuid.NewRandom(); err == nil { - ids = id.String() - } - return logging.Metadata{ - key: ids, - } -} - -func BuildDefaultEnv(ctx context.Context, req *cmds.Request) (cmds.Environment, error) { - return BuildEnv(ctx, req, nil) -} - -func BuildEnv(ctx context.Context, req *cmds.Request, pl PluginPreloader) (cmds.Environment, error) { - checkDebug(req) - repoPath, err := GetRepoPath(req) - if err != nil { - return nil, err - } - log.Debugf("config path is %s", repoPath) - - plugins, err := LoadPlugins(repoPath, pl) - if err != nil { - return nil, err - } - - // this sets up the function that will initialize the node - // this is so that we can construct the node lazily. - return &oldcmds.Context{ - ConfigRoot: repoPath, - ReqLog: &oldcmds.ReqLog{}, - Plugins: plugins, - ConstructNode: func() (n *core.IpfsNode, err error) { - if req == nil { - return nil, errors.New("constructing node without a request") - } - - r, err := fsrepo.Open(repoPath) - if err != nil { // repo is owned by the node - return nil, err - } - - // ok everything is good. set it on the invocation (for ownership) - // and return it. - n, err = core.NewNode(ctx, &core.BuildCfg{ - Repo: r, - }) - if err != nil { - return nil, err - } - - return n, nil - }, - }, nil -} - -func Start(buildEnv func(ctx context.Context, req *cmds.Request) (cmds.Environment, error)) (exitCode int) { - rand.Seed(time.Now().UnixNano()) - ctx := logging.ContextWithLoggable(context.Background(), newUUID("session")) - - tp, err := tracing.NewTracerProvider(ctx) - if err != nil { - return printErr(err) - } - defer func() { - if err := tp.Shutdown(ctx); err != nil { - exitCode = printErr(err) - } - }() - otel.SetTracerProvider(tp) - tracer = tp.Tracer("Kubo-cli") - - stopFunc, err := profileIfEnabled() - if err != nil { - return printErr(err) - } - defer stopFunc() // to be executed as late as possible - - intrh, ctx := util.SetupInterruptHandler(ctx) - defer intrh.Close() - - // Handle `ipfs version` or `ipfs help` - if len(os.Args) > 1 { - // Handle `ipfs --version' - if os.Args[1] == "--version" { - os.Args[1] = "version" - } - - // Handle `ipfs help` and `ipfs help ` - if os.Args[1] == "help" { - if len(os.Args) > 2 { - os.Args = append(os.Args[:1], os.Args[2:]...) - // Handle `ipfs help --help` - // append `--help`,when the command is not `ipfs help --help` - if os.Args[1] != "--help" { - os.Args = append(os.Args, "--help") - } - } else { - os.Args[1] = "--help" - } - } - } else if insideGUI() { // if no args were passed, and we're in a GUI environment - // launch the daemon instead of launching a ghost window - os.Args = append(os.Args, "daemon", "--init") - } - - // output depends on executable name passed in os.Args - // so we need to make sure it's stable - os.Args[0] = "ipfs" - - err = cli.Run(ctx, Root, os.Args, os.Stdin, os.Stdout, os.Stderr, buildEnv, makeExecutor) - if err != nil { - return 1 - } - - // everything went better than expected :) - return 0 -} - -func insideGUI() bool { - return util.InsideGUI() + os.Exit(mainRet()) } -func checkDebug(req *cmds.Request) { - // check if user wants to debug. option OR env var. - debug, _ := req.Options["debug"].(bool) - if debug || os.Getenv("IPFS_LOGGING") == "debug" { - u.Debug = true - logging.SetDebugLogging() - } - if u.GetenvBool("DEBUG") { - u.Debug = true - } -} - -func apiAddrOption(req *cmds.Request) (ma.Multiaddr, error) { - apiAddrStr, apiSpecified := req.Options[corecmds.ApiOption].(string) - if !apiSpecified { - return nil, nil - } - return ma.NewMultiaddr(apiAddrStr) -} - -func makeExecutor(req *cmds.Request, env interface{}) (cmds.Executor, error) { - exe := tracingWrappedExecutor{cmds.NewExecutor(req.Root)} - cctx := env.(*oldcmds.Context) - - // Check if the command is disabled. - if req.Command.NoLocal && req.Command.NoRemote { - return nil, fmt.Errorf("command disabled: %v", req.Path) - } - - // Can we just run this locally? - if !req.Command.NoLocal { - if doesNotUseRepo, ok := corecmds.GetDoesNotUseRepo(req.Command.Extra); doesNotUseRepo && ok { - return exe, nil - } - } - - // Get the API option from the commandline. - apiAddr, err := apiAddrOption(req) - if err != nil { - return nil, err - } - - // Require that the command be run on the daemon when the API flag is - // passed (unless we're trying to _run_ the daemon). - daemonRequested := apiAddr != nil && req.Command != daemonCmd - - // Run this on the client if required. - if req.Command.NoRemote { - if daemonRequested { - // User requested that the command be run on the daemon but we can't. - // NOTE: We drop this check for the `ipfs daemon` command. - return nil, errors.New("api flag specified but command cannot be run on the daemon") - } - return exe, nil - } - - // Finally, look in the repo for an API file. - if apiAddr == nil { - var err error - apiAddr, err = fsrepo.APIAddr(cctx.ConfigRoot) - switch err { - case nil, repo.ErrApiNotRunning: - default: - return nil, err - } - } - - // Still no api specified? Run it on the client or fail. - if apiAddr == nil { - if req.Command.NoLocal { - return nil, fmt.Errorf("command must be run on the daemon: %v", req.Path) - } - return exe, nil - } - - // Resolve the API addr. - apiAddr, err = resolveAddr(req.Context, apiAddr) - if err != nil { - return nil, err - } - network, host, err := manet.DialArgs(apiAddr) - if err != nil { - return nil, err - } - - // Construct the executor. - opts := []cmdhttp.ClientOpt{ - cmdhttp.ClientWithAPIPrefix(corehttp.APIPath), - } - - // Fallback on a local executor if we (a) have a repo and (b) aren't - // forcing a daemon. - if !daemonRequested && fsrepo.IsInitialized(cctx.ConfigRoot) { - opts = append(opts, cmdhttp.ClientWithFallback(exe)) - } - - var tpt http.RoundTripper - switch network { - case "tcp", "tcp4", "tcp6": - tpt = http.DefaultTransport - case "unix": - path := host - host = "unix" - tpt = &http.Transport{ - DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { - return net.Dial("unix", path) - }, - } - default: - return nil, fmt.Errorf("unsupported API address: %s", apiAddr) - } - opts = append(opts, cmdhttp.ClientWithHTTPClient(&http.Client{ - Transport: otelhttp.NewTransport(tpt, - otelhttp.WithPropagators(tracing.Propagator()), - ), - })) - - return tracingWrappedExecutor{cmdhttp.NewClient(host, opts...)}, nil -} - -type tracingWrappedExecutor struct { - exec cmds.Executor -} - -func (twe tracingWrappedExecutor) Execute(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment) error { - ctx, span := tracer.Start(req.Context, "cmds."+strings.Join(req.Path, "."), trace.WithAttributes(attribute.StringSlice("Arguments", req.Arguments))) - defer span.End() - req.Context = ctx - - err := twe.exec.Execute(req, re, env) - if err != nil { - span.SetStatus(codes.Error, err.Error()) - } - return err -} - -func GetRepoPath(req *cmds.Request) (string, error) { - repoOpt, found := req.Options[corecmds.RepoDirOption].(string) - if found && repoOpt != "" { - return repoOpt, nil - } - - repoPath, err := fsrepo.BestKnownPath() - if err != nil { - return "", err - } - return repoPath, nil -} - -// startProfiling begins CPU profiling and returns a `stop` function to be -// executed as late as possible. The stop function captures the memprofile. -func startProfiling() (func(), error) { - // start CPU profiling as early as possible - ofi, err := os.Create(cpuProfile) - if err != nil { - return nil, err - } - err = pprof.StartCPUProfile(ofi) - if err != nil { - ofi.Close() - return nil, err - } - go func() { - for range time.NewTicker(time.Second * 30).C { - err := writeHeapProfileToFile() - if err != nil { - log.Error(err) - } - } - }() - - stopProfiling := func() { - pprof.StopCPUProfile() - ofi.Close() // captured by the closure - } - return stopProfiling, nil -} - -func writeHeapProfileToFile() error { - mprof, err := os.Create(heapProfile) - if err != nil { - return err - } - defer mprof.Close() // _after_ writing the heap profile - return pprof.WriteHeapProfile(mprof) -} - -func profileIfEnabled() (func(), error) { - // FIXME this is a temporary hack so profiling of asynchronous operations - // works as intended. - if os.Getenv(EnvEnableProfiling) != "" { - stopProfilingFunc, err := startProfiling() // TODO maybe change this to its own option... profiling makes it slower. - if err != nil { - return nil, err - } - return stopProfilingFunc, nil - } - return func() {}, nil -} - -func resolveAddr(ctx context.Context, addr ma.Multiaddr) (ma.Multiaddr, error) { - ctx, cancelFunc := context.WithTimeout(ctx, 10*time.Second) - defer cancelFunc() - - addrs, err := dnsResolver.Resolve(ctx, addr) - if err != nil { - return nil, err - } - - if len(addrs) == 0 { - return nil, errors.New("non-resolvable API endpoint") - } - - return addrs[0], nil +func mainRet() (exitCode int) { + return kubo.Start(kubo.BuildDefaultEnv) } diff --git a/cmd/ipfs/runmain_test.go b/cmd/ipfs/runmain_test.go index dda0113602c..d9187911207 100644 --- a/cmd/ipfs/runmain_test.go +++ b/cmd/ipfs/runmain_test.go @@ -16,7 +16,7 @@ import ( func TestRunMain(t *testing.T) { args := flag.Args() os.Args = append([]string{os.Args[0]}, args...) - ret := Start() + ret := mainRet() p := os.Getenv("IPFS_COVER_RET_FILE") if len(p) != 0 { From 429358ff3b5b231f7e5b84a0e5731ca91e839d10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 30 Mar 2023 19:36:33 +0200 Subject: [PATCH 3/3] feat: cmd/ipfs: Nicer to use BuildEnv --- cmd/ipfs/kubo/start.go | 76 ++++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 37 deletions(-) diff --git a/cmd/ipfs/kubo/start.go b/cmd/ipfs/kubo/start.go index d14d5c98b2d..3dd59602d01 100644 --- a/cmd/ipfs/kubo/start.go +++ b/cmd/ipfs/kubo/start.go @@ -91,50 +91,52 @@ func newUUID(key string) logging.Metadata { } func BuildDefaultEnv(ctx context.Context, req *cmds.Request) (cmds.Environment, error) { - return BuildEnv(ctx, req, nil) + return BuildEnv(nil)(ctx, req) } -func BuildEnv(ctx context.Context, req *cmds.Request, pl PluginPreloader) (cmds.Environment, error) { - checkDebug(req) - repoPath, err := GetRepoPath(req) - if err != nil { - return nil, err - } - log.Debugf("config path is %s", repoPath) +func BuildEnv(pl PluginPreloader) func(ctx context.Context, req *cmds.Request) (cmds.Environment, error) { + return func(ctx context.Context, req *cmds.Request) (cmds.Environment, error) { + checkDebug(req) + repoPath, err := GetRepoPath(req) + if err != nil { + return nil, err + } + log.Debugf("config path is %s", repoPath) - plugins, err := LoadPlugins(repoPath, pl) - if err != nil { - return nil, err - } + plugins, err := LoadPlugins(repoPath, pl) + if err != nil { + return nil, err + } - // this sets up the function that will initialize the node - // this is so that we can construct the node lazily. - return &oldcmds.Context{ - ConfigRoot: repoPath, - ReqLog: &oldcmds.ReqLog{}, - Plugins: plugins, - ConstructNode: func() (n *core.IpfsNode, err error) { - if req == nil { - return nil, errors.New("constructing node without a request") - } + // this sets up the function that will initialize the node + // this is so that we can construct the node lazily. + return &oldcmds.Context{ + ConfigRoot: repoPath, + ReqLog: &oldcmds.ReqLog{}, + Plugins: plugins, + ConstructNode: func() (n *core.IpfsNode, err error) { + if req == nil { + return nil, errors.New("constructing node without a request") + } - r, err := fsrepo.Open(repoPath) - if err != nil { // repo is owned by the node - return nil, err - } + r, err := fsrepo.Open(repoPath) + if err != nil { // repo is owned by the node + return nil, err + } - // ok everything is good. set it on the invocation (for ownership) - // and return it. - n, err = core.NewNode(ctx, &core.BuildCfg{ - Repo: r, - }) - if err != nil { - return nil, err - } + // ok everything is good. set it on the invocation (for ownership) + // and return it. + n, err = core.NewNode(ctx, &core.BuildCfg{ + Repo: r, + }) + if err != nil { + return nil, err + } - return n, nil - }, - }, nil + return n, nil + }, + }, nil + } } func Start(buildEnv func(ctx context.Context, req *cmds.Request) (cmds.Environment, error)) (exitCode int) {