diff --git a/api/allocations.go b/api/allocations.go index 80987fb4b451..109174f07529 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -79,6 +79,16 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error { return err } +func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOptions) error { + req := AllocationRestartRequest{ + TaskName: taskName, + } + + var resp struct{} + _, err := a.client.putQuery("/v1/client/allocation/"+alloc.ID+"/restart", &req, &resp, q) + return err +} + // Allocation is used for serialization of allocations. type Allocation struct { ID string @@ -246,6 +256,10 @@ func (a Allocation) RescheduleInfo(t time.Time) (int, int) { return attempted, availableAttempts } +type AllocationRestartRequest struct { + TaskName string +} + // RescheduleTracker encapsulates previous reschedule events type RescheduleTracker struct { Events []*RescheduleEvent diff --git a/client/alloc_endpoint.go b/client/alloc_endpoint.go index 6b1e4eec09a4..11568988f27c 100644 --- a/client/alloc_endpoint.go +++ b/client/alloc_endpoint.go @@ -48,6 +48,19 @@ func (a *Allocations) GarbageCollect(args *nstructs.AllocSpecificRequest, reply return nil } +// Restart is used to trigger a restart of an allocation or a subtask on a client. +func (a *Allocations) Restart(args *nstructs.AllocRestartRequest, reply *nstructs.GenericResponse) error { + defer metrics.MeasureSince([]string{"client", "allocations", "restart"}, time.Now()) + + if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityAllocLifecycle) { + return nstructs.ErrPermissionDenied + } + + return a.c.RestartAllocation(args.AllocID, args.TaskName) +} + // Stats is used to collect allocation statistics func (a *Allocations) Stats(args *cstructs.AllocStatsRequest, reply *cstructs.AllocStatsResponse) error { defer metrics.MeasureSince([]string{"client", "allocations", "stats"}, time.Now()) diff --git a/client/alloc_endpoint_test.go b/client/alloc_endpoint_test.go index 7c3bec1dc61e..8eee0b2f13fb 100644 --- a/client/alloc_endpoint_test.go +++ b/client/alloc_endpoint_test.go @@ -2,6 +2,7 @@ package client import ( "fmt" + "strings" "testing" "github.com/hashicorp/nomad/acl" @@ -13,6 +14,102 @@ import ( "github.com/stretchr/testify/require" ) +func TestAllocations_Restart(t *testing.T) { + t.Parallel() + require := require.New(t) + client, cleanup := TestClient(t, nil) + defer cleanup() + + a := mock.Alloc() + a.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + a.Job.TaskGroups[0].RestartPolicy = &nstructs.RestartPolicy{ + Attempts: 0, + Mode: nstructs.RestartPolicyModeFail, + } + a.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "10ms", + } + require.Nil(client.addAlloc(a, "")) + + // Try with bad alloc + req := &nstructs.AllocRestartRequest{} + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.Restart", &req, &resp) + require.Error(err) + + // Try with good alloc + req.AllocID = a.ID + + testutil.WaitForResult(func() (bool, error) { + var resp2 nstructs.GenericResponse + err := client.ClientRPC("Allocations.Restart", &req, &resp2) + if err != nil && strings.Contains(err.Error(), "not running") { + return false, err + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + +func TestAllocations_Restart_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + server, addr, root := testACLServer(t, nil) + defer server.Shutdown() + + client, cleanup := TestClient(t, func(c *config.Config) { + c.Servers = []string{addr} + c.ACLEnabled = true + }) + defer cleanup() + + // Try request without a token and expect failure + { + req := &nstructs.AllocRestartRequest{} + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.Restart", &req, &resp) + require.NotNil(err) + require.EqualError(err, nstructs.ErrPermissionDenied.Error()) + } + + // Try request with an invalid token and expect failure + { + token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{})) + req := &nstructs.AllocRestartRequest{} + req.AuthToken = token.SecretID + + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.Restart", &req, &resp) + + require.NotNil(err) + require.EqualError(err, nstructs.ErrPermissionDenied.Error()) + } + + // Try request with a valid token + { + policyHCL := mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle}) + token := mock.CreatePolicyAndToken(t, server.State(), 1007, "valid", policyHCL) + require.NotNil(token) + req := &nstructs.AllocRestartRequest{} + req.AuthToken = token.SecretID + req.Namespace = nstructs.DefaultNamespace + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.Restart", &req, &resp) + require.True(nstructs.IsErrUnknownAllocation(err), "Expected unknown alloc, found: %v", err) + } + + // Try request with a management token + { + req := &nstructs.AllocRestartRequest{} + req.AuthToken = root.SecretID + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.Restart", &req, &resp) + require.True(nstructs.IsErrUnknownAllocation(err), "Expected unknown alloc, found: %v", err) + } +} + func TestAllocations_GarbageCollectAll(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 83a40227290c..7e61d299da67 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -8,6 +8,7 @@ import ( "time" log "github.com/hashicorp/go-hclog" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/allocrunner/state" @@ -936,3 +937,29 @@ func (ar *allocRunner) GetTaskEventHandler(taskName string) drivermanager.EventH } return nil } + +// RestartTask signalls the task runner for the provided task to restart. +func (ar *allocRunner) RestartTask(taskName string, taskEvent *structs.TaskEvent) error { + tr, ok := ar.tasks[taskName] + if !ok { + return fmt.Errorf("Could not find task runner for task: %s", taskName) + } + + return tr.Restart(context.TODO(), taskEvent, false) +} + +// RestartAll signalls all task runners in the allocation to restart and passes +// a copy of the task event to each restart event. +// Returns any errors in a concatenated form. +func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error { + var err *multierror.Error + + for tn := range ar.tasks { + rerr := ar.RestartTask(tn, taskEvent.Copy()) + if rerr != nil { + err = multierror.Append(err, rerr) + } + } + + return err.ErrorOrNil() +} diff --git a/client/client.go b/client/client.go index f50e4a0a8997..d2c12b05dafe 100644 --- a/client/client.go +++ b/client/client.go @@ -124,6 +124,9 @@ type AllocRunner interface { DestroyCh() <-chan struct{} ShutdownCh() <-chan struct{} GetTaskEventHandler(taskName string) drivermanager.EventHandler + + RestartTask(taskName string, taskEvent *structs.TaskEvent) error + RestartAll(taskEvent *structs.TaskEvent) error } // Client is used to implement the client interaction with Nomad. Clients @@ -703,6 +706,22 @@ func (c *Client) CollectAllAllocs() { c.garbageCollector.CollectAll() } +func (c *Client) RestartAllocation(allocID, taskName string) error { + ar, err := c.getAllocRunner(allocID) + if err != nil { + return err + } + + event := structs.NewTaskEvent(structs.TaskRestartSignal). + SetRestartReason("User requested restart") + + if taskName != "" { + return ar.RestartTask(taskName, event) + } + + return ar.RestartAll(event) +} + // Node returns the locally registered node func (c *Client) Node() *structs.Node { c.configLock.RLock() diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index 422f8906cb09..aab13c54583f 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -1,6 +1,7 @@ package agent import ( + "encoding/json" "fmt" "net/http" "strings" @@ -96,8 +97,9 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ if s.agent.client == nil { return nil, clientNotRunning } - return s.allocSnapshot(allocID, resp, req) + case "restart": + return s.allocRestart(allocID, resp, req) case "gc": return s.allocGC(allocID, resp, req) } @@ -140,6 +142,51 @@ func (s *HTTPServer) ClientGCRequest(resp http.ResponseWriter, req *http.Request return nil, rpcErr } +func (s *HTTPServer) allocRestart(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Build the request and parse the ACL token + args := structs.AllocRestartRequest{ + AllocID: allocID, + TaskName: "", + } + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) + + // Explicitly parse the body separately to disallow overriding AllocID in req Body. + var reqBody struct { + TaskName string + } + err := json.NewDecoder(req.Body).Decode(&reqBody) + if err != nil { + return nil, err + } + if reqBody.TaskName != "" { + args.TaskName = reqBody.TaskName + } + + // Determine the handler to use + useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForAlloc(allocID) + + // Make the RPC + var reply structs.GenericResponse + var rpcErr error + if useLocalClient { + rpcErr = s.agent.Client().ClientRPC("Allocations.Restart", &args, &reply) + } else if useClientRPC { + rpcErr = s.agent.Client().RPC("ClientAllocations.Restart", &args, &reply) + } else if useServerRPC { + rpcErr = s.agent.Server().RPC("ClientAllocations.Restart", &args, &reply) + } else { + rpcErr = CodedError(400, "No local Node and node_id not provided") + } + + if rpcErr != nil { + if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) { + rpcErr = CodedError(404, rpcErr.Error()) + } + } + + return reply, rpcErr +} + func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Build the request and parse the ACL token args := structs.AllocSpecificRequest{ diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index 1b6008cecd2f..457f6d9ee4a7 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -261,6 +261,139 @@ func TestHTTP_AllocQuery_Payload(t *testing.T) { }) } +func TestHTTP_AllocRestart(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Validates that all methods of forwarding the request are processed correctly + httpTest(t, nil, func(s *TestAgent) { + // Local node, local resp + { + // Make the HTTP request + buf := encodeReq(map[string]string{}) + req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/restart", uuid.Generate()), buf) + if err != nil { + t.Fatalf("err: %v", err) + } + respW := httptest.NewRecorder() + + // Make the request + _, err = s.Server.ClientAllocRequest(respW, req) + require.NotNil(err) + require.True(structs.IsErrUnknownAllocation(err)) + } + + // Local node, server resp + { + srv := s.server + s.server = nil + + buf := encodeReq(map[string]string{}) + req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/restart", uuid.Generate()), buf) + require.Nil(err) + + respW := httptest.NewRecorder() + _, err = s.Server.ClientAllocRequest(respW, req) + require.NotNil(err) + require.True(structs.IsErrUnknownAllocation(err)) + + s.server = srv + } + + // no client, server resp + { + c := s.client + s.client = nil + + testutil.WaitForResult(func() (bool, error) { + n, err := s.server.State().NodeByID(nil, c.NodeID()) + if err != nil { + return false, err + } + return n != nil, nil + }, func(err error) { + t.Fatalf("should have client: %v", err) + }) + + buf := encodeReq(map[string]string{}) + req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/restart", uuid.Generate()), buf) + require.Nil(err) + + respW := httptest.NewRecorder() + _, err = s.Server.ClientAllocRequest(respW, req) + require.NotNil(err) + require.True(structs.IsErrUnknownAllocation(err)) + + s.client = c + } + }) +} + +func TestHTTP_AllocRestart_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + + httpACLTest(t, nil, func(s *TestAgent) { + state := s.Agent.server.State() + + // If there's no token, we expect the request to fail. + { + buf := encodeReq(map[string]string{}) + req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/restart", uuid.Generate()), buf) + require.NoError(err) + + respW := httptest.NewRecorder() + _, err = s.Server.ClientAllocRequest(respW, req) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try request with an invalid token and expect it to fail + { + buf := encodeReq(map[string]string{}) + req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/restart", uuid.Generate()), buf) + require.NoError(err) + + respW := httptest.NewRecorder() + token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyWrite)) + setToken(req, token) + _, err = s.Server.ClientAllocRequest(respW, req) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try request with a valid token + // Still returns an error because the alloc does not exist + { + buf := encodeReq(map[string]string{}) + req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/restart", uuid.Generate()), buf) + require.NoError(err) + + respW := httptest.NewRecorder() + policy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle}) + token := mock.CreatePolicyAndToken(t, state, 1007, "valid", policy) + setToken(req, token) + _, err = s.Server.ClientAllocRequest(respW, req) + require.NotNil(err) + require.True(structs.IsErrUnknownAllocation(err)) + } + + // Try request with a management token + // Still returns an error because the alloc does not exist + { + buf := encodeReq(map[string]string{}) + req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/allocation/%s/restart", uuid.Generate()), buf) + require.NoError(err) + + respW := httptest.NewRecorder() + setToken(req, s.RootToken) + _, err = s.Server.ClientAllocRequest(respW, req) + require.NotNil(err) + require.True(structs.IsErrUnknownAllocation(err)) + } + }) +} + func TestHTTP_AllocStats(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/command/alloc_restart.go b/command/alloc_restart.go new file mode 100644 index 000000000000..68b2f5637a66 --- /dev/null +++ b/command/alloc_restart.go @@ -0,0 +1,168 @@ +package command + +import ( + "fmt" + "strings" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/api/contexts" + "github.com/posener/complete" +) + +type AllocRestartCommand struct { + Meta +} + +func (a *AllocRestartCommand) Help() string { + helpText := ` +Usage: nomad alloc restart [options] + + restart an existing allocation. This command is used to restart a specific alloc + and its tasks. If no task is provided then all of the allocation's tasks will + be restarted. + +General Options: + + ` + generalOptionsUsage() + ` + +Restart Specific Options: + + -verbose + Show full information. +` + return strings.TrimSpace(helpText) +} + +func (c *AllocRestartCommand) Name() string { return "alloc restart" } + +func (c *AllocRestartCommand) Run(args []string) int { + var verbose bool + + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.BoolVar(&verbose, "verbose", false, "") + + if err := flags.Parse(args); err != nil { + return 1 + } + + // Check that we got exactly one alloc + args = flags.Args() + if len(args) < 1 || len(args) > 2 { + c.Ui.Error("This command takes one or two arguments: ") + c.Ui.Error(commandErrorText(c)) + return 1 + } + + allocID := args[0] + + // Truncate the id unless full length is requested + length := shortId + if verbose { + length = fullId + } + + // Query the allocation info + if len(allocID) == 1 { + c.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters.")) + return 1 + } + + allocID = sanitizeUUIDPrefix(allocID) + + // Get the HTTP client + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + allocs, _, err := client.Allocations().PrefixList(allocID) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) + return 1 + } + + if len(allocs) == 0 { + c.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID)) + return 1 + } + + if len(allocs) > 1 { + // Format the allocs + out := formatAllocListStubs(allocs, verbose, length) + c.Ui.Error(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", out)) + return 1 + } + + // Prefix lookup matched a single allocation + alloc, _, err := client.Allocations().Info(allocs[0].ID, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) + return 1 + } + + var taskName string + if len(args) == 2 { + // Validate Task + taskName = args[1] + err := validateTaskExistsInAllocation(taskName, alloc) + if err != nil { + c.Ui.Error(err.Error()) + return 1 + } + } + + err = client.Allocations().Restart(alloc, taskName, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to restart allocation:\n\n%s", err.Error())) + return 1 + } + + return 0 +} + +func validateTaskExistsInAllocation(taskName string, alloc *api.Allocation) error { + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + if tg == nil { + return fmt.Errorf("Could not find allocation task group: %s", alloc.TaskGroup) + } + + taskExists := false + foundTaskNames := make([]string, len(tg.Tasks)) + for i, task := range tg.Tasks { + foundTaskNames[i] = task.Name + if task.Name == taskName { + taskExists = true + break + } + } + + if !taskExists { + return fmt.Errorf("Could not find task named: %s, found:\n%s", taskName, formatList(foundTaskNames)) + } + + return nil +} + +func (a *AllocRestartCommand) Synopsis() string { + return "Restart a running allocation" +} + +func (c *AllocRestartCommand) AutocompleteArgs() complete.Predictor { + // Here we attempt to autocomplete allocations for any position of arg. + // We should eventually try to auto complete the task name if the arg is + // at position 2. + return complete.PredictFunc(func(a complete.Args) []string { + client, err := c.Meta.Client() + if err != nil { + return nil + } + + resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Allocs, nil) + if err != nil { + return []string{} + } + return resp.Matches[contexts.Allocs] + }) +} diff --git a/command/alloc_restart_test.go b/command/alloc_restart_test.go new file mode 100644 index 000000000000..6c6f2a28960f --- /dev/null +++ b/command/alloc_restart_test.go @@ -0,0 +1,175 @@ +package command + +import ( + "fmt" + "testing" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/mitchellh/cli" + "github.com/posener/complete" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAllocRestartCommand_Implements(t *testing.T) { + var _ cli.Command = &AllocRestartCommand{} +} + +func TestAllocRestartCommand_Fails(t *testing.T) { + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + require := require.New(t) + ui := new(cli.MockUi) + cmd := &AllocRestartCommand{Meta: Meta{Ui: ui}} + + // Fails on misuse + require.Equal(cmd.Run([]string{"some", "garbage", "args"}), 1, "Expected failure") + require.Contains(ui.ErrorWriter.String(), commandErrorText(cmd), "Expected help output") + ui.ErrorWriter.Reset() + + // Fails on connection failure + require.Equal(cmd.Run([]string{"-address=nope", "foobar"}), 1, "expected failure") + require.Contains(ui.ErrorWriter.String(), "Error querying allocation") + ui.ErrorWriter.Reset() + + // Fails on missing alloc + require.Equal(cmd.Run([]string{"-address=" + url, "26470238-5CF2-438F-8772-DC67CFB0705C"}), 1) + require.Contains(ui.ErrorWriter.String(), "No allocation(s) with prefix or id") + ui.ErrorWriter.Reset() + + // Fail on identifier with too few characters + require.Equal(cmd.Run([]string{"-address=" + url, "2"}), 1) + require.Contains(ui.ErrorWriter.String(), "must contain at least two characters") + ui.ErrorWriter.Reset() + + // Identifiers with uneven length should produce a query result + require.Equal(cmd.Run([]string{"-address=" + url, "123"}), 1) + require.Contains(ui.ErrorWriter.String(), "No allocation(s) with prefix or id") + ui.ErrorWriter.Reset() + + // Wait for a node to be ready + testutil.WaitForResult(func() (bool, error) { + nodes, _, err := client.Nodes().List(nil) + if err != nil { + return false, err + } + for _, node := range nodes { + if _, ok := node.Drivers["mock_driver"]; ok && + node.Status == structs.NodeStatusReady { + return true, nil + } + } + return false, fmt.Errorf("no ready nodes") + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + jobID := "job1_sfx" + job1 := testJob(jobID) + resp, _, err := client.Jobs().Register(job1, nil) + require.NoError(err) + if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 { + t.Fatalf("status code non zero saw %d", code) + } + // get an alloc id + allocId1 := "" + if allocs, _, err := client.Jobs().Allocations(jobID, false, nil); err == nil { + if len(allocs) > 0 { + allocId1 = allocs[0].ID + } + } + require.NotEmpty(allocId1, "unable to find allocation") + + // Fails on not found task + require.Equal(cmd.Run([]string{"-address=" + url, allocId1, "fooooobarrr"}), 1) + require.Contains(ui.ErrorWriter.String(), "Could not find task named") + ui.ErrorWriter.Reset() +} + +func TestAllocRestartCommand_Run(t *testing.T) { + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + require := require.New(t) + + // Wait for a node to be ready + testutil.WaitForResult(func() (bool, error) { + nodes, _, err := client.Nodes().List(nil) + if err != nil { + return false, err + } + for _, node := range nodes { + if _, ok := node.Drivers["mock_driver"]; ok && + node.Status == structs.NodeStatusReady { + return true, nil + } + } + return false, fmt.Errorf("no ready nodes") + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + ui := new(cli.MockUi) + cmd := &AllocRestartCommand{Meta: Meta{Ui: ui}} + + jobID := "job1_sfx" + job1 := testJob(jobID) + resp, _, err := client.Jobs().Register(job1, nil) + require.NoError(err) + if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 { + t.Fatalf("status code non zero saw %d", code) + } + // get an alloc id + allocId1 := "" + if allocs, _, err := client.Jobs().Allocations(jobID, false, nil); err == nil { + if len(allocs) > 0 { + allocId1 = allocs[0].ID + } + } + require.NotEmpty(allocId1, "unable to find allocation") + + // Wait for alloc to be running + testutil.WaitForResult(func() (bool, error) { + alloc, _, err := client.Allocations().Info(allocId1, nil) + if err != nil { + return false, err + } + if alloc.ClientStatus == api.AllocClientStatusRunning { + return true, nil + } + return false, fmt.Errorf("alloc is not running, is: %s", alloc.ClientStatus) + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + require.Equal(cmd.Run([]string{"-address=" + url, allocId1}), 0, "expected successful exit code") + + ui.OutputWriter.Reset() +} + +func TestAllocRestartCommand_AutocompleteArgs(t *testing.T) { + assert := assert.New(t) + + srv, _, url := testServer(t, true, nil) + defer srv.Shutdown() + + ui := new(cli.MockUi) + cmd := &AllocRestartCommand{Meta: Meta{Ui: ui, flagAddress: url}} + + // Create a fake alloc + state := srv.Agent.Server().State() + a := mock.Alloc() + assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a})) + + prefix := a.ID[:5] + args := complete.Args{Last: prefix} + predictor := cmd.AutocompleteArgs() + + res := predictor.Predict(args) + assert.Equal(1, len(res)) + assert.Equal(a.ID, res[0]) +} diff --git a/command/commands.go b/command/commands.go index ab2173904e60..29be6200bdf9 100644 --- a/command/commands.go +++ b/command/commands.go @@ -150,6 +150,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "alloc restart": func() (cli.Command, error) { + return &AllocRestartCommand{ + Meta: meta, + }, nil + }, "alloc status": func() (cli.Command, error) { return &AllocStatusCommand{ Meta: meta, diff --git a/nomad/client_alloc_endpoint.go b/nomad/client_alloc_endpoint.go index f682a159571a..83c1a0747775 100644 --- a/nomad/client_alloc_endpoint.go +++ b/nomad/client_alloc_endpoint.go @@ -22,7 +22,7 @@ type ClientAllocations struct { // GarbageCollectAll is used to garbage collect all allocations on a client. func (a *ClientAllocations) GarbageCollectAll(args *structs.NodeSpecificRequest, reply *structs.GenericResponse) error { // We only allow stale reads since the only potentially stale information is - // the Node registration and the cost is fairly high for adding another hope + // the Node registration and the cost is fairly high for adding another hop // in the forwarding chain. args.QueryOptions.AllowStale = true @@ -68,7 +68,7 @@ func (a *ClientAllocations) GarbageCollectAll(args *structs.NodeSpecificRequest, // GarbageCollect is used to garbage collect an allocation on a client. func (a *ClientAllocations) GarbageCollect(args *structs.AllocSpecificRequest, reply *structs.GenericResponse) error { // We only allow stale reads since the only potentially stale information is - // the Node registration and the cost is fairly high for adding another hope + // the Node registration and the cost is fairly high for adding another hop // in the forwarding chain. args.QueryOptions.AllowStale = true @@ -121,10 +121,65 @@ func (a *ClientAllocations) GarbageCollect(args *structs.AllocSpecificRequest, r return NodeRpc(state.Session, "Allocations.GarbageCollect", args, reply) } +// Restart is used to trigger a restart of an allocation or a subtask on a client. +func (a *ClientAllocations) Restart(args *structs.AllocRestartRequest, reply *structs.GenericResponse) error { + // We only allow stale reads since the only potentially stale information is + // the Node registration and the cost is fairly high for adding another hop + // in the forwarding chain. + args.QueryOptions.AllowStale = true + + // Potentially forward to a different region. + if done, err := a.srv.forward("ClientAllocations.Restart", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "client_allocations", "restart"}, time.Now()) + + if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityAllocLifecycle) { + return structs.ErrPermissionDenied + } + + // Verify the arguments. + if args.AllocID == "" { + return errors.New("missing AllocID") + } + + // Find the allocation + snap, err := a.srv.State().Snapshot() + if err != nil { + return err + } + + alloc, err := snap.AllocByID(nil, args.AllocID) + if err != nil { + return err + } + + if alloc == nil { + return structs.NewErrUnknownAllocation(args.AllocID) + } + + // Make sure Node is valid and new enough to support RPC + _, err = getNodeForRpc(snap, alloc.NodeID) + if err != nil { + return err + } + + // Get the connection to the client + state, ok := a.srv.getNodeConn(alloc.NodeID) + if !ok { + return findNodeConnAndForward(a.srv, alloc.NodeID, "ClientAllocations.Restart", args, reply) + } + + // Make the RPC + return NodeRpc(state.Session, "Allocations.Restart", args, reply) +} + // Stats is used to collect allocation statistics func (a *ClientAllocations) Stats(args *cstructs.AllocStatsRequest, reply *cstructs.AllocStatsResponse) error { // We only allow stale reads since the only potentially stale information is - // the Node registration and the cost is fairly high for adding another hope + // the Node registration and the cost is fairly high for adding another hop // in the forwarding chain. args.QueryOptions.AllowStale = true diff --git a/nomad/client_alloc_endpoint_test.go b/nomad/client_alloc_endpoint_test.go index 44bf6b2b6fad..669000fa0f62 100644 --- a/nomad/client_alloc_endpoint_test.go +++ b/nomad/client_alloc_endpoint_test.go @@ -785,3 +785,258 @@ func TestClientAllocations_Stats_Remote(t *testing.T) { require.Nil(err) require.NotNil(resp.Stats) } + +func TestClientAllocations_Restart_Local(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server and client + s := TestServer(t, nil) + defer s.Shutdown() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + c, cleanup := client.TestClient(t, func(c *config.Config) { + c.Servers = []string{s.config.RPCAddr.String()} + c.GCDiskUsageThreshold = 100.0 + }) + defer cleanup() + + // Force an allocation onto the node + a := mock.Alloc() + a.Job.Type = structs.JobTypeService + a.NodeID = c.NodeID() + a.Job.TaskGroups[0].Count = 1 + a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ + Name: "web", + Driver: "mock_driver", + Config: map[string]interface{}{ + "run_for": "10s", + }, + LogConfig: structs.DefaultLogConfig(), + Resources: &structs.Resources{ + CPU: 500, + MemoryMB: 256, + }, + } + + testutil.WaitForResult(func() (bool, error) { + nodes := s.connectedNodes() + return len(nodes) == 1, nil + }, func(err error) { + t.Fatalf("should have a client") + }) + + // Upsert the allocation + state := s.State() + require.Nil(state.UpsertJob(999, a.Job)) + require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a})) + + // Wait for the client to run the allocation + testutil.WaitForResult(func() (bool, error) { + alloc, err := state.AllocByID(nil, a.ID) + if err != nil { + return false, err + } + if alloc == nil { + return false, fmt.Errorf("unknown alloc") + } + if alloc.ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) + } + + return true, nil + }, func(err error) { + t.Fatalf("Alloc on node %q not running: %v", c.NodeID(), err) + }) + + // Make the request without having an alloc id + req := &structs.AllocRestartRequest{ + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Fetch the response + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.Restart", req, &resp) + require.NotNil(err) + require.Contains(err.Error(), "missing") + + // Fetch the response setting the alloc id - This should not error becuase the + // alloc is running. + req.AllocID = a.ID + var resp2 structs.GenericResponse + err = msgpackrpc.CallWithCodec(codec, "ClientAllocations.Restart", req, &resp2) + require.Nil(err) + + testutil.WaitForResult(func() (bool, error) { + alloc, err := state.AllocByID(nil, a.ID) + if err != nil { + return false, err + } + if alloc == nil { + return false, fmt.Errorf("unknown alloc") + } + + taskState := alloc.TaskStates["web"] + if taskState == nil { + return false, fmt.Errorf("could not find task state") + } + + if taskState.Restarts != 1 { + return false, fmt.Errorf("expected task 'web' to have 1 restart, got: %d", taskState.Restarts) + } + + return true, nil + }, func(err error) { + t.Fatalf("Alloc on node %q not running: %v", c.NodeID(), err) + }) +} + +func TestClientAllocations_Restart_Remote(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server and client + s1 := TestServer(t, nil) + defer s1.Shutdown() + s2 := TestServer(t, func(c *Config) { + c.DevDisableBootstrap = true + }) + defer s2.Shutdown() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + codec := rpcClient(t, s2) + + c, cleanup := client.TestClient(t, func(c *config.Config) { + c.Servers = []string{s2.config.RPCAddr.String()} + }) + defer cleanup() + + // Force an allocation onto the node + a := mock.Alloc() + a.Job.Type = structs.JobTypeService + a.NodeID = c.NodeID() + a.Job.TaskGroups[0].Count = 1 + a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ + Name: "web", + Driver: "mock_driver", + Config: map[string]interface{}{ + "run_for": "10s", + }, + LogConfig: structs.DefaultLogConfig(), + Resources: &structs.Resources{ + CPU: 500, + MemoryMB: 256, + }, + } + + testutil.WaitForResult(func() (bool, error) { + nodes := s2.connectedNodes() + return len(nodes) == 1, nil + }, func(err error) { + t.Fatalf("should have a client") + }) + + // Upsert the allocation + state1 := s1.State() + state2 := s2.State() + require.Nil(state1.UpsertJob(999, a.Job)) + require.Nil(state1.UpsertAllocs(1003, []*structs.Allocation{a})) + require.Nil(state2.UpsertJob(999, a.Job)) + require.Nil(state2.UpsertAllocs(1003, []*structs.Allocation{a})) + + // Wait for the client to run the allocation + testutil.WaitForResult(func() (bool, error) { + alloc, err := state2.AllocByID(nil, a.ID) + if err != nil { + return false, err + } + if alloc == nil { + return false, fmt.Errorf("unknown alloc") + } + if alloc.ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) + } + + return true, nil + }, func(err error) { + t.Fatalf("Alloc on node %q not running: %v", c.NodeID(), err) + }) + + // Make the request without having an alloc id + req := &structs.AllocRestartRequest{ + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Fetch the response + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.Restart", req, &resp) + require.NotNil(err) + require.Contains(err.Error(), "missing") + + // Fetch the response setting the alloc id - This should succeed because the + // alloc is running + req.AllocID = a.ID + var resp2 structs.GenericResponse + err = msgpackrpc.CallWithCodec(codec, "ClientAllocations.Restart", req, &resp2) + require.Nil(err) +} + +func TestClientAllocations_Restart_ACL(t *testing.T) { + // Start a server + s, root := TestACLServer(t, nil) + defer s.Shutdown() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Create a bad token + policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS}) + tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) + + policyGood := mock.NamespacePolicy(structs.DefaultNamespace, acl.PolicyWrite, nil) + tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood) + + cases := []struct { + Name string + Token string + ExpectedError string + }{ + { + Name: "bad token", + Token: tokenBad.SecretID, + ExpectedError: structs.ErrPermissionDenied.Error(), + }, + { + Name: "good token", + Token: tokenGood.SecretID, + ExpectedError: "Unknown alloc", + }, + { + Name: "root token", + Token: root.SecretID, + ExpectedError: "Unknown alloc", + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + + // Make the request without having a node-id + req := &structs.AllocRestartRequest{ + AllocID: uuid.Generate(), + QueryOptions: structs.QueryOptions{ + Namespace: structs.DefaultNamespace, + AuthToken: c.Token, + Region: "global", + }, + } + + // Fetch the response + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.Restart", req, &resp) + require.NotNil(t, err) + require.Contains(t, err.Error(), c.ExpectedError) + }) + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 5e40bf73a580..43dd3c438224 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -717,6 +717,14 @@ type AllocsGetRequest struct { QueryOptions } +// AllocRestartRequest is used to restart a specific allocations tasks. +type AllocRestartRequest struct { + AllocID string + TaskName string + + QueryOptions +} + // PeriodicForceRequest is used to force a specific periodic job. type PeriodicForceRequest struct { JobID string