diff --git a/api/allocations.go b/api/allocations.go index 1376aee0299d..7ca28d759002 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -4,6 +4,8 @@ import ( "fmt" "sort" "time" + + "github.com/hashicorp/nomad/nomad/structs" ) var ( @@ -103,6 +105,23 @@ type AllocStopResponse struct { WriteMeta } +func (a *Allocations) Signal(alloc *Allocation, q *QueryOptions, task, signal string) error { + nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q) + if err != nil { + return err + } + + req := structs.AllocSignalRequest{ + AllocID: alloc.ID, + Signal: signal, + Task: task, + } + + var resp structs.GenericResponse + _, err = nodeClient.putQuery("/v1/client/allocation/"+alloc.ID+"/signal", &req, &resp, q) + return err +} + // Allocation is used for serialization of allocations. type Allocation struct { ID string diff --git a/client/alloc_endpoint.go b/client/alloc_endpoint.go index 11568988f27c..c141f0e36cf4 100644 --- a/client/alloc_endpoint.go +++ b/client/alloc_endpoint.go @@ -48,6 +48,20 @@ func (a *Allocations) GarbageCollect(args *nstructs.AllocSpecificRequest, reply return nil } +// Signal is used to send a signal to an allocation's tasks on a client. +func (a *Allocations) Signal(args *nstructs.AllocSignalRequest, reply *nstructs.GenericResponse) error { + defer metrics.MeasureSince([]string{"client", "allocations", "signal"}, time.Now()) + + // Check alloc-lifecycle permissions + if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityAllocLifecycle) { + return nstructs.ErrPermissionDenied + } + + return a.c.SignalAllocation(args.AllocID, args.Task, args.Signal) +} + // Restart is used to trigger a restart of an allocation or a subtask on a client. func (a *Allocations) Restart(args *nstructs.AllocRestartRequest, reply *nstructs.GenericResponse) error { defer metrics.MeasureSince([]string{"client", "allocations", "restart"}, time.Now()) diff --git a/client/alloc_endpoint_test.go b/client/alloc_endpoint_test.go index 8eee0b2f13fb..a29d60eeeccc 100644 --- a/client/alloc_endpoint_test.go +++ b/client/alloc_endpoint_test.go @@ -272,6 +272,90 @@ func TestAllocations_GarbageCollect_ACL(t *testing.T) { } } +func TestAllocations_Signal(t *testing.T) { + t.Parallel() + + client, cleanup := TestClient(t, nil) + defer cleanup() + + a := mock.Alloc() + require.Nil(t, client.addAlloc(a, "")) + + // Try with bad alloc + req := &nstructs.AllocSignalRequest{} + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.Signal", &req, &resp) + require.NotNil(t, err) + require.True(t, nstructs.IsErrUnknownAllocation(err)) + + // Try with good alloc + req.AllocID = a.ID + + var resp2 nstructs.GenericResponse + err = client.ClientRPC("Allocations.Signal", &req, &resp2) + + require.Error(t, err, "Expected error, got: %s, resp: %#+v", err, resp2) + require.Equal(t, "1 error(s) occurred:\n\n* Failed to signal task: web, err: Task not running", err.Error()) +} + +func TestAllocations_Signal_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + server, addr, root := testACLServer(t, nil) + defer server.Shutdown() + + client, cleanup := TestClient(t, func(c *config.Config) { + c.Servers = []string{addr} + c.ACLEnabled = true + }) + defer cleanup() + + // Try request without a token and expect failure + { + req := &nstructs.AllocSignalRequest{} + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.Signal", &req, &resp) + require.NotNil(err) + require.EqualError(err, nstructs.ErrPermissionDenied.Error()) + } + + // Try request with an invalid token and expect failure + { + token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny)) + req := &nstructs.AllocSignalRequest{} + req.AuthToken = token.SecretID + + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.Signal", &req, &resp) + + require.NotNil(err) + require.EqualError(err, nstructs.ErrPermissionDenied.Error()) + } + + // Try request with a valid token + { + token := mock.CreatePolicyAndToken(t, server.State(), 1005, "test-valid", + mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle})) + req := &nstructs.AllocSignalRequest{} + req.AuthToken = token.SecretID + req.Namespace = nstructs.DefaultNamespace + + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.Signal", &req, &resp) + require.True(nstructs.IsErrUnknownAllocation(err)) + } + + // Try request with a management token + { + req := &nstructs.AllocSignalRequest{} + req.AuthToken = root.SecretID + + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.Signal", &req, &resp) + require.True(nstructs.IsErrUnknownAllocation(err)) + } +} + func TestAllocations_Stats(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 7e61d299da67..2b5041abe69e 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -963,3 +963,29 @@ func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error { return err.ErrorOrNil() } + +// Signal sends a signal request to task runners inside an allocation. If the +// taskName is empty, then it is sent to all tasks. +func (ar *allocRunner) Signal(taskName, signal string) error { + event := structs.NewTaskEvent(structs.TaskSignaling).SetSignalText(signal) + + if taskName != "" { + tr, ok := ar.tasks[taskName] + if !ok { + return fmt.Errorf("Task not found") + } + + return tr.Signal(event, signal) + } + + var err *multierror.Error + + for tn, tr := range ar.tasks { + rerr := tr.Signal(event.Copy(), signal) + if rerr != nil { + err = multierror.Append(err, fmt.Errorf("Failed to signal task: %s, err: %v", tn, rerr)) + } + } + + return err.ErrorOrNil() +} diff --git a/client/client.go b/client/client.go index 643cfec3cbcb..daf82862c8b2 100644 --- a/client/client.go +++ b/client/client.go @@ -128,6 +128,7 @@ type AllocRunner interface { WaitCh() <-chan struct{} DestroyCh() <-chan struct{} ShutdownCh() <-chan struct{} + Signal(taskName, signal string) error GetTaskEventHandler(taskName string) drivermanager.EventHandler RestartTask(taskName string, taskEvent *structs.TaskEvent) error @@ -706,6 +707,18 @@ func (c *Client) Stats() map[string]map[string]string { return stats } +// SignalAllocation sends a signal to the tasks within an allocation. +// If the provided task is empty, then every allocation will be signalled. +// If a task is provided, then only an exactly matching task will be signalled. +func (c *Client) SignalAllocation(allocID, task, signal string) error { + ar, err := c.getAllocRunner(allocID) + if err != nil { + return err + } + + return ar.Signal(task, signal) +} + // CollectAllocation garbage collects a single allocation on a node. Returns // true if alloc was found and garbage collected; otherwise false. func (c *Client) CollectAllocation(allocID string) bool { diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index d6d624365eae..72e6108c27c1 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -138,6 +138,8 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ return s.allocRestart(allocID, resp, req) case "gc": return s.allocGC(allocID, resp, req) + case "signal": + return s.allocSignal(allocID, resp, req) } return nil, CodedError(404, resourceNotFoundErr) @@ -255,6 +257,45 @@ func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http return nil, rpcErr } +func (s *HTTPServer) allocSignal(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if !(req.Method == "POST" || req.Method == "PUT") { + return nil, CodedError(405, ErrInvalidMethod) + } + + // Build the request and parse the ACL token + args := structs.AllocSignalRequest{} + err := decodeBody(req, &args) + if err != nil { + return nil, CodedError(400, fmt.Sprintf("Failed to decode body: %v", err)) + } + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) + args.AllocID = allocID + + // Determine the handler to use + useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForAlloc(allocID) + + // Make the RPC + var reply structs.GenericResponse + var rpcErr error + if useLocalClient { + rpcErr = s.agent.Client().ClientRPC("Allocations.Signal", &args, &reply) + } else if useClientRPC { + rpcErr = s.agent.Client().RPC("ClientAllocations.Signal", &args, &reply) + } else if useServerRPC { + rpcErr = s.agent.Server().RPC("ClientAllocations.Signal", &args, &reply) + } else { + rpcErr = CodedError(400, "No local Node and node_id not provided") + } + + if rpcErr != nil { + if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) { + rpcErr = CodedError(404, rpcErr.Error()) + } + } + + return reply, rpcErr +} + func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { var secret string s.parseToken(req, &secret) diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index 6ab9c0bc9d79..ed497a6716d1 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -405,6 +405,7 @@ func TestHTTP_AllocStop(t *testing.T) { require.NoError(state.UpsertAllocs(1000, []*structs.Allocation{alloc})) + // Test that the happy path works { // Make the HTTP request req, err := http.NewRequest("POST", "/v1/allocation/"+alloc.ID+"/stop", nil) @@ -420,6 +421,7 @@ func TestHTTP_AllocStop(t *testing.T) { require.NotEmpty(a.Index, "missing index") } + // Test that we 404 when the allocid is invalid { // Make the HTTP request req, err := http.NewRequest("POST", "/v1/allocation/"+alloc.ID+"/stop", nil) diff --git a/command/alloc_signal.go b/command/alloc_signal.go new file mode 100644 index 000000000000..bc33825a0dff --- /dev/null +++ b/command/alloc_signal.go @@ -0,0 +1,157 @@ +package command + +import ( + "fmt" + "strings" + + "github.com/hashicorp/nomad/api/contexts" + "github.com/posener/complete" +) + +type AllocSignalCommand struct { + Meta +} + +func (a *AllocSignalCommand) Help() string { + helpText := ` +Usage: nomad alloc signal [options] + + signal an existing allocation. This command is used to signal a specific alloc + and its subtasks. If no task is provided then all of the allocations subtasks + will receive the signal. + +General Options: + + ` + generalOptionsUsage() + ` + +Signal Specific Options: + + -s + Specify the signal that the selected tasks should receive. + + -verbose + Show full information. +` + return strings.TrimSpace(helpText) +} + +func (c *AllocSignalCommand) Name() string { return "alloc signal" } + +func (c *AllocSignalCommand) Run(args []string) int { + var verbose bool + var signal string + + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.BoolVar(&verbose, "verbose", false, "") + flags.StringVar(&signal, "s", "SIGKILL", "") + + if err := flags.Parse(args); err != nil { + return 1 + } + + // Check that we got exactly one alloc + args = flags.Args() + if len(args) < 1 || len(args) > 2 { + c.Ui.Error("This command takes up to two arguments: ") + c.Ui.Error(commandErrorText(c)) + return 1 + } + + allocID := args[0] + + // Truncate the id unless full length is requested + length := shortId + if verbose { + length = fullId + } + + // Query the allocation info + if len(allocID) == 1 { + c.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters.")) + return 1 + } + + allocID = sanitizeUUIDPrefix(allocID) + + // Get the HTTP client + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + allocs, _, err := client.Allocations().PrefixList(allocID) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) + return 1 + } + + if len(allocs) == 0 { + c.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID)) + return 1 + } + + if len(allocs) > 1 { + // Format the allocs + out := formatAllocListStubs(allocs, verbose, length) + c.Ui.Error(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", out)) + return 1 + } + + // Prefix lookup matched a single allocation + alloc, _, err := client.Allocations().Info(allocs[0].ID, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) + return 1 + } + + var taskName string + if len(args) == 2 { + // Validate Task + taskName = args[1] + err := validateTaskExistsInAllocation(taskName, alloc) + if err != nil { + c.Ui.Error(err.Error()) + return 1 + } + } + + err = client.Allocations().Signal(alloc, nil, taskName, signal) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error signalling allocation: %s", err)) + return 1 + } + + return 0 +} + +func (a *AllocSignalCommand) Synopsis() string { + return "Signal a running allocation" +} + +func (c *AllocSignalCommand) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-s": complete.PredictNothing, + "-verbose": complete.PredictNothing, + }) +} +func (c *AllocSignalCommand) AutocompleteArgs() complete.Predictor { + // Here we only autocomplete allocation names. Eventually we may consider + // expanding this to also autocomplete task names. To do so, we'll need to + // either change the autocompletion api, or implement parsing such that we can + // easily compute the current arg position. + return complete.PredictFunc(func(a complete.Args) []string { + client, err := c.Meta.Client() + if err != nil { + return nil + } + + resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Allocs, nil) + if err != nil { + return []string{} + } + return resp.Matches[contexts.Allocs] + }) +} diff --git a/command/alloc_signal_test.go b/command/alloc_signal_test.go new file mode 100644 index 000000000000..87cdb46fbed3 --- /dev/null +++ b/command/alloc_signal_test.go @@ -0,0 +1,142 @@ +package command + +import ( + "fmt" + "testing" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/mitchellh/cli" + "github.com/posener/complete" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAllocSignalCommand_Implements(t *testing.T) { + t.Parallel() + var _ cli.Command = &AllocSignalCommand{} +} + +func TestAllocSignalCommand_Fails(t *testing.T) { + t.Parallel() + srv, _, url := testServer(t, false, nil) + defer srv.Shutdown() + + require := require.New(t) + + ui := new(cli.MockUi) + cmd := &AllocSignalCommand{Meta: Meta{Ui: ui}} + + // Fails on lack of alloc ID + require.Equal(1, cmd.Run([]string{})) + require.Contains(ui.ErrorWriter.String(), "This command takes up to two arguments") + ui.ErrorWriter.Reset() + + // Fails on misuse + require.Equal(1, cmd.Run([]string{"some", "bad", "args"})) + require.Contains(ui.ErrorWriter.String(), "This command takes up to two arguments") + ui.ErrorWriter.Reset() + + // Fails on connection failure + require.Equal(1, cmd.Run([]string{"-address=nope", "foobar"})) + require.Contains(ui.ErrorWriter.String(), "Error querying allocation") + ui.ErrorWriter.Reset() + + // Fails on missing alloc + code := cmd.Run([]string{"-address=" + url, "26470238-5CF2-438F-8772-DC67CFB0705C"}) + require.Equal(1, code) + require.Contains(ui.ErrorWriter.String(), "No allocation(s) with prefix or id") + ui.ErrorWriter.Reset() + + // Fail on identifier with too few characters + require.Equal(1, cmd.Run([]string{"-address=" + url, "2"})) + require.Contains(ui.ErrorWriter.String(), "must contain at least two characters.") + ui.ErrorWriter.Reset() +} + +func TestAllocSignalCommand_AutocompleteArgs(t *testing.T) { + assert := assert.New(t) + + srv, _, url := testServer(t, true, nil) + defer srv.Shutdown() + + ui := new(cli.MockUi) + cmd := &AllocSignalCommand{Meta: Meta{Ui: ui, flagAddress: url}} + + // Create a fake alloc + state := srv.Agent.Server().State() + a := mock.Alloc() + assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a})) + + prefix := a.ID[:5] + args := complete.Args{All: []string{"signal", prefix}, Last: prefix} + predictor := cmd.AutocompleteArgs() + + // Match Allocs + res := predictor.Predict(args) + assert.Equal(1, len(res)) + assert.Equal(a.ID, res[0]) +} + +func TestAllocSignalCommand_Run(t *testing.T) { + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + require := require.New(t) + + // Wait for a node to be ready + testutil.WaitForResult(func() (bool, error) { + nodes, _, err := client.Nodes().List(nil) + if err != nil { + return false, err + } + for _, node := range nodes { + if _, ok := node.Drivers["mock_driver"]; ok && + node.Status == structs.NodeStatusReady { + return true, nil + } + } + return false, fmt.Errorf("no ready nodes") + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + ui := new(cli.MockUi) + cmd := &AllocSignalCommand{Meta: Meta{Ui: ui}} + + jobID := "job1_sfx" + job1 := testJob(jobID) + resp, _, err := client.Jobs().Register(job1, nil) + require.NoError(err) + if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 { + t.Fatalf("status code non zero saw %d", code) + } + // get an alloc id + allocId1 := "" + if allocs, _, err := client.Jobs().Allocations(jobID, false, nil); err == nil { + if len(allocs) > 0 { + allocId1 = allocs[0].ID + } + } + require.NotEmpty(allocId1, "unable to find allocation") + + // Wait for alloc to be running + testutil.WaitForResult(func() (bool, error) { + alloc, _, err := client.Allocations().Info(allocId1, nil) + if err != nil { + return false, err + } + if alloc.ClientStatus == api.AllocClientStatusRunning { + return true, nil + } + return false, fmt.Errorf("alloc is not running, is: %s", alloc.ClientStatus) + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + require.Equal(cmd.Run([]string{"-address=" + url, allocId1}), 0, "expected successful exit code") + + ui.OutputWriter.Reset() +} diff --git a/command/commands.go b/command/commands.go index 38f06e277759..9619a44f32da 100644 --- a/command/commands.go +++ b/command/commands.go @@ -145,6 +145,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "alloc signal": func() (cli.Command, error) { + return &AllocSignalCommand{ + Meta: meta, + }, nil + }, "alloc stop": func() (cli.Command, error) { return &AllocStopCommand{ Meta: meta, diff --git a/nomad/client_alloc_endpoint.go b/nomad/client_alloc_endpoint.go index 83c1a0747775..44713a9b75f8 100644 --- a/nomad/client_alloc_endpoint.go +++ b/nomad/client_alloc_endpoint.go @@ -65,6 +65,62 @@ func (a *ClientAllocations) GarbageCollectAll(args *structs.NodeSpecificRequest, return NodeRpc(state.Session, "Allocations.GarbageCollectAll", args, reply) } +// Signal is used to send a signal to an allocation on a client. +func (a *ClientAllocations) Signal(args *structs.AllocSignalRequest, reply *structs.GenericResponse) error { + // We only allow stale reads since the only potentially stale information is + // the Node registration and the cost is fairly high for adding another hope + // in the forwarding chain. + args.QueryOptions.AllowStale = true + + // Potentially forward to a different region. + if done, err := a.srv.forward("ClientAllocations.Signal", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "client_allocations", "signal"}, time.Now()) + + // Check node read permissions + if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityAllocLifecycle) { + return structs.ErrPermissionDenied + } + + // Verify the arguments. + if args.AllocID == "" { + return errors.New("missing AllocID") + } + + // Find the allocation + snap, err := a.srv.State().Snapshot() + if err != nil { + return err + } + + alloc, err := snap.AllocByID(nil, args.AllocID) + if err != nil { + return err + } + + if alloc == nil { + return structs.NewErrUnknownAllocation(args.AllocID) + } + + // Make sure Node is valid and new enough to support RPC + _, err = getNodeForRpc(snap, alloc.NodeID) + if err != nil { + return err + } + + // Get the connection to the client + state, ok := a.srv.getNodeConn(alloc.NodeID) + if !ok { + return findNodeConnAndForward(a.srv, alloc.NodeID, "ClientAllocations.Signal", args, reply) + } + + // Make the RPC + return NodeRpc(state.Session, "Allocations.Signal", args, reply) +} + // GarbageCollect is used to garbage collect an allocation on a client. func (a *ClientAllocations) GarbageCollect(args *structs.AllocSpecificRequest, reply *structs.GenericResponse) error { // We only allow stale reads since the only potentially stale information is diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 8d2ae8a7f2c8..e694dff4ae52 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -741,6 +741,14 @@ type AllocSpecificRequest struct { QueryOptions } +// AllocSignalRequest is used to signal a specific allocation +type AllocSignalRequest struct { + AllocID string + Task string + Signal string + QueryOptions +} + // AllocsGetRequest is used to query a set of allocations type AllocsGetRequest struct { AllocIDs []string @@ -6162,6 +6170,11 @@ func (e *TaskEvent) SetSignal(s int) *TaskEvent { return e } +func (e *TaskEvent) SetSignalText(s string) *TaskEvent { + e.Details["signal"] = s + return e +} + func (e *TaskEvent) SetExitMessage(err error) *TaskEvent { if err != nil { e.Message = err.Error()