From 95425b79ae947706402a52a5a0f17e3cd5f62bc5 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 17 Dec 2019 20:30:21 +0300 Subject: [PATCH 01/12] Implement visor restart from hypervisor --- cmd/skywire-visor/commands/root.go | 11 ++- pkg/hypervisor/hypervisor.go | 13 +++ pkg/restart/restart.go | 139 +++++++++++++++++++++++++++++ pkg/router/routerclient/client.go | 2 +- pkg/visor/rpc.go | 16 ++++ pkg/visor/rpc_client.go | 17 +++- pkg/visor/visor.go | 22 ++--- 7 files changed, 201 insertions(+), 19 deletions(-) create mode 100644 pkg/restart/restart.go diff --git a/cmd/skywire-visor/commands/root.go b/cmd/skywire-visor/commands/root.go index 1b49da606..b0213cce7 100644 --- a/cmd/skywire-visor/commands/root.go +++ b/cmd/skywire-visor/commands/root.go @@ -23,6 +23,7 @@ import ( "github.com/spf13/cobra" "github.com/SkycoinProject/skywire-mainnet/internal/utclient" + "github.com/SkycoinProject/skywire-mainnet/pkg/restart" "github.com/SkycoinProject/skywire-mainnet/pkg/util/pathutil" "github.com/SkycoinProject/skywire-mainnet/pkg/visor" ) @@ -46,6 +47,7 @@ type runCfg struct { masterLogger *logging.MasterLogger conf visor.Config node *visor.Node + restartCtx *restart.Context } var cfg *runCfg @@ -73,6 +75,13 @@ func init() { rootCmd.Flags().BoolVarP(&cfg.cfgFromStdin, "stdin", "i", false, "read config from STDIN") rootCmd.Flags().StringVarP(&cfg.profileMode, "profile", "p", "none", "enable profiling with pprof. Mode: none or one of: [cpu, mem, mutex, block, trace, http]") rootCmd.Flags().StringVarP(&cfg.port, "port", "", "6060", "port for http-mode of pprof") + + restartCtx, err := restart.CaptureContext() + if err != nil { + log.Printf("Failed to capture context: %v", err) + } else { + cfg.restartCtx = restartCtx + } } // Execute executes root CLI command. @@ -148,7 +157,7 @@ func (cfg *runCfg) readConfig() *runCfg { } func (cfg *runCfg) runNode() *runCfg { - node, err := visor.NewNode(&cfg.conf, cfg.masterLogger) + node, err := visor.NewNode(&cfg.conf, cfg.masterLogger, cfg.restartCtx) if err != nil { cfg.logger.Fatal("Failed to initialize node: ", err) } diff --git a/pkg/hypervisor/hypervisor.go b/pkg/hypervisor/hypervisor.go index 5df044407..c4170d32b 100644 --- a/pkg/hypervisor/hypervisor.go +++ b/pkg/hypervisor/hypervisor.go @@ -151,6 +151,7 @@ func (m *Node) ServeHTTP(w http.ResponseWriter, req *http.Request) { r.Put("/nodes/{pk}/routes/{rid}", m.putRoute()) r.Delete("/nodes/{pk}/routes/{rid}", m.deleteRoute()) r.Get("/nodes/{pk}/loops", m.getLoops()) + r.Get("/nodes/{pk}/restart", m.restart()) }) }) r.ServeHTTP(w, req) @@ -569,6 +570,18 @@ func (m *Node) getLoops() http.HandlerFunc { }) } +// NOTE: Reply comes with a delay, because of check if new executable is started successfully. +func (m *Node) restart() http.HandlerFunc { + return m.withCtx(m.nodeCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { + if err := ctx.RPC.Restart(); err != nil { + httputil.WriteJSON(w, r, http.StatusInternalServerError, err) + return + } + + httputil.WriteJSON(w, r, http.StatusOK, true) + }) +} + /* <<< Helper functions >>> */ diff --git a/pkg/restart/restart.go b/pkg/restart/restart.go new file mode 100644 index 000000000..e7087d7f5 --- /dev/null +++ b/pkg/restart/restart.go @@ -0,0 +1,139 @@ +package restart + +import ( + "errors" + "log" + "os" + "os/exec" + "path/filepath" + "time" + + "github.com/SkycoinProject/skycoin/src/util/logging" +) + +var ( + // ErrMalformedArgs is returned when executable args are malformed. + ErrMalformedArgs = errors.New("malformed args") +) + +const defaultCheckDelay = 5 * time.Second + +// Context describes data required for restarting visor. +type Context struct { + log *logging.Logger + checkDelay time.Duration + workingDirectory string + args []string +} + +// CaptureContext captures data required for restarting visor. +// Data used by CaptureContext must not be modified before, +// therefore calling CaptureContext immediately after starting executable is recommended. +func CaptureContext() (*Context, error) { + wd, err := os.Getwd() + if err != nil { + return nil, err + } + + args := os.Args + + context := &Context{ + checkDelay: defaultCheckDelay, + workingDirectory: wd, + args: args, + } + + return context, nil +} + +// RegisterLogger registers a logger instead of standard one. +func (c *Context) RegisterLogger(logger *logging.Logger) { + c.log = logger +} + +// SetCheckDelay sets a check delay instead of standard one. +func (c *Context) SetCheckDelay(delay time.Duration) { + c.checkDelay = delay +} + +// Restart restarts executable using Context. +// Should not be called from a goroutine. +func (c *Context) Restart() error { + if len(c.args) == 0 { + return ErrMalformedArgs + } + + executableRelPath := c.args[0] + executableAbsPath := filepath.Join(c.workingDirectory, executableRelPath) + + c.infoLogger()("Starting new instance of executable (path: %q)", executableAbsPath) + + errCh := c.start(executableAbsPath) + + ticker := time.NewTicker(c.checkDelay) + defer ticker.Stop() + + select { + case err := <-errCh: + c.errorLogger()("Failed to start new instance: %v", err) + return err + case <-ticker.C: + c.infoLogger()("New instance started successfully, exiting") + os.Exit(0) + + // unreachable + return nil + } +} + +func (c *Context) start(path string) chan error { + errCh := make(chan error, 1) + + go func(path string) { + normalizedPath, err := exec.LookPath(path) + if err != nil { + errCh <- err + return + } + + if len(c.args) == 0 { + errCh <- ErrMalformedArgs + return + } + + args := c.args[1:] + cmd := exec.Command(normalizedPath, args...) + + if err := cmd.Start(); err != nil { + errCh <- err + return + } + + if err := cmd.Wait(); err != nil { + errCh <- err + return + } + }(path) + + return errCh +} + +func (c *Context) infoLogger() func(string, ...interface{}) { + if c.log != nil { + return c.log.Infof + } + + logger := log.New(os.Stdout, "[INFO] ", log.LstdFlags) + + return logger.Printf +} + +func (c *Context) errorLogger() func(string, ...interface{}) { + if c.log != nil { + return c.log.Errorf + } + + logger := log.New(os.Stdout, "[ERROR] ", log.LstdFlags) + + return logger.Printf +} diff --git a/pkg/router/routerclient/client.go b/pkg/router/routerclient/client.go index ced537a60..dc4c6b66c 100644 --- a/pkg/router/routerclient/client.go +++ b/pkg/router/routerclient/client.go @@ -15,7 +15,7 @@ const rpcName = "RPCGateway" // Client is an RPC client for router. type Client struct { - tr *dmsg.Transport + tr *dmsg.Stream rpc *rpc.Client } diff --git a/pkg/visor/rpc.go b/pkg/visor/rpc.go index 10c5f55bf..97e7b6691 100644 --- a/pkg/visor/rpc.go +++ b/pkg/visor/rpc.go @@ -29,6 +29,9 @@ var ( // ErrNotFound is returned when a requested resource is not found. ErrNotFound = errors.New("not found") + + // ErrMalformedRestartContext is returned when restart context is malformed. + ErrMalformedRestartContext = errors.New("restart context is malformed") ) // RPC defines RPC methods for Node. @@ -390,3 +393,16 @@ func (r *RPC) Loops(_ *struct{}, out *[]LoopInfo) error { *out = loops return nil } + +/* + <<< VISOR MANAGEMENT >>> +*/ + +// Restart restarts visor. +func (r *RPC) Restart(_ *struct{}, _ *struct{}) error { + if r.node.restartCtx == nil { + return ErrMalformedRestartContext + } + + return r.node.restartCtx.Restart() +} diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index e470bd190..ea5e15c59 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -9,13 +9,12 @@ import ( "sync" "time" - "github.com/SkycoinProject/skywire-mainnet/pkg/app2" - "github.com/SkycoinProject/skywire-mainnet/pkg/router" - "github.com/SkycoinProject/dmsg/cipher" "github.com/SkycoinProject/skycoin/src/util/logging" "github.com/google/uuid" + "github.com/SkycoinProject/skywire-mainnet/pkg/app2" + "github.com/SkycoinProject/skywire-mainnet/pkg/router" "github.com/SkycoinProject/skywire-mainnet/pkg/routing" "github.com/SkycoinProject/skywire-mainnet/pkg/transport" ) @@ -49,6 +48,8 @@ type RPCClient interface { RemoveRoutingRule(key routing.RouteID) error Loops() ([]LoopInfo, error) + + Restart() error } // RPCClient provides methods to call an RPC Server. @@ -221,6 +222,11 @@ func (rc *rpcClient) Loops() ([]LoopInfo, error) { return loops, err } +// Restart calls Restart. +func (rc *rpcClient) Restart() error { + return rc.Call("Restart", &struct{}{}, &struct{}{}) +} + // MockRPCClient mocks RPCClient. type mockRPCClient struct { startedAt time.Time @@ -528,3 +534,8 @@ func (mc *mockRPCClient) Loops() ([]LoopInfo, error) { return loops, nil } + +// Restart implements RPCClient. +func (mc *mockRPCClient) Restart() error { + return nil +} diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index 703ec9565..e88e288da 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -6,7 +6,6 @@ import ( "context" "errors" "fmt" - "io" "net" "net/rpc" "os" @@ -20,7 +19,6 @@ import ( "time" "github.com/SkycoinProject/dmsg" - "github.com/SkycoinProject/dmsg/cipher" "github.com/SkycoinProject/dmsg/noise" "github.com/SkycoinProject/skycoin/src/util/logging" @@ -28,6 +26,7 @@ import ( "github.com/SkycoinProject/skywire-mainnet/pkg/app2/appnet" "github.com/SkycoinProject/skywire-mainnet/pkg/app2/appserver" "github.com/SkycoinProject/skywire-mainnet/pkg/dmsgpty" + "github.com/SkycoinProject/skywire-mainnet/pkg/restart" "github.com/SkycoinProject/skywire-mainnet/pkg/routefinder/rfclient" "github.com/SkycoinProject/skywire-mainnet/pkg/router" "github.com/SkycoinProject/skywire-mainnet/pkg/routing" @@ -67,18 +66,11 @@ type AppState struct { Status AppStatus `json:"status"` } -// PacketRouter performs routing of the skywire packets. -type PacketRouter interface { - io.Closer - Serve(ctx context.Context) error - SetupIsTrusted(sPK cipher.PubKey) bool -} - // Node provides messaging runtime for Apps by setting up all // necessary connections and performing messaging gateway functions. type Node struct { conf *Config - router PacketRouter + router router.Router n *snet.Network tm *transport.Manager rt routing.Table @@ -91,7 +83,8 @@ type Node struct { localPath string appsConf []AppConfig - startedAt time.Time + startedAt time.Time + restartCtx *restart.Context pidMu sync.Mutex @@ -102,12 +95,13 @@ type Node struct { } // NewNode constructs new Node. -func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error) { +func NewNode(config *Config, masterLogger *logging.MasterLogger, restartCtx *restart.Context) (*Node, error) { ctx := context.Background() node := &Node{ conf: config, procManager: appserver.NewProcManager(logging.MustGetLogger("proc_manager")), + restartCtx: restartCtx, } node.Logger = masterLogger @@ -430,8 +424,8 @@ func (node *Node) SpawnApp(config *AppConfig, startCh chan<- struct{}) (err erro appCfg := appcommon.Config{ Name: config.App, Version: config.Version, - SockFile: node.config.AppServerSockFile, - VisorPK: node.config.Node.StaticPubKey.Hex(), + SockFile: node.conf.AppServerSockFile, + VisorPK: node.conf.Node.StaticPubKey.Hex(), BinaryDir: node.appsPath, WorkDir: filepath.Join(node.localPath, config.App, fmt.Sprintf("v%s", config.Version)), } From f6a2c0e45f5ddaedadc5dbbf1f059d29bb31fb14 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 17 Dec 2019 21:56:39 +0300 Subject: [PATCH 02/12] Implement tests for visor restart from hypervisor --- pkg/restart/restart.go | 22 ++++++++--- pkg/restart/restart_test.go | 79 +++++++++++++++++++++++++++++++++++++ pkg/visor/config.go | 2 + pkg/visor/visor.go | 10 ++++- 4 files changed, 106 insertions(+), 7 deletions(-) create mode 100644 pkg/restart/restart_test.go diff --git a/pkg/restart/restart.go b/pkg/restart/restart.go index e7087d7f5..b8013515c 100644 --- a/pkg/restart/restart.go +++ b/pkg/restart/restart.go @@ -16,7 +16,7 @@ var ( ErrMalformedArgs = errors.New("malformed args") ) -const defaultCheckDelay = 5 * time.Second +const DefaultCheckDelay = 5 * time.Second // Context describes data required for restarting visor. type Context struct { @@ -24,6 +24,7 @@ type Context struct { checkDelay time.Duration workingDirectory string args []string + needsExit bool // disable in (c *Context) Restart() tests } // CaptureContext captures data required for restarting visor. @@ -38,9 +39,10 @@ func CaptureContext() (*Context, error) { args := os.Args context := &Context{ - checkDelay: defaultCheckDelay, + checkDelay: DefaultCheckDelay, workingDirectory: wd, args: args, + needsExit: true, } return context, nil @@ -48,12 +50,16 @@ func CaptureContext() (*Context, error) { // RegisterLogger registers a logger instead of standard one. func (c *Context) RegisterLogger(logger *logging.Logger) { - c.log = logger + if c != nil { + c.log = logger + } } // SetCheckDelay sets a check delay instead of standard one. func (c *Context) SetCheckDelay(delay time.Duration) { - c.checkDelay = delay + if c != nil { + c.checkDelay = delay + } } // Restart restarts executable using Context. @@ -79,9 +85,11 @@ func (c *Context) Restart() error { return err case <-ticker.C: c.infoLogger()("New instance started successfully, exiting") - os.Exit(0) + if c.needsExit { + os.Exit(0) + } - // unreachable + // unreachable unless run in tests return nil } } @@ -90,6 +98,8 @@ func (c *Context) start(path string) chan error { errCh := make(chan error, 1) go func(path string) { + defer close(errCh) + normalizedPath, err := exec.LookPath(path) if err != nil { errCh <- err diff --git a/pkg/restart/restart_test.go b/pkg/restart/restart_test.go new file mode 100644 index 000000000..92d47227c --- /dev/null +++ b/pkg/restart/restart_test.go @@ -0,0 +1,79 @@ +package restart + +import ( + "os" + "testing" + "time" + + "github.com/SkycoinProject/skycoin/src/util/logging" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCaptureContext(t *testing.T) { + cc, err := CaptureContext() + require.NoError(t, err) + + wd, err := os.Getwd() + assert.NoError(t, err) + + require.Equal(t, wd, cc.workingDirectory) + require.Equal(t, DefaultCheckDelay, cc.checkDelay) + require.Equal(t, os.Args, cc.args) + require.Nil(t, cc.log) + require.True(t, cc.needsExit) +} + +func TestContext_RegisterLogger(t *testing.T) { + cc, err := CaptureContext() + require.NoError(t, err) + require.Nil(t, cc.log) + + logger := logging.MustGetLogger("test") + cc.RegisterLogger(logger) + require.Equal(t, logger, cc.log) +} + +func TestContext_Restart(t *testing.T) { + cc, err := CaptureContext() + require.NoError(t, err) + assert.NotZero(t, len(cc.args)) + + cc.workingDirectory = "" + cc.needsExit = false + + t.Run("executable started", func(t *testing.T) { + cmd := "touch" + path := "/tmp/test_restart" + args := []string{cmd, path} + cc.args = args + + assert.NoError(t, cc.Restart()) + assert.NoError(t, os.Remove(path)) + }) + + t.Run("bad args", func(t *testing.T) { + cmd := "bad_command" + args := []string{cmd} + cc.args = args + + // TODO(nkryuchkov): Check if it works on Linux and Windows, if not then change the error text. + assert.EqualError(t, cc.Restart(), `exec: "bad_command": executable file not found in $PATH`) + }) + + t.Run("empty args", func(t *testing.T) { + cc.args = nil + + assert.Equal(t, ErrMalformedArgs, cc.Restart()) + }) +} + +func TestContext_SetCheckDelay(t *testing.T) { + cc, err := CaptureContext() + require.NoError(t, err) + require.Equal(t, DefaultCheckDelay, cc.checkDelay) + + const oneSecond = 1 * time.Second + cc.SetCheckDelay(oneSecond) + require.Equal(t, oneSecond, cc.checkDelay) +} diff --git a/pkg/visor/config.go b/pkg/visor/config.go index 69e54d808..c4c1bbbb3 100644 --- a/pkg/visor/config.go +++ b/pkg/visor/config.go @@ -73,6 +73,8 @@ type Config struct { Interfaces InterfaceConfig `json:"interfaces"` AppServerSockFile string `json:"app_server_sock_file"` + + RestartCheckDelay string `json:"restart_check_delay"` } // MessagingConfig returns config for dmsg client. diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index e88e288da..8916877c2 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -101,12 +101,20 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger, restartCtx *res node := &Node{ conf: config, procManager: appserver.NewProcManager(logging.MustGetLogger("proc_manager")), - restartCtx: restartCtx, } node.Logger = masterLogger node.logger = node.Logger.PackageLogger("skywire") + restartCheckDelay, err := time.ParseDuration(config.RestartCheckDelay) + if err == nil { + restartCtx.SetCheckDelay(restartCheckDelay) + } + + restartCtx.RegisterLogger(node.logger) + + node.restartCtx = restartCtx + pk := config.Node.StaticPubKey sk := config.Node.StaticSecKey From 01ae1ba33f26f7f48418601529778bbcd91847de Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 17 Dec 2019 21:59:38 +0300 Subject: [PATCH 03/12] Make minor linter improvements --- pkg/restart/restart.go | 1 + pkg/restart/restart_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/pkg/restart/restart.go b/pkg/restart/restart.go index b8013515c..18be8a302 100644 --- a/pkg/restart/restart.go +++ b/pkg/restart/restart.go @@ -85,6 +85,7 @@ func (c *Context) Restart() error { return err case <-ticker.C: c.infoLogger()("New instance started successfully, exiting") + if c.needsExit { os.Exit(0) } diff --git a/pkg/restart/restart_test.go b/pkg/restart/restart_test.go index 92d47227c..80e291c92 100644 --- a/pkg/restart/restart_test.go +++ b/pkg/restart/restart_test.go @@ -74,6 +74,7 @@ func TestContext_SetCheckDelay(t *testing.T) { require.Equal(t, DefaultCheckDelay, cc.checkDelay) const oneSecond = 1 * time.Second + cc.SetCheckDelay(oneSecond) require.Equal(t, oneSecond, cc.checkDelay) } From 3a73eea9785a5b5839eb19f4f7a44d8d0c150b05 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 23 Dec 2019 16:11:04 +0400 Subject: [PATCH 04/12] Fix linter errors --- pkg/restart/restart.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/restart/restart.go b/pkg/restart/restart.go index 18be8a302..00837ce48 100644 --- a/pkg/restart/restart.go +++ b/pkg/restart/restart.go @@ -16,6 +16,7 @@ var ( ErrMalformedArgs = errors.New("malformed args") ) +// DefaultCheckDelay is a default delay for checking if a new instance is started successfully. const DefaultCheckDelay = 5 * time.Second // Context describes data required for restarting visor. @@ -113,7 +114,7 @@ func (c *Context) start(path string) chan error { } args := c.args[1:] - cmd := exec.Command(normalizedPath, args...) + cmd := exec.Command(normalizedPath, args...) // nolint:gosec if err := cmd.Start(); err != nil { errCh <- err From b1caac53d4a8d4a927bca3042266d3a82b0b0bb8 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 24 Dec 2019 16:06:10 +0400 Subject: [PATCH 05/12] Forbid simultaneous restarts --- pkg/restart/restart.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/pkg/restart/restart.go b/pkg/restart/restart.go index 00837ce48..3a452047b 100644 --- a/pkg/restart/restart.go +++ b/pkg/restart/restart.go @@ -6,14 +6,17 @@ import ( "os" "os/exec" "path/filepath" + "sync/atomic" "time" - "github.com/SkycoinProject/skycoin/src/util/logging" + "github.com/sirupsen/logrus" ) var ( // ErrMalformedArgs is returned when executable args are malformed. ErrMalformedArgs = errors.New("malformed args") + // ErrAlreadyRestarting is returned on restarting attempt when restarting is in progress. + ErrAlreadyRestarting = errors.New("already restarting") ) // DefaultCheckDelay is a default delay for checking if a new instance is started successfully. @@ -21,11 +24,12 @@ const DefaultCheckDelay = 5 * time.Second // Context describes data required for restarting visor. type Context struct { - log *logging.Logger + log logrus.FieldLogger checkDelay time.Duration workingDirectory string args []string needsExit bool // disable in (c *Context) Restart() tests + isRestarting int32 } // CaptureContext captures data required for restarting visor. @@ -50,7 +54,7 @@ func CaptureContext() (*Context, error) { } // RegisterLogger registers a logger instead of standard one. -func (c *Context) RegisterLogger(logger *logging.Logger) { +func (c *Context) RegisterLogger(logger logrus.FieldLogger) { if c != nil { c.log = logger } @@ -64,8 +68,13 @@ func (c *Context) SetCheckDelay(delay time.Duration) { } // Restart restarts executable using Context. -// Should not be called from a goroutine. func (c *Context) Restart() error { + if atomic.CompareAndSwapInt32(&c.isRestarting, 0, 1) { + return ErrAlreadyRestarting + } + + defer atomic.StoreInt32(&c.isRestarting, 0) + if len(c.args) == 0 { return ErrMalformedArgs } From a9264013b99bdf66c5b0482609d090c1ca63f474 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 24 Dec 2019 16:45:42 +0400 Subject: [PATCH 06/12] Fix restart bug --- pkg/restart/restart.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/restart/restart.go b/pkg/restart/restart.go index 3a452047b..edeb68818 100644 --- a/pkg/restart/restart.go +++ b/pkg/restart/restart.go @@ -69,7 +69,7 @@ func (c *Context) SetCheckDelay(delay time.Duration) { // Restart restarts executable using Context. func (c *Context) Restart() error { - if atomic.CompareAndSwapInt32(&c.isRestarting, 0, 1) { + if !atomic.CompareAndSwapInt32(&c.isRestarting, 0, 1) { return ErrAlreadyRestarting } From b8e2cd0c9481abd59700565678948b466ed1e505 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 24 Dec 2019 16:47:08 +0400 Subject: [PATCH 07/12] Add "already restarting" test case --- pkg/restart/restart_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/pkg/restart/restart_test.go b/pkg/restart/restart_test.go index 80e291c92..e0c118add 100644 --- a/pkg/restart/restart_test.go +++ b/pkg/restart/restart_test.go @@ -66,6 +66,25 @@ func TestContext_Restart(t *testing.T) { assert.Equal(t, ErrMalformedArgs, cc.Restart()) }) + + t.Run("already restarting", func(t *testing.T) { + cc.args = nil + + cmd := "touch" + path := "/tmp/test_restart" + args := []string{cmd, path} + cc.args = args + + ch := make(chan error, 1) + go func() { + ch <- cc.Restart() + }() + + assert.NoError(t, cc.Restart()) + assert.NoError(t, os.Remove(path)) + + assert.Equal(t, ErrAlreadyRestarting, <-ch) + }) } func TestContext_SetCheckDelay(t *testing.T) { From d01737637cb21d00c343ff957b0dc8e3dabc84ce Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 26 Dec 2019 22:10:23 +0400 Subject: [PATCH 08/12] Fix bugs & make different improvements --- cmd/skywire-visor/commands/root.go | 16 ++++++++ pkg/restart/restart.go | 61 +++++++++++++++++++++--------- pkg/restart/restart_test.go | 16 ++++---- pkg/visor/rpc.go | 16 +++++++- pkg/visor/visor.go | 1 + 5 files changed, 82 insertions(+), 28 deletions(-) diff --git a/cmd/skywire-visor/commands/root.go b/cmd/skywire-visor/commands/root.go index b0213cce7..9eb51f5e7 100644 --- a/cmd/skywire-visor/commands/root.go +++ b/cmd/skywire-visor/commands/root.go @@ -40,6 +40,7 @@ type runCfg struct { cfgFromStdin bool profileMode string port string + startDelay string args []string profileStop func() @@ -75,6 +76,7 @@ func init() { rootCmd.Flags().BoolVarP(&cfg.cfgFromStdin, "stdin", "i", false, "read config from STDIN") rootCmd.Flags().StringVarP(&cfg.profileMode, "profile", "p", "none", "enable profiling with pprof. Mode: none or one of: [cpu, mem, mutex, block, trace, http]") rootCmd.Flags().StringVarP(&cfg.port, "port", "", "6060", "port for http-mode of pprof") + rootCmd.Flags().StringVarP(&cfg.startDelay, "delay", "", "0ns", "delay before visor start") restartCtx, err := restart.CaptureContext() if err != nil { @@ -157,6 +159,18 @@ func (cfg *runCfg) readConfig() *runCfg { } func (cfg *runCfg) runNode() *runCfg { + startDelay, err := time.ParseDuration(cfg.startDelay) + if err != nil { + cfg.logger.Warnf("Using no visor start delay due to parsing failure: %v", err) + startDelay = time.Duration(0) + } + + if startDelay != 0 { + cfg.logger.Infof("Visor start delay is %v, waiting...", startDelay) + } + + time.Sleep(startDelay) + node, err := visor.NewNode(&cfg.conf, cfg.masterLogger, cfg.restartCtx) if err != nil { cfg.logger.Fatal("Failed to initialize node: ", err) @@ -190,7 +204,9 @@ func (cfg *runCfg) runNode() *runCfg { if cfg.conf.ShutdownTimeout == 0 { cfg.conf.ShutdownTimeout = defaultShutdownTimeout } + cfg.node = node + return cfg } diff --git a/pkg/restart/restart.go b/pkg/restart/restart.go index edeb68818..b26016c6c 100644 --- a/pkg/restart/restart.go +++ b/pkg/restart/restart.go @@ -20,7 +20,7 @@ var ( ) // DefaultCheckDelay is a default delay for checking if a new instance is started successfully. -const DefaultCheckDelay = 5 * time.Second +const DefaultCheckDelay = 1 * time.Second // Context describes data required for restarting visor. type Context struct { @@ -28,8 +28,8 @@ type Context struct { checkDelay time.Duration workingDirectory string args []string - needsExit bool // disable in (c *Context) Restart() tests isRestarting int32 + appendDelay bool // disabled in tests } // CaptureContext captures data required for restarting visor. @@ -47,7 +47,7 @@ func CaptureContext() (*Context, error) { checkDelay: DefaultCheckDelay, workingDirectory: wd, args: args, - needsExit: true, + appendDelay: true, } return context, nil @@ -67,8 +67,8 @@ func (c *Context) SetCheckDelay(delay time.Duration) { } } -// Restart restarts executable using Context. -func (c *Context) Restart() error { +// Start starts a new executable using Context. +func (c *Context) Start() error { if !atomic.CompareAndSwapInt32(&c.isRestarting, 0, 1) { return ErrAlreadyRestarting } @@ -79,12 +79,12 @@ func (c *Context) Restart() error { return ErrMalformedArgs } - executableRelPath := c.args[0] - executableAbsPath := filepath.Join(c.workingDirectory, executableRelPath) - - c.infoLogger()("Starting new instance of executable (path: %q)", executableAbsPath) + execPath := c.args[0] + if !filepath.IsAbs(execPath) { + execPath = filepath.Join(c.workingDirectory, execPath) + } - errCh := c.start(executableAbsPath) + errCh := c.startExec(execPath) ticker := time.NewTicker(c.checkDelay) defer ticker.Stop() @@ -95,17 +95,11 @@ func (c *Context) Restart() error { return err case <-ticker.C: c.infoLogger()("New instance started successfully, exiting") - - if c.needsExit { - os.Exit(0) - } - - // unreachable unless run in tests return nil } } -func (c *Context) start(path string) chan error { +func (c *Context) startExec(path string) chan error { errCh := make(chan error, 1) go func(path string) { @@ -122,9 +116,16 @@ func (c *Context) start(path string) chan error { return } - args := c.args[1:] + args := c.startArgs() cmd := exec.Command(normalizedPath, args...) // nolint:gosec + cmd.Stdout = os.Stdout + cmd.Stdin = os.Stdin + cmd.Stderr = os.Stderr + cmd.Env = os.Environ() + + c.infoLogger()("Starting new instance of executable (path: %q, args: %q)", path, args) + if err := cmd.Start(); err != nil { errCh <- err return @@ -139,6 +140,30 @@ func (c *Context) start(path string) chan error { return errCh } +const extraWaitingTime = 1 * time.Second + +func (c *Context) startArgs() []string { + args := c.args[1:] + + const delayArgName = "--delay" + + l := len(args) + for i := 0; i < l; i++ { + if args[i] == delayArgName && i < len(args)-1 { + args = append(args[:i], args[i+2:]...) + i-- + l -= 2 + } + } + + if c.appendDelay { + delay := c.checkDelay + extraWaitingTime + args = append(args, delayArgName, delay.String()) + } + + return args +} + func (c *Context) infoLogger() func(string, ...interface{}) { if c.log != nil { return c.log.Infof diff --git a/pkg/restart/restart_test.go b/pkg/restart/restart_test.go index e0c118add..29cbb6f96 100644 --- a/pkg/restart/restart_test.go +++ b/pkg/restart/restart_test.go @@ -21,7 +21,6 @@ func TestCaptureContext(t *testing.T) { require.Equal(t, DefaultCheckDelay, cc.checkDelay) require.Equal(t, os.Args, cc.args) require.Nil(t, cc.log) - require.True(t, cc.needsExit) } func TestContext_RegisterLogger(t *testing.T) { @@ -34,21 +33,21 @@ func TestContext_RegisterLogger(t *testing.T) { require.Equal(t, logger, cc.log) } -func TestContext_Restart(t *testing.T) { +func TestContext_Start(t *testing.T) { cc, err := CaptureContext() require.NoError(t, err) assert.NotZero(t, len(cc.args)) cc.workingDirectory = "" - cc.needsExit = false t.Run("executable started", func(t *testing.T) { cmd := "touch" path := "/tmp/test_restart" args := []string{cmd, path} cc.args = args + cc.appendDelay = false - assert.NoError(t, cc.Restart()) + assert.NoError(t, cc.Start()) assert.NoError(t, os.Remove(path)) }) @@ -58,13 +57,13 @@ func TestContext_Restart(t *testing.T) { cc.args = args // TODO(nkryuchkov): Check if it works on Linux and Windows, if not then change the error text. - assert.EqualError(t, cc.Restart(), `exec: "bad_command": executable file not found in $PATH`) + assert.EqualError(t, cc.Start(), `exec: "bad_command": executable file not found in $PATH`) }) t.Run("empty args", func(t *testing.T) { cc.args = nil - assert.Equal(t, ErrMalformedArgs, cc.Restart()) + assert.Equal(t, ErrMalformedArgs, cc.Start()) }) t.Run("already restarting", func(t *testing.T) { @@ -74,13 +73,14 @@ func TestContext_Restart(t *testing.T) { path := "/tmp/test_restart" args := []string{cmd, path} cc.args = args + cc.appendDelay = false ch := make(chan error, 1) go func() { - ch <- cc.Restart() + ch <- cc.Start() }() - assert.NoError(t, cc.Restart()) + assert.NoError(t, cc.Start()) assert.NoError(t, os.Remove(path)) assert.Equal(t, ErrAlreadyRestarting, <-ch) diff --git a/pkg/visor/rpc.go b/pkg/visor/rpc.go index 9f733e182..7ddd95834 100644 --- a/pkg/visor/rpc.go +++ b/pkg/visor/rpc.go @@ -4,6 +4,7 @@ import ( "context" "errors" "net/http" + "os" "path/filepath" "time" @@ -398,11 +399,22 @@ func (r *RPC) Loops(_ *struct{}, out *[]LoopInfo) error { <<< VISOR MANAGEMENT >>> */ +const exitDelay = 100 * time.Millisecond + // Restart restarts visor. -func (r *RPC) Restart(_ *struct{}, _ *struct{}) error { +func (r *RPC) Restart(_ *struct{}, _ *struct{}) (err error) { + defer func() { + if err == nil { + go func() { + time.Sleep(exitDelay) + os.Exit(0) + }() + } + }() + if r.node.restartCtx == nil { return ErrMalformedRestartContext } - return r.node.restartCtx.Restart() + return r.node.restartCtx.Start() } diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index c316a5152..5992f12fc 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -84,6 +84,7 @@ type Node struct { appsConf []AppConfig startedAt time.Time + startDelay time.Duration restartCtx *restart.Context pidMu sync.Mutex From a754316e4b702af4a8889ec8dd500c3aad2d0639 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 26 Dec 2019 22:35:44 +0400 Subject: [PATCH 09/12] Simplify restart logic --- cmd/skywire-visor/commands/root.go | 7 +- pkg/restart/restart.go | 113 ++++++++++------------------- pkg/restart/restart_test.go | 48 ++++-------- 3 files changed, 57 insertions(+), 111 deletions(-) diff --git a/cmd/skywire-visor/commands/root.go b/cmd/skywire-visor/commands/root.go index 9eb51f5e7..ed47be729 100644 --- a/cmd/skywire-visor/commands/root.go +++ b/cmd/skywire-visor/commands/root.go @@ -78,12 +78,7 @@ func init() { rootCmd.Flags().StringVarP(&cfg.port, "port", "", "6060", "port for http-mode of pprof") rootCmd.Flags().StringVarP(&cfg.startDelay, "delay", "", "0ns", "delay before visor start") - restartCtx, err := restart.CaptureContext() - if err != nil { - log.Printf("Failed to capture context: %v", err) - } else { - cfg.restartCtx = restartCtx - } + cfg.restartCtx = restart.CaptureContext() } // Execute executes root CLI command. diff --git a/pkg/restart/restart.go b/pkg/restart/restart.go index b26016c6c..1313097ec 100644 --- a/pkg/restart/restart.go +++ b/pkg/restart/restart.go @@ -5,7 +5,6 @@ import ( "log" "os" "os/exec" - "path/filepath" "sync/atomic" "time" @@ -13,44 +12,42 @@ import ( ) var ( - // ErrMalformedArgs is returned when executable args are malformed. - ErrMalformedArgs = errors.New("malformed args") - // ErrAlreadyRestarting is returned on restarting attempt when restarting is in progress. - ErrAlreadyRestarting = errors.New("already restarting") + // ErrAlreadyStarting is returned on starting attempt when starting is in progress. + ErrAlreadyStarting = errors.New("already starting") ) -// DefaultCheckDelay is a default delay for checking if a new instance is started successfully. -const DefaultCheckDelay = 1 * time.Second +const ( + // DefaultCheckDelay is a default delay for checking if a new instance is started successfully. + DefaultCheckDelay = 1 * time.Second + extraWaitingTime = 1 * time.Second + delayArgName = "--delay" +) // Context describes data required for restarting visor. type Context struct { - log logrus.FieldLogger - checkDelay time.Duration - workingDirectory string - args []string - isRestarting int32 - appendDelay bool // disabled in tests + log logrus.FieldLogger + isStarting int32 + checkDelay time.Duration + appendDelay bool // disabled in tests + cmd *exec.Cmd } // CaptureContext captures data required for restarting visor. // Data used by CaptureContext must not be modified before, // therefore calling CaptureContext immediately after starting executable is recommended. -func CaptureContext() (*Context, error) { - wd, err := os.Getwd() - if err != nil { - return nil, err +func CaptureContext() *Context { + cmd := exec.Command(os.Args[0], os.Args[1:]...) + + cmd.Stdout = os.Stdout + cmd.Stdin = os.Stdin + cmd.Stderr = os.Stderr + cmd.Env = os.Environ() + + return &Context{ + cmd: cmd, + checkDelay: DefaultCheckDelay, + appendDelay: true, } - - args := os.Args - - context := &Context{ - checkDelay: DefaultCheckDelay, - workingDirectory: wd, - args: args, - appendDelay: true, - } - - return context, nil } // RegisterLogger registers a logger instead of standard one. @@ -69,22 +66,13 @@ func (c *Context) SetCheckDelay(delay time.Duration) { // Start starts a new executable using Context. func (c *Context) Start() error { - if !atomic.CompareAndSwapInt32(&c.isRestarting, 0, 1) { - return ErrAlreadyRestarting + if !atomic.CompareAndSwapInt32(&c.isStarting, 0, 1) { + return ErrAlreadyStarting } - defer atomic.StoreInt32(&c.isRestarting, 0) + defer atomic.StoreInt32(&c.isStarting, 0) - if len(c.args) == 0 { - return ErrMalformedArgs - } - - execPath := c.args[0] - if !filepath.IsAbs(execPath) { - execPath = filepath.Join(c.workingDirectory, execPath) - } - - errCh := c.startExec(execPath) + errCh := c.startExec() ticker := time.NewTicker(c.checkDelay) defer ticker.Stop() @@ -94,58 +82,37 @@ func (c *Context) Start() error { c.errorLogger()("Failed to start new instance: %v", err) return err case <-ticker.C: - c.infoLogger()("New instance started successfully, exiting") + c.infoLogger()("New instance started successfully, exiting from the old one") return nil } } -func (c *Context) startExec(path string) chan error { +func (c *Context) startExec() chan error { errCh := make(chan error, 1) - go func(path string) { + go func() { defer close(errCh) - normalizedPath, err := exec.LookPath(path) - if err != nil { - errCh <- err - return - } - - if len(c.args) == 0 { - errCh <- ErrMalformedArgs - return - } + c.adjustArgs() - args := c.startArgs() - cmd := exec.Command(normalizedPath, args...) // nolint:gosec + c.infoLogger()("Starting new instance of executable (args: %q)", c.cmd.Args) - cmd.Stdout = os.Stdout - cmd.Stdin = os.Stdin - cmd.Stderr = os.Stderr - cmd.Env = os.Environ() - - c.infoLogger()("Starting new instance of executable (path: %q, args: %q)", path, args) - - if err := cmd.Start(); err != nil { + if err := c.cmd.Start(); err != nil { errCh <- err return } - if err := cmd.Wait(); err != nil { + if err := c.cmd.Wait(); err != nil { errCh <- err return } - }(path) + }() return errCh } -const extraWaitingTime = 1 * time.Second - -func (c *Context) startArgs() []string { - args := c.args[1:] - - const delayArgName = "--delay" +func (c *Context) adjustArgs() { + args := c.cmd.Args l := len(args) for i := 0; i < l; i++ { @@ -161,7 +128,7 @@ func (c *Context) startArgs() []string { args = append(args, delayArgName, delay.String()) } - return args + c.cmd.Args = args } func (c *Context) infoLogger() func(string, ...interface{}) { diff --git a/pkg/restart/restart_test.go b/pkg/restart/restart_test.go index 29cbb6f96..e75c88079 100644 --- a/pkg/restart/restart_test.go +++ b/pkg/restart/restart_test.go @@ -2,6 +2,7 @@ package restart import ( "os" + "os/exec" "testing" "time" @@ -11,21 +12,19 @@ import ( ) func TestCaptureContext(t *testing.T) { - cc, err := CaptureContext() - require.NoError(t, err) + cc := CaptureContext() - wd, err := os.Getwd() - assert.NoError(t, err) - - require.Equal(t, wd, cc.workingDirectory) require.Equal(t, DefaultCheckDelay, cc.checkDelay) - require.Equal(t, os.Args, cc.args) + require.Equal(t, os.Args, cc.cmd.Args) + require.Equal(t, os.Stdout, cc.cmd.Stdout) + require.Equal(t, os.Stdin, cc.cmd.Stdin) + require.Equal(t, os.Stderr, cc.cmd.Stderr) + require.Equal(t, os.Environ(), cc.cmd.Env) require.Nil(t, cc.log) } func TestContext_RegisterLogger(t *testing.T) { - cc, err := CaptureContext() - require.NoError(t, err) + cc := CaptureContext() require.Nil(t, cc.log) logger := logging.MustGetLogger("test") @@ -34,17 +33,13 @@ func TestContext_RegisterLogger(t *testing.T) { } func TestContext_Start(t *testing.T) { - cc, err := CaptureContext() - require.NoError(t, err) - assert.NotZero(t, len(cc.args)) - - cc.workingDirectory = "" + cc := CaptureContext() + assert.NotZero(t, len(cc.cmd.Args)) t.Run("executable started", func(t *testing.T) { cmd := "touch" path := "/tmp/test_restart" - args := []string{cmd, path} - cc.args = args + cc.cmd = exec.Command(cmd, path) cc.appendDelay = false assert.NoError(t, cc.Start()) @@ -53,26 +48,16 @@ func TestContext_Start(t *testing.T) { t.Run("bad args", func(t *testing.T) { cmd := "bad_command" - args := []string{cmd} - cc.args = args + cc.cmd = exec.Command(cmd) // TODO(nkryuchkov): Check if it works on Linux and Windows, if not then change the error text. assert.EqualError(t, cc.Start(), `exec: "bad_command": executable file not found in $PATH`) }) - t.Run("empty args", func(t *testing.T) { - cc.args = nil - - assert.Equal(t, ErrMalformedArgs, cc.Start()) - }) - t.Run("already restarting", func(t *testing.T) { - cc.args = nil - cmd := "touch" path := "/tmp/test_restart" - args := []string{cmd, path} - cc.args = args + cc.cmd = exec.Command(cmd, path) cc.appendDelay = false ch := make(chan error, 1) @@ -81,15 +66,14 @@ func TestContext_Start(t *testing.T) { }() assert.NoError(t, cc.Start()) - assert.NoError(t, os.Remove(path)) + assert.Equal(t, ErrAlreadyStarting, <-ch) - assert.Equal(t, ErrAlreadyRestarting, <-ch) + assert.NoError(t, os.Remove(path)) }) } func TestContext_SetCheckDelay(t *testing.T) { - cc, err := CaptureContext() - require.NoError(t, err) + cc := CaptureContext() require.Equal(t, DefaultCheckDelay, cc.checkDelay) const oneSecond = 1 * time.Second From 2756eccb54b0e25f157164b73eb39667a24bf87e Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 26 Dec 2019 22:37:31 +0400 Subject: [PATCH 10/12] Fix linter errors --- pkg/restart/restart.go | 2 +- pkg/visor/visor.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/restart/restart.go b/pkg/restart/restart.go index 1313097ec..f108cc16d 100644 --- a/pkg/restart/restart.go +++ b/pkg/restart/restart.go @@ -36,7 +36,7 @@ type Context struct { // Data used by CaptureContext must not be modified before, // therefore calling CaptureContext immediately after starting executable is recommended. func CaptureContext() *Context { - cmd := exec.Command(os.Args[0], os.Args[1:]...) + cmd := exec.Command(os.Args[0], os.Args[1:]...) // nolint:gosec cmd.Stdout = os.Stdout cmd.Stdin = os.Stdin diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index 5992f12fc..c316a5152 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -84,7 +84,6 @@ type Node struct { appsConf []AppConfig startedAt time.Time - startDelay time.Duration restartCtx *restart.Context pidMu sync.Mutex From 0dc3a763ad3ae9a445e4c0c0d1cf757278f3c891 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 26 Dec 2019 22:43:39 +0400 Subject: [PATCH 11/12] Make minor linter improvements --- pkg/restart/restart.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/restart/restart.go b/pkg/restart/restart.go index f108cc16d..0c485ebb8 100644 --- a/pkg/restart/restart.go +++ b/pkg/restart/restart.go @@ -26,10 +26,10 @@ const ( // Context describes data required for restarting visor. type Context struct { log logrus.FieldLogger - isStarting int32 + cmd *exec.Cmd checkDelay time.Duration + isStarting int32 appendDelay bool // disabled in tests - cmd *exec.Cmd } // CaptureContext captures data required for restarting visor. @@ -114,12 +114,15 @@ func (c *Context) startExec() chan error { func (c *Context) adjustArgs() { args := c.cmd.Args + i := 0 l := len(args) - for i := 0; i < l; i++ { + + for i < l { if args[i] == delayArgName && i < len(args)-1 { args = append(args[:i], args[i+2:]...) - i-- l -= 2 + } else { + i++ } } From df0797f8d59f6f4d108d65ff9b9f2fb959a6e9bc Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 26 Dec 2019 23:14:13 +0400 Subject: [PATCH 12/12] Fix linter errors --- pkg/restart/restart_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/restart/restart_test.go b/pkg/restart/restart_test.go index e75c88079..bb9825349 100644 --- a/pkg/restart/restart_test.go +++ b/pkg/restart/restart_test.go @@ -39,7 +39,7 @@ func TestContext_Start(t *testing.T) { t.Run("executable started", func(t *testing.T) { cmd := "touch" path := "/tmp/test_restart" - cc.cmd = exec.Command(cmd, path) + cc.cmd = exec.Command(cmd, path) // nolint:gosec cc.appendDelay = false assert.NoError(t, cc.Start()) @@ -48,7 +48,7 @@ func TestContext_Start(t *testing.T) { t.Run("bad args", func(t *testing.T) { cmd := "bad_command" - cc.cmd = exec.Command(cmd) + cc.cmd = exec.Command(cmd) // nolint:gosec // TODO(nkryuchkov): Check if it works on Linux and Windows, if not then change the error text. assert.EqualError(t, cc.Start(), `exec: "bad_command": executable file not found in $PATH`) @@ -57,7 +57,7 @@ func TestContext_Start(t *testing.T) { t.Run("already restarting", func(t *testing.T) { cmd := "touch" path := "/tmp/test_restart" - cc.cmd = exec.Command(cmd, path) + cc.cmd = exec.Command(cmd, path) // nolint:gosec cc.appendDelay = false ch := make(chan error, 1)