diff --git a/CHANGELOG.md b/CHANGELOG.md index d730d864451a..6043eea978e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ FEATURES: * **Event Stream**: Subscribe to change events as they occur in real time. [[GH-9013](https://github.com/hashicorp/nomad/issues/9013)] +* **Namespaces OSS**: Namespaces are now available in open source Nomad. [[GH-9135](https://github.com/hashicorp/nomad/issues/9135)] * **Topology Visualization**: See all of the clients and allocations in a cluster at once. [[GH-9077](https://github.com/hashicorp/nomad/issues/9077)] IMPROVEMENTS: diff --git a/command/agent/http.go b/command/agent/http.go index 582cae0a2de1..6dee1a4f58f1 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -328,6 +328,10 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/event/stream", s.wrap(s.EventStream)) + s.mux.HandleFunc("/v1/namespaces", s.wrap(s.NamespacesRequest)) + s.mux.HandleFunc("/v1/namespace", s.wrap(s.NamespaceCreateRequest)) + s.mux.HandleFunc("/v1/namespace/", s.wrap(s.NamespaceSpecificRequest)) + if uiEnabled { s.mux.Handle("/ui/", http.StripPrefix("/ui/", s.handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()})))) } else { diff --git a/command/agent/http_oss.go b/command/agent/http_oss.go index 61de2934742f..49056510b1b1 100644 --- a/command/agent/http_oss.go +++ b/command/agent/http_oss.go @@ -8,10 +8,6 @@ import ( // registerEnterpriseHandlers is a no-op for the oss release func (s *HTTPServer) registerEnterpriseHandlers() { - s.mux.HandleFunc("/v1/namespaces", s.wrap(s.entOnly)) - s.mux.HandleFunc("/v1/namespace", s.wrap(s.entOnly)) - s.mux.HandleFunc("/v1/namespace/", s.wrap(s.entOnly)) - s.mux.HandleFunc("/v1/sentinel/policies", s.wrap(s.entOnly)) s.mux.HandleFunc("/v1/sentinel/policy/", s.wrap(s.entOnly)) diff --git a/command/agent/namespace_endpoint.go b/command/agent/namespace_endpoint.go new file mode 100644 index 000000000000..64f9a20ab0a8 --- /dev/null +++ b/command/agent/namespace_endpoint.go @@ -0,0 +1,119 @@ +package agent + +import ( + "net/http" + "strings" + + "github.com/hashicorp/nomad/nomad/structs" +) + +func (s *HTTPServer) NamespacesRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "GET" { + return nil, CodedError(405, ErrInvalidMethod) + } + + args := structs.NamespaceListRequest{} + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var out structs.NamespaceListResponse + if err := s.agent.RPC("Namespace.ListNamespaces", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + if out.Namespaces == nil { + out.Namespaces = make([]*structs.Namespace, 0) + } + return out.Namespaces, nil +} + +func (s *HTTPServer) NamespaceSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + name := strings.TrimPrefix(req.URL.Path, "/v1/namespace/") + if len(name) == 0 { + return nil, CodedError(400, "Missing Namespace Name") + } + switch req.Method { + case "GET": + return s.namespaceQuery(resp, req, name) + case "PUT", "POST": + return s.namespaceUpdate(resp, req, name) + case "DELETE": + return s.namespaceDelete(resp, req, name) + default: + return nil, CodedError(405, ErrInvalidMethod) + } +} + +func (s *HTTPServer) NamespaceCreateRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "PUT" && req.Method != "POST" { + return nil, CodedError(405, ErrInvalidMethod) + } + + return s.namespaceUpdate(resp, req, "") +} + +func (s *HTTPServer) namespaceQuery(resp http.ResponseWriter, req *http.Request, + namespaceName string) (interface{}, error) { + args := structs.NamespaceSpecificRequest{ + Name: namespaceName, + } + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var out structs.SingleNamespaceResponse + if err := s.agent.RPC("Namespace.GetNamespace", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + if out.Namespace == nil { + return nil, CodedError(404, "Namespace not found") + } + return out.Namespace, nil +} + +func (s *HTTPServer) namespaceUpdate(resp http.ResponseWriter, req *http.Request, + namespaceName string) (interface{}, error) { + // Parse the namespace + var namespace structs.Namespace + if err := decodeBody(req, &namespace); err != nil { + return nil, CodedError(500, err.Error()) + } + + // Ensure the namespace name matches + if namespaceName != "" && namespace.Name != namespaceName { + return nil, CodedError(400, "Namespace name does not match request path") + } + + // Format the request + args := structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{&namespace}, + } + s.parseWriteRequest(req, &args.WriteRequest) + + var out structs.GenericResponse + if err := s.agent.RPC("Namespace.UpsertNamespaces", &args, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + return nil, nil +} + +func (s *HTTPServer) namespaceDelete(resp http.ResponseWriter, req *http.Request, + namespaceName string) (interface{}, error) { + + args := structs.NamespaceDeleteRequest{ + Namespaces: []string{namespaceName}, + } + s.parseWriteRequest(req, &args.WriteRequest) + + var out structs.GenericResponse + if err := s.agent.RPC("Namespace.DeleteNamespaces", &args, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + return nil, nil +} diff --git a/command/agent/namespace_endpoint_test.go b/command/agent/namespace_endpoint_test.go new file mode 100644 index 000000000000..9e9a0fd01326 --- /dev/null +++ b/command/agent/namespace_endpoint_test.go @@ -0,0 +1,172 @@ +// +build ent + +package agent + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/assert" +) + +func TestHTTP_NamespaceList(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + ns1 := mock.Namespace() + ns2 := mock.Namespace() + ns3 := mock.Namespace() + args := structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1, ns2, ns3}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(s.Agent.RPC("Namespace.UpsertNamespaces", &args, &resp)) + + // Make the HTTP request + req, err := http.NewRequest("GET", "/v1/namespaces", nil) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespacesRequest(respW, req) + assert.Nil(err) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + assert.Equal("true", respW.HeaderMap.Get("X-Nomad-KnownLeader")) + assert.NotZero(respW.HeaderMap.Get("X-Nomad-LastContact")) + + // Check the output (the 3 we register + default) + assert.Len(obj.([]*structs.Namespace), 4) + }) +} + +func TestHTTP_NamespaceQuery(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + ns1 := mock.Namespace() + args := structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(s.Agent.RPC("Namespace.UpsertNamespaces", &args, &resp)) + + // Make the HTTP request + req, err := http.NewRequest("GET", "/v1/namespace/"+ns1.Name, nil) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespaceSpecificRequest(respW, req) + assert.Nil(err) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + assert.Equal("true", respW.HeaderMap.Get("X-Nomad-KnownLeader")) + assert.NotZero(respW.HeaderMap.Get("X-Nomad-LastContact")) + + // Check the output + assert.Equal(ns1.Name, obj.(*structs.Namespace).Name) + }) +} + +func TestHTTP_NamespaceCreate(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + // Make the HTTP request + ns1 := mock.Namespace() + buf := encodeReq(ns1) + req, err := http.NewRequest("PUT", "/v1/namespace", buf) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespaceCreateRequest(respW, req) + assert.Nil(err) + assert.Nil(obj) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + + // Check policy was created + state := s.Agent.server.State() + out, err := state.NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + ns1.CreateIndex, ns1.ModifyIndex = out.CreateIndex, out.ModifyIndex + assert.Equal(ns1.Name, out.Name) + assert.Equal(ns1, out) + }) +} + +func TestHTTP_NamespaceUpdate(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + // Make the HTTP request + ns1 := mock.Namespace() + buf := encodeReq(ns1) + req, err := http.NewRequest("PUT", "/v1/namespace/"+ns1.Name, buf) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespaceSpecificRequest(respW, req) + assert.Nil(err) + assert.Nil(obj) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + + // Check policy was created + state := s.Agent.server.State() + out, err := state.NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + ns1.CreateIndex, ns1.ModifyIndex = out.CreateIndex, out.ModifyIndex + assert.Equal(ns1.Name, out.Name) + assert.Equal(ns1, out) + }) +} + +func TestHTTP_NamespaceDelete(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + ns1 := mock.Namespace() + args := structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(s.Agent.RPC("Namespace.UpsertNamespaces", &args, &resp)) + + // Make the HTTP request + req, err := http.NewRequest("DELETE", "/v1/namespace/"+ns1.Name, nil) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespaceSpecificRequest(respW, req) + assert.Nil(err) + assert.Nil(obj) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + + // Check policy was created + state := s.Agent.server.State() + out, err := state.NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.Nil(out) + }) +} diff --git a/helper/raftutil/generate_msgtypes.sh b/helper/raftutil/generate_msgtypes.sh index 25c8d565ecb5..5782445f9c97 100755 --- a/helper/raftutil/generate_msgtypes.sh +++ b/helper/raftutil/generate_msgtypes.sh @@ -13,13 +13,16 @@ var msgTypeNames = map[structs.MessageType]string{ EOF cat ../../nomad/structs/structs.go \ - | grep -A500 'MessageType = iota' \ - | grep -v -e '//' \ + | grep -A500 'MessageType = 0' \ + | grep -v -e '//' \ + | grep -v -e '^$' \ | awk '/^\)$/ { exit; } /.*/ { printf " structs.%s: \"%s\",\n", $1, $1}' echo '}' } +echo "==> Generating type map..." generate_file > msgtypes.go +echo "==> Formatting type map..." gofmt -w msgtypes.go diff --git a/helper/raftutil/msgtypes.go b/helper/raftutil/msgtypes.go index 867f45ea582e..49daa8dc1150 100644 --- a/helper/raftutil/msgtypes.go +++ b/helper/raftutil/msgtypes.go @@ -45,4 +45,6 @@ var msgTypeNames = map[structs.MessageType]string{ structs.ScalingEventRegisterRequestType: "ScalingEventRegisterRequestType", structs.CSIVolumeClaimBatchRequestType: "CSIVolumeClaimBatchRequestType", structs.CSIPluginDeleteRequestType: "CSIPluginDeleteRequestType", + structs.NamespaceUpsertRequestType: "NamespaceUpsertRequestType", + structs.NamespaceDeleteRequestType: "NamespaceDeleteRequestType", } diff --git a/nomad/fsm.go b/nomad/fsm.go index 063269a2b7e3..f1d7f0f20cd5 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -32,26 +32,29 @@ const ( type SnapshotType byte const ( - NodeSnapshot SnapshotType = iota - JobSnapshot - IndexSnapshot - EvalSnapshot - AllocSnapshot - TimeTableSnapshot - PeriodicLaunchSnapshot - JobSummarySnapshot - VaultAccessorSnapshot - JobVersionSnapshot - DeploymentSnapshot - ACLPolicySnapshot - ACLTokenSnapshot - SchedulerConfigSnapshot - ClusterMetadataSnapshot - ServiceIdentityTokenAccessorSnapshot - ScalingPolicySnapshot - CSIPluginSnapshot - CSIVolumeSnapshot - ScalingEventsSnapshot + NodeSnapshot SnapshotType = 0 + JobSnapshot SnapshotType = 1 + IndexSnapshot SnapshotType = 2 + EvalSnapshot SnapshotType = 3 + AllocSnapshot SnapshotType = 4 + TimeTableSnapshot SnapshotType = 5 + PeriodicLaunchSnapshot SnapshotType = 6 + JobSummarySnapshot SnapshotType = 7 + VaultAccessorSnapshot SnapshotType = 8 + JobVersionSnapshot SnapshotType = 9 + DeploymentSnapshot SnapshotType = 10 + ACLPolicySnapshot SnapshotType = 11 + ACLTokenSnapshot SnapshotType = 12 + SchedulerConfigSnapshot SnapshotType = 13 + ClusterMetadataSnapshot SnapshotType = 14 + ServiceIdentityTokenAccessorSnapshot SnapshotType = 15 + ScalingPolicySnapshot SnapshotType = 16 + CSIPluginSnapshot SnapshotType = 17 + CSIVolumeSnapshot SnapshotType = 18 + ScalingEventsSnapshot SnapshotType = 19 + + // Namespace appliers were moved from enterprise and therefore start at 64 + NamespaceSnapshot SnapshotType = 64 ) // LogApplier is the definition of a function that can apply a Raft log @@ -286,6 +289,10 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyCSIVolumeBatchClaim(buf[1:], log.Index) case structs.CSIPluginDeleteRequestType: return n.applyCSIPluginDelete(buf[1:], log.Index) + case structs.NamespaceUpsertRequestType: + return n.applyNamespaceUpsert(buf[1:], log.Index) + case structs.NamespaceDeleteRequestType: + return n.applyNamespaceDelete(buf[1:], log.Index) } // Check enterprise only message types. @@ -1248,6 +1255,58 @@ func (n *nomadFSM) applyCSIPluginDelete(buf []byte, index uint64) interface{} { return nil } +// applyNamespaceUpsert is used to upsert a set of namespaces +func (n *nomadFSM) applyNamespaceUpsert(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_namespace_upsert"}, time.Now()) + var req structs.NamespaceUpsertRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + var trigger []string + for _, ns := range req.Namespaces { + old, err := n.state.NamespaceByName(nil, ns.Name) + if err != nil { + n.logger.Error("namespace lookup failed", "error", err) + return err + } + + // If we are changing the quota on a namespace trigger evals for the + // older quota. + if old != nil && old.Quota != "" && old.Quota != ns.Quota { + trigger = append(trigger, old.Quota) + } + } + + if err := n.state.UpsertNamespaces(index, req.Namespaces); err != nil { + n.logger.Error("UpsertNamespaces failed", "error", err) + return err + } + + // Send the unblocks + for _, quota := range trigger { + n.blockedEvals.UnblockQuota(quota, index) + } + + return nil +} + +// applyNamespaceDelete is used to delete a set of namespaces +func (n *nomadFSM) applyNamespaceDelete(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_namespace_delete"}, time.Now()) + var req structs.NamespaceDeleteRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.DeleteNamespaces(index, req.Namespaces); err != nil { + n.logger.Error("DeleteNamespaces failed", "error", err) + return err + } + + return nil +} + func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) { // Create a new snapshot snap, err := n.state.Snapshot() @@ -1514,6 +1573,16 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := restore.CSIVolumeRestore(plugin); err != nil { return err } + + case NamespaceSnapshot: + namespace := new(structs.Namespace) + if err := dec.Decode(namespace); err != nil { + return err + } + if err := restore.NamespaceRestore(namespace); err != nil { + return err + } + default: // Check if this is an enterprise only object being restored restorer, ok := n.enterpriseRestorers[snapType] @@ -1816,6 +1885,9 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + if err := s.persistNamespaces(sink, encoder); err != nil { + return err + } if err := s.persistEnterpriseTables(sink, encoder); err != nil { sink.Cancel() return err @@ -2177,6 +2249,34 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink, return nil } +// persistNamespaces persists all the namespaces. +func (s *nomadSnapshot) persistNamespaces(sink raft.SnapshotSink, encoder *codec.Encoder) error { + // Get all the jobs + ws := memdb.NewWatchSet() + namespaces, err := s.snap.Namespaces(ws) + if err != nil { + return err + } + + for { + // Get the next item + raw := namespaces.Next() + if raw == nil { + break + } + + // Prepare the request struct + namespace := raw.(*structs.Namespace) + + // Write out a namespace registration + sink.Write([]byte{byte(NamespaceSnapshot)}) + if err := encoder.Encode(namespace); err != nil { + return err + } + } + return nil +} + func (s *nomadSnapshot) persistSchedulerConfig(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get scheduler config diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index f2f49126a079..506436afa267 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -3201,3 +3201,78 @@ func TestFSM_ClusterMetadata(t *testing.T) { r.Equal(clusterID, storedMetadata.ClusterID) r.Equal(now, storedMetadata.CreateTime) } + +func TestFSM_UpsertNamespaces(t *testing.T) { + assert := assert.New(t) + t.Parallel() + fsm := testFSM(t) + + ns1 := mock.Namespace() + ns2 := mock.Namespace() + req := structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1, ns2}, + } + buf, err := structs.Encode(structs.NamespaceUpsertRequestType, req) + assert.Nil(err) + assert.Nil(fsm.Apply(makeLog(buf))) + + // Verify we are registered + ws := memdb.NewWatchSet() + out, err := fsm.State().NamespaceByName(ws, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + out, err = fsm.State().NamespaceByName(ws, ns2.Name) + assert.Nil(err) + assert.NotNil(out) +} + +func TestFSM_DeleteNamespaces(t *testing.T) { + assert := assert.New(t) + t.Parallel() + fsm := testFSM(t) + + ns1 := mock.Namespace() + ns2 := mock.Namespace() + assert.Nil(fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2})) + + req := structs.NamespaceDeleteRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + } + buf, err := structs.Encode(structs.NamespaceDeleteRequestType, req) + assert.Nil(err) + assert.Nil(fsm.Apply(makeLog(buf))) + + // Verify we are NOT registered + ws := memdb.NewWatchSet() + out, err := fsm.State().NamespaceByName(ws, ns1.Name) + assert.Nil(err) + assert.Nil(out) + + out, err = fsm.State().NamespaceByName(ws, ns2.Name) + assert.Nil(err) + assert.Nil(out) +} + +func TestFSM_SnapshotRestore_Namespaces(t *testing.T) { + t.Parallel() + // Add some state + fsm := testFSM(t) + state := fsm.State() + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state.UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Verify the contents + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + ws := memdb.NewWatchSet() + out1, _ := state2.NamespaceByName(ws, ns1.Name) + out2, _ := state2.NamespaceByName(ws, ns2.Name) + if !reflect.DeepEqual(ns1, out1) { + t.Fatalf("bad: \n%#v\n%#v", out1, ns1) + } + if !reflect.DeepEqual(ns2, out2) { + t.Fatalf("bad: \n%#v\n%#v", out2, ns2) + } +} diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index dc9e859b7811..6202a3ebffe9 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -12,6 +12,7 @@ import ( msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/raft" "github.com/kr/pretty" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/hashicorp/nomad/acl" @@ -2130,6 +2131,83 @@ func evalUpdateFromRaft(t *testing.T, s *Server, evalID string) *structs.Evaluat return nil } +func TestJobEndpoint_Register_ACL_Namespace(t *testing.T) { + t.Parallel() + s1, _, cleanupS1 := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Policy with read on default namespace and write on non default + policy := &structs.ACLPolicy{ + Name: fmt.Sprintf("policy-%s", uuid.Generate()), + Description: "Super cool policy!", + Rules: ` + namespace "default" { + policy = "read" + } + namespace "test" { + policy = "write" + } + node { + policy = "read" + } + agent { + policy = "read" + } + `, + CreateIndex: 10, + ModifyIndex: 20, + } + policy.SetHash() + + assert := assert.New(t) + + // Upsert policy and token + token := mock.ACLToken() + token.Policies = []string{policy.Name} + err := s1.State().UpsertACLPolicies(100, []*structs.ACLPolicy{policy}) + assert.Nil(err) + + err = s1.State().UpsertACLTokens(110, []*structs.ACLToken{token}) + assert.Nil(err) + + // Upsert namespace + ns := mock.Namespace() + ns.Name = "test" + err = s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns}) + assert.Nil(err) + + // Create the register request + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + req.AuthToken = token.SecretID + // Use token without write access to default namespace, expect failure + var resp structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + assert.NotNil(err, "expected permission denied") + + req.Namespace = "test" + job.Namespace = "test" + + // Use token with write access to default namespace, expect success + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + assert.Nil(err, "unexpected err: %v", err) + assert.NotEqual(resp.Index, 0, "bad index: %d", resp.Index) + + // Check for the node in the FSM + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.Namespace, job.ID) + assert.Nil(err) + assert.NotNil(out, "expected job") +} + func TestJobEndpoint_Revert(t *testing.T) { t.Parallel() @@ -3478,8 +3556,9 @@ func TestJobEndpoint_Deregister_EvalCreation_Modern(t *testing.T) { }) } -// TestJobEndpoint_Register_EvalCreation_Legacy asserts that job deregister creates an eval -// atomically with the registration, but handle legacy clients by adding a new eval update +// TestJobEndpoint_Deregister_EvalCreation_Legacy asserts that job deregister +// creates an eval atomically with the registration, but handle legacy clients +// by adding a new eval update func TestJobEndpoint_Deregister_EvalCreation_Legacy(t *testing.T) { t.Parallel() diff --git a/nomad/leader.go b/nomad/leader.go index 8576219ddcac..1ec7bb5e388a 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -325,6 +325,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { if s.config.ACLEnabled && s.config.Region != s.config.AuthoritativeRegion { go s.replicateACLPolicies(stopCh) go s.replicateACLTokens(stopCh) + go s.replicateNamespaces(stopCh) } // Setup any enterprise systems required. @@ -345,6 +346,146 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { return nil } +// replicateNamespaces is used to replicate namespaces from the authoritative +// region to this region. +func (s *Server) replicateNamespaces(stopCh chan struct{}) { + req := structs.NamespaceListRequest{ + QueryOptions: structs.QueryOptions{ + Region: s.config.AuthoritativeRegion, + AllowStale: true, + }, + } + limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit)) + s.logger.Debug("starting namespace replication from authoritative region", "region", req.Region) + +START: + for { + select { + case <-stopCh: + return + default: + } + + // Rate limit how often we attempt replication + limiter.Wait(context.Background()) + + // Fetch the list of namespaces + var resp structs.NamespaceListResponse + req.AuthToken = s.ReplicationToken() + err := s.forwardRegion(s.config.AuthoritativeRegion, "Namespace.ListNamespaces", &req, &resp) + if err != nil { + s.logger.Error("failed to fetch namespaces from authoritative region", "error", err) + goto ERR_WAIT + } + + // Perform a two-way diff + delete, update := diffNamespaces(s.State(), req.MinQueryIndex, resp.Namespaces) + + // Delete namespaces that should not exist + if len(delete) > 0 { + args := &structs.NamespaceDeleteRequest{ + Namespaces: delete, + } + _, _, err := s.raftApply(structs.NamespaceDeleteRequestType, args) + if err != nil { + s.logger.Error("failed to delete namespaces", "error", err) + goto ERR_WAIT + } + } + + // Fetch any outdated namespaces + var fetched []*structs.Namespace + if len(update) > 0 { + req := structs.NamespaceSetRequest{ + Namespaces: update, + QueryOptions: structs.QueryOptions{ + Region: s.config.AuthoritativeRegion, + AuthToken: s.ReplicationToken(), + AllowStale: true, + MinQueryIndex: resp.Index - 1, + }, + } + var reply structs.NamespaceSetResponse + if err := s.forwardRegion(s.config.AuthoritativeRegion, "Namespace.GetNamespaces", &req, &reply); err != nil { + s.logger.Error("failed to fetch namespaces from authoritative region", "error", err) + goto ERR_WAIT + } + for _, namespace := range reply.Namespaces { + fetched = append(fetched, namespace) + } + } + + // Update local namespaces + if len(fetched) > 0 { + args := &structs.NamespaceUpsertRequest{ + Namespaces: fetched, + } + _, _, err := s.raftApply(structs.NamespaceUpsertRequestType, args) + if err != nil { + s.logger.Error("failed to update namespaces", "error", err) + goto ERR_WAIT + } + } + + // Update the minimum query index, blocks until there is a change. + req.MinQueryIndex = resp.Index + } + +ERR_WAIT: + select { + case <-time.After(s.config.ReplicationBackoff): + goto START + case <-stopCh: + return + } +} + +// diffNamespaces is used to perform a two-way diff between the local namespaces +// and the remote namespaces to determine which namespaces need to be deleted or +// updated. +func diffNamespaces(state *state.StateStore, minIndex uint64, remoteList []*structs.Namespace) (delete []string, update []string) { + // Construct a set of the local and remote namespaces + local := make(map[string][]byte) + remote := make(map[string]struct{}) + + // Add all the local namespaces + iter, err := state.Namespaces(nil) + if err != nil { + panic("failed to iterate local namespaces") + } + for { + raw := iter.Next() + if raw == nil { + break + } + namespace := raw.(*structs.Namespace) + local[namespace.Name] = namespace.Hash + } + + // Iterate over the remote namespaces + for _, rns := range remoteList { + remote[rns.Name] = struct{}{} + + // Check if the namespace is missing locally + if localHash, ok := local[rns.Name]; !ok { + update = append(update, rns.Name) + + // Check if the namespace is newer remotely and there is a hash + // mis-match. + } else if rns.ModifyIndex > minIndex && !bytes.Equal(localHash, rns.Hash) { + update = append(update, rns.Name) + } + } + + // Check if namespaces should be deleted + for lns := range local { + if _, ok := remote[lns]; !ok { + delete = append(delete, lns) + } + } + return +} + // restoreEvals is used to restore pending evaluations into the eval broker and // blocked evaluations into the blocked eval tracker. The broker and blocked // eval tracker is maintained only by the leader, so it must be restored anytime diff --git a/nomad/leader_test.go b/nomad/leader_test.go index ba784684d084..997854173982 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -3,6 +3,7 @@ package nomad import ( "errors" "fmt" + "sort" "strconv" "testing" "time" @@ -1457,6 +1458,86 @@ func TestServer_ReconcileMember(t *testing.T) { } } +func TestLeader_ReplicateNamespaces(t *testing.T) { + t.Parallel() + assert := assert.New(t) + s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { + c.Region = "region1" + c.AuthoritativeRegion = "region1" + c.ACLEnabled = true + }) + defer cleanupS1() + s2, _, cleanupS2 := TestACLServer(t, func(c *Config) { + c.Region = "region2" + c.AuthoritativeRegion = "region1" + c.ACLEnabled = true + c.ReplicationBackoff = 20 * time.Millisecond + c.ReplicationToken = root.SecretID + }) + defer cleanupS2() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + // Write a namespace to the authoritative region + ns1 := mock.Namespace() + assert.Nil(s1.State().UpsertNamespaces(100, []*structs.Namespace{ns1})) + + // Wait for the namespace to replicate + testutil.WaitForResult(func() (bool, error) { + state := s2.State() + out, err := state.NamespaceByName(nil, ns1.Name) + return out != nil, err + }, func(err error) { + t.Fatalf("should replicate namespace") + }) + + // Delete the namespace at the authoritative region + assert.Nil(s1.State().DeleteNamespaces(200, []string{ns1.Name})) + + // Wait for the namespace deletion to replicate + testutil.WaitForResult(func() (bool, error) { + state := s2.State() + out, err := state.NamespaceByName(nil, ns1.Name) + return out == nil, err + }, func(err error) { + t.Fatalf("should replicate namespace deletion") + }) +} + +func TestLeader_DiffNamespaces(t *testing.T) { + t.Parallel() + + state := state.TestStateStore(t) + + // Populate the local state + ns1 := mock.Namespace() + ns2 := mock.Namespace() + ns3 := mock.Namespace() + assert.Nil(t, state.UpsertNamespaces(100, []*structs.Namespace{ns1, ns2, ns3})) + + // Simulate a remote list + rns2 := ns2.Copy() + rns2.ModifyIndex = 50 // Ignored, same index + rns3 := ns3.Copy() + rns3.ModifyIndex = 100 // Updated, higher index + rns3.Hash = []byte{0, 1, 2, 3} + ns4 := mock.Namespace() + remoteList := []*structs.Namespace{ + rns2, + rns3, + ns4, + } + delete, update := diffNamespaces(state, 50, remoteList) + sort.Strings(delete) + + // ns1 does not exist on the remote side, should delete + assert.Equal(t, []string{structs.DefaultNamespace, ns1.Name}, delete) + + // ns2 is un-modified - ignore. ns3 modified, ns4 new. + assert.Equal(t, []string{ns3.Name, ns4.Name}, update) +} + // waitForStableLeadership waits until a leader is elected and all servers // get promoted as voting members, returns the leader func waitForStableLeadership(t *testing.T, servers []*Server) *Server { diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index a2bda9e57255..927abef35266 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -1505,3 +1505,14 @@ func AllocNetworkStatus() *structs.AllocNetworkStatus { }, } } + +func Namespace() *structs.Namespace { + ns := &structs.Namespace{ + Name: fmt.Sprintf("team-%s", uuid.Generate()), + Description: "test namespace", + CreateIndex: 100, + ModifyIndex: 200, + } + ns.SetHash() + return ns +} diff --git a/nomad/namespace_endpoint.go b/nomad/namespace_endpoint.go new file mode 100644 index 000000000000..7701801a15ac --- /dev/null +++ b/nomad/namespace_endpoint.go @@ -0,0 +1,371 @@ +package nomad + +import ( + "fmt" + "time" + + metrics "github.com/armon/go-metrics" + memdb "github.com/hashicorp/go-memdb" + multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" +) + +// Namespace endpoint is used for manipulating namespaces +type Namespace struct { + srv *Server +} + +// UpsertNamespaces is used to upsert a set of namespaces +func (n *Namespace) UpsertNamespaces(args *structs.NamespaceUpsertRequest, + reply *structs.GenericResponse) error { + args.Region = n.srv.config.AuthoritativeRegion + if done, err := n.srv.forward("Namespace.UpsertNamespaces", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "upsert_namespaces"}, time.Now()) + + // Check management permissions + if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.IsManagement() { + return structs.ErrPermissionDenied + } + + // Validate there is at least one namespace + if len(args.Namespaces) == 0 { + return fmt.Errorf("must specify at least one namespace") + } + + // Validate the namespaces and set the hash + for _, ns := range args.Namespaces { + if err := ns.Validate(); err != nil { + return fmt.Errorf("Invalid namespace %q: %v", ns.Name, err) + } + + ns.SetHash() + } + + // Update via Raft + out, index, err := n.srv.raftApply(structs.NamespaceUpsertRequestType, args) + if err != nil { + return err + } + + // Check if there was an error when applying. + if err, ok := out.(error); ok && err != nil { + return err + } + + // Update the index + reply.Index = index + return nil +} + +// DeleteNamespaces is used to delete a namespace +func (n *Namespace) DeleteNamespaces(args *structs.NamespaceDeleteRequest, reply *structs.GenericResponse) error { + args.Region = n.srv.config.AuthoritativeRegion + if done, err := n.srv.forward("Namespace.DeleteNamespaces", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "delete_namespaces"}, time.Now()) + + // Check management permissions + if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.IsManagement() { + return structs.ErrPermissionDenied + } + + // Validate at least one namespace + if len(args.Namespaces) == 0 { + return fmt.Errorf("must specify at least one namespace to delete") + } + + for _, ns := range args.Namespaces { + if ns == structs.DefaultNamespace { + return fmt.Errorf("can not delete default namespace") + } + } + + // Check that the deleting namespaces do not have non-terminal jobs in both + // this region and all federated regions + var mErr multierror.Error + for _, ns := range args.Namespaces { + nonTerminal, err := n.nonTerminalNamespaces(args.AuthToken, ns) + if err != nil { + multierror.Append(&mErr, err) + } else if len(nonTerminal) != 0 { + multierror.Append(&mErr, fmt.Errorf("namespace %q has non-terminal jobs in regions: %v", ns, nonTerminal)) + } + } + + if err := mErr.ErrorOrNil(); err != nil { + return err + } + + // Update via Raft + out, index, err := n.srv.raftApply(structs.NamespaceDeleteRequestType, args) + if err != nil { + return err + } + + // Check if there was an error when applying. + if err, ok := out.(error); ok && err != nil { + return err + } + + // Update the index + reply.Index = index + return nil +} + +// nonTerminalNamespaces returns whether the set of regions in which the +// namespaces contains non-terminal jobs, checking all federated regions +// including this one. +func (n *Namespace) nonTerminalNamespaces(authToken, namespace string) ([]string, error) { + regions := n.srv.Regions() + thisRegion := n.srv.Region() + terminal := make([]string, 0, len(regions)) + + // Check if this region is terminal + localTerminal, err := n.namespaceTerminalLocally(namespace) + if err != nil { + return nil, err + } + if !localTerminal { + terminal = append(terminal, thisRegion) + } + + for _, region := range regions { + if region == thisRegion { + continue + } + + remoteTerminal, err := n.namespaceTerminalInRegion(authToken, namespace, region) + if err != nil { + return nil, err + } + if !remoteTerminal { + terminal = append(terminal, region) + } + } + + return terminal, nil +} + +// namespaceTerminalLocally returns if the namespace contains only terminal jobs +// in the local region . +func (n *Namespace) namespaceTerminalLocally(namespace string) (bool, error) { + snap, err := n.srv.fsm.State().Snapshot() + if err != nil { + return false, err + } + + iter, err := snap.JobsByNamespace(nil, namespace) + if err != nil { + return false, err + } + + for { + raw := iter.Next() + if raw == nil { + break + } + + job := raw.(*structs.Job) + if job.Status != structs.JobStatusDead { + return false, nil + } + } + + return true, nil +} + +// namespaceTerminalInRegion returns if the namespace contains only terminal +// jobs in the given region . +func (n *Namespace) namespaceTerminalInRegion(authToken, namespace, region string) (bool, error) { + req := &structs.JobListRequest{ + QueryOptions: structs.QueryOptions{ + Region: region, + Namespace: namespace, + AllowStale: false, + AuthToken: authToken, + }, + } + + var resp structs.JobListResponse + done, err := n.srv.forward("Job.List", req, req, &resp) + if !done { + return false, fmt.Errorf("unexpectedly did not forward Job.List to region %q", region) + } else if err != nil { + return false, err + } + + for _, job := range resp.Jobs { + if job.Status != structs.JobStatusDead { + return false, nil + } + } + + return true, nil +} + +// ListNamespaces is used to list the namespaces +func (n *Namespace) ListNamespaces(args *structs.NamespaceListRequest, reply *structs.NamespaceListResponse) error { + if done, err := n.srv.forward("Namespace.ListNamespaces", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "list_namespace"}, time.Now()) + + // Resolve token to acl to filter namespace list + aclObj, err := n.srv.ResolveToken(args.AuthToken) + if err != nil { + return err + } + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, s *state.StateStore) error { + // Iterate over all the namespaces + var err error + var iter memdb.ResultIterator + if prefix := args.QueryOptions.Prefix; prefix != "" { + iter, err = s.NamespacesByNamePrefix(ws, prefix) + } else { + iter, err = s.Namespaces(ws) + } + if err != nil { + return err + } + + reply.Namespaces = nil + for { + raw := iter.Next() + if raw == nil { + break + } + ns := raw.(*structs.Namespace) + + // Only return namespaces allowed by acl + if aclObj == nil || aclObj.AllowNamespace(ns.Name) { + reply.Namespaces = append(reply.Namespaces, ns) + } + } + + // Use the last index that affected the namespace table + index, err := s.Index(state.TableNamespaces) + if err != nil { + return err + } + + // Ensure we never set the index to zero, otherwise a blocking query cannot be used. + // We floor the index at one, since realistically the first write must have a higher index. + if index == 0 { + index = 1 + } + reply.Index = index + return nil + }} + return n.srv.blockingRPC(&opts) +} + +// GetNamespace is used to get a specific namespace +func (n *Namespace) GetNamespace(args *structs.NamespaceSpecificRequest, reply *structs.SingleNamespaceResponse) error { + if done, err := n.srv.forward("Namespace.GetNamespace", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "get_namespace"}, time.Now()) + + // Check capabilities for the given namespace permissions + if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNamespace(args.Name) { + return structs.ErrPermissionDenied + } + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, s *state.StateStore) error { + // Look for the namespace + out, err := s.NamespaceByName(ws, args.Name) + if err != nil { + return err + } + + // Setup the output + reply.Namespace = out + if out != nil { + reply.Index = out.ModifyIndex + } else { + // Use the last index that affected the namespace table + index, err := s.Index(state.TableNamespaces) + if err != nil { + return err + } + + // Ensure we never set the index to zero, otherwise a blocking query cannot be used. + // We floor the index at one, since realistically the first write must have a higher index. + if index == 0 { + index = 1 + } + reply.Index = index + } + return nil + }} + return n.srv.blockingRPC(&opts) +} + +// GetNamespaces is used to get a set of namespaces +func (n *Namespace) GetNamespaces(args *structs.NamespaceSetRequest, reply *structs.NamespaceSetResponse) error { + if done, err := n.srv.forward("Namespace.GetNamespaces", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "get_namespaces"}, time.Now()) + + // Check management permissions + if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.IsManagement() { + return structs.ErrPermissionDenied + } + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, s *state.StateStore) error { + // Setup the output + reply.Namespaces = make(map[string]*structs.Namespace, len(args.Namespaces)) + + // Look for the namespace + for _, namespace := range args.Namespaces { + out, err := s.NamespaceByName(ws, namespace) + if err != nil { + return err + } + if out != nil { + reply.Namespaces[namespace] = out + } + } + + // Use the last index that affected the policy table + index, err := s.Index(state.TableNamespaces) + if err != nil { + return err + } + + // Ensure we never set the index to zero, otherwise a blocking query cannot be used. + // We floor the index at one, since realistically the first write must have a higher index. + if index == 0 { + index = 1 + } + reply.Index = index + return nil + }} + return n.srv.blockingRPC(&opts) +} diff --git a/nomad/namespace_endpoint_test.go b/nomad/namespace_endpoint_test.go new file mode 100644 index 000000000000..eec1c50bd003 --- /dev/null +++ b/nomad/namespace_endpoint_test.go @@ -0,0 +1,772 @@ +package nomad + +import ( + "fmt" + "testing" + "time" + + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" +) + +func TestNamespaceEndpoint_GetNamespace(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns}) + + // Lookup the namespace + get := &structs.NamespaceSpecificRequest{ + Name: ns.Name, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp structs.SingleNamespaceResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Equal(ns, resp.Namespace) + + // Lookup non-existing namespace + get.Name = uuid.Generate() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Nil(resp.Namespace) +} + +func TestNamespaceEndpoint_GetNamespace_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Create the policy and tokens + validToken := mock.CreatePolicyAndToken(t, state, 1002, "test-valid", + mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob})) + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy(ns2.Name, "", []string{acl.NamespaceCapabilityReadJob})) + + get := &structs.NamespaceSpecificRequest{ + Name: ns1.Name, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Lookup the namespace without a token and expect failure + { + var resp structs.SingleNamespaceResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with an invalid token + get.AuthToken = invalidToken.SecretID + { + var resp structs.SingleNamespaceResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with a valid token + get.AuthToken = validToken.SecretID + { + var resp structs.SingleNamespaceResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Equal(ns1, resp.Namespace) + } + + // Try with a root token + get.AuthToken = root.SecretID + { + var resp structs.SingleNamespaceResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Equal(ns1, resp.Namespace) + } +} + +func TestNamespaceEndpoint_GetNamespace_Blocking(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the namespaces + ns1 := mock.Namespace() + ns2 := mock.Namespace() + + // First create an namespace + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(100, []*structs.Namespace{ns1})) + }) + + // Upsert the namespace we are watching later + time.AfterFunc(200*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(200, []*structs.Namespace{ns2})) + }) + + // Lookup the namespace + req := &structs.NamespaceSpecificRequest{ + Name: ns2.Name, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 150, + }, + } + var resp structs.SingleNamespaceResponse + start := time.Now() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", req, &resp)) + assert.EqualValues(200, resp.Index) + assert.NotNil(resp.Namespace) + assert.Equal(ns2.Name, resp.Namespace.Name) + + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + + // Namespace delete triggers watches + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.DeleteNamespaces(300, []string{ns2.Name})) + }) + + req.QueryOptions.MinQueryIndex = 250 + var resp2 structs.SingleNamespaceResponse + start = time.Now() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", req, &resp2)) + assert.EqualValues(300, resp2.Index) + assert.Nil(resp2.Namespace) + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) + } +} + +func TestNamespaceEndpoint_GetNamespaces(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Lookup the namespace + get := &structs.NamespaceSetRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp structs.NamespaceSetResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 2) + assert.Contains(resp.Namespaces, ns1.Name) + assert.Contains(resp.Namespaces, ns2.Name) +} + +func TestNamespaceEndpoint_GetNamespaces_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + state.UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Create the policy and tokens + validToken := mock.CreatePolicyAndToken(t, state, 1002, "test-valid", + mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob})) + + // Lookup the namespace + get := &structs.NamespaceSetRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Lookup the namespaces without a token and expect a failure + { + var resp structs.NamespaceSetResponse + assert.NotNil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", get, &resp)) + } + + // Try with an non-management token + get.AuthToken = validToken.SecretID + { + var resp structs.NamespaceSetResponse + assert.NotNil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", get, &resp)) + } + + // Try with a root token + get.AuthToken = root.SecretID + { + var resp structs.NamespaceSetResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 2) + assert.Contains(resp.Namespaces, ns1.Name) + assert.Contains(resp.Namespaces, ns2.Name) + } +} + +func TestNamespaceEndpoint_GetNamespaces_Blocking(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the namespaces + ns1 := mock.Namespace() + ns2 := mock.Namespace() + + // First create an namespace + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(100, []*structs.Namespace{ns1})) + }) + + // Upsert the namespace we are watching later + time.AfterFunc(200*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(200, []*structs.Namespace{ns2})) + }) + + // Lookup the namespace + req := &structs.NamespaceSetRequest{ + Namespaces: []string{ns2.Name}, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 150, + }, + } + var resp structs.NamespaceSetResponse + start := time.Now() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", req, &resp)) + assert.EqualValues(200, resp.Index) + assert.Len(resp.Namespaces, 1) + assert.Contains(resp.Namespaces, ns2.Name) + + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + + // Namespace delete triggers watches + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.DeleteNamespaces(300, []string{ns2.Name})) + }) + + req.QueryOptions.MinQueryIndex = 250 + var resp2 structs.NamespaceSetResponse + start = time.Now() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", req, &resp2)) + assert.EqualValues(300, resp2.Index) + assert.Empty(resp2.Namespaces) + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) + } +} + +func TestNamespaceEndpoint_List(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + + ns1.Name = "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9" + ns2.Name = "aaaabbbb-3350-4b4b-d185-0e1992ed43e9" + assert.Nil(s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2})) + + // Lookup the namespaces + get := &structs.NamespaceListRequest{ + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 3) + + // Lookup the namespaces by prefix + get = &structs.NamespaceListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Prefix: "aaaabb", + }, + } + var resp2 structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp2)) + assert.EqualValues(1000, resp2.Index) + assert.Len(resp2.Namespaces, 1) +} + +func TestNamespaceEndpoint_List_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + + ns1.Name = "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9" + ns2.Name = "bbbbbbbb-3350-4b4b-d185-0e1992ed43e9" + assert.Nil(s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2})) + + validDefToken := mock.CreatePolicyAndToken(t, state, 1001, "test-def-valid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS})) + validMultiToken := mock.CreatePolicyAndToken(t, state, 1002, "test-multi-valid", fmt.Sprintf("%s\n%s", + mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob}), + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))) + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy("invalid-namespace", "", []string{acl.NamespaceCapabilityReadJob})) + + get := &structs.NamespaceListRequest{ + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Lookup the namespaces without a token and expect a failure + { + var resp structs.NamespaceListResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp) + assert.Nil(err) + assert.Len(resp.Namespaces, 0) + } + + // Try with an invalid token + get.AuthToken = invalidToken.SecretID + { + var resp structs.NamespaceListResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp) + assert.Nil(err) + assert.Len(resp.Namespaces, 0) + } + + // Try with a valid token for one + get.AuthToken = validDefToken.SecretID + { + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 1) + } + + // Try with a valid token for two + get.AuthToken = validMultiToken.SecretID + { + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 2) + } + + // Try with a root token + get.AuthToken = root.SecretID + { + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 3) + } +} + +func TestNamespaceEndpoint_List_Blocking(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the namespace + ns := mock.Namespace() + + // Upsert namespace triggers watches + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(200, []*structs.Namespace{ns})) + }) + + req := &structs.NamespaceListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 150, + }, + } + start := time.Now() + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", req, &resp)) + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + assert.EqualValues(200, resp.Index) + assert.Len(resp.Namespaces, 2) + + // Namespace deletion triggers watches + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.DeleteNamespaces(300, []string{ns.Name})) + }) + + req.MinQueryIndex = 200 + start = time.Now() + var resp2 structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", req, &resp2)) + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) + } + assert.EqualValues(300, resp2.Index) + assert.Len(resp2.Namespaces, 1) +} + +func TestNamespaceEndpoint_DeleteNamespaces(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Lookup the namespaces + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp)) + assert.NotEqual(uint64(0), resp.Index) +} + +func TestNamespaceEndpoint_DeleteNamespaces_NonTerminal_Local(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Create a job in one + j := mock.Job() + j.Namespace = ns1.Name + assert.Nil(s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1001, j)) + + // Lookup the namespaces + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp) + if assert.NotNil(err) { + assert.Contains(err.Error(), "has non-terminal jobs") + } +} + +func TestNamespaceEndpoint_DeleteNamespaces_NonTerminal_Federated_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { + c.Region = "region1" + c.AuthoritativeRegion = "region1" + c.ACLEnabled = true + }) + defer cleanupS1() + s2, _, cleanupS2 := TestACLServer(t, func(c *Config) { + c.Region = "region2" + c.AuthoritativeRegion = "region1" + c.ACLEnabled = true + c.ReplicationBackoff = 20 * time.Millisecond + c.ReplicationToken = root.SecretID + }) + defer cleanupS2() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + codec := rpcClient(t, s1) + + // Create the register request + ns1 := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1}) + + testutil.WaitForResult(func() (bool, error) { + state := s2.State() + out, err := state.NamespaceByName(nil, ns1.Name) + return out != nil, err + }, func(err error) { + t.Fatalf("should replicate namespace") + }) + + // Create a job in the namespace on the non-authority + j := mock.Job() + j.Namespace = ns1.Name + assert.Nil(s2.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1001, j)) + + // Delete the namespaces without the correct permissions + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{ns1.Name}, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp) + if assert.NotNil(err) { + assert.EqualError(err, structs.ErrPermissionDenied.Error()) + } + + // Try with a auth token + req.AuthToken = root.SecretID + var resp2 structs.GenericResponse + err = msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp2) + if assert.NotNil(err) { + assert.Contains(err.Error(), "has non-terminal jobs") + } +} + +func TestNamespaceEndpoint_DeleteNamespaces_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Create the policy and tokens + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) + + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Delete namespaces without a token and expect failure + { + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + + // Check we did not delete the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.NotNil(out) + } + + // Try with an invalid token + req.AuthToken = invalidToken.SecretID + { + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + + // Check we did not delete the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.NotNil(out) + } + + // Try with a root token + req.AuthToken = root.SecretID + { + var resp structs.GenericResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp)) + assert.NotEqual(uint64(0), resp.Index) + + // Check we deleted the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.Nil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.Nil(out) + } +} + +func TestNamespaceEndpoint_DeleteNamespaces_Default(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Delete the default namespace + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{structs.DefaultNamespace}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.NotNil(msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp)) +} + +func TestNamespaceEndpoint_UpsertNamespaces(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + + // Lookup the namespaces + req := &structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1, ns2}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", req, &resp)) + assert.NotEqual(uint64(0), resp.Index) + + // Check we created the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.NotNil(out) +} + +func TestNamespaceEndpoint_UpsertNamespaces_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + + // Create the policy and tokens + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) + + // Create the register request + req := &structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1, ns2}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Upsert the namespace without a token and expect failure + { + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + + // Check we did not create the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.Nil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.Nil(out) + } + + // Try with an invalid token + req.AuthToken = invalidToken.SecretID + { + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + + // Check we did not create the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.Nil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.Nil(out) + } + + // Try with a root token + req.AuthToken = root.SecretID + { + var resp structs.GenericResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", req, &resp)) + assert.NotEqual(uint64(0), resp.Index) + + // Check we created the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.NotNil(out) + } +} diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 1ef24e401478..b1bc89c356c5 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1646,6 +1646,102 @@ func TestClientEndpoint_GetAllocs_ACL_Basic(t *testing.T) { } } +func TestClientEndpoint_GetAllocs_ACL_Namespaces(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + assert := assert.New(t) + + // Create the namespaces + ns1 := mock.Namespace() + ns2 := mock.Namespace() + ns1.Name = "altnamespace" + ns2.Name = "should-only-be-displayed-for-root-ns" + + // Create the allocs + allocDefaultNS := mock.Alloc() + allocAltNS := mock.Alloc() + allocAltNS.Namespace = ns1.Name + allocOtherNS := mock.Alloc() + allocOtherNS.Namespace = ns2.Name + + node := mock.Node() + allocDefaultNS.NodeID = node.ID + allocAltNS.NodeID = node.ID + allocOtherNS.NodeID = node.ID + state := s1.fsm.State() + assert.Nil(state.UpsertNamespaces(1, []*structs.Namespace{ns1, ns2}), "UpsertNamespaces") + assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 2, node), "UpsertNode") + assert.Nil(state.UpsertJobSummary(3, mock.JobSummary(allocDefaultNS.JobID)), "UpsertJobSummary") + assert.Nil(state.UpsertJobSummary(4, mock.JobSummary(allocAltNS.JobID)), "UpsertJobSummary") + assert.Nil(state.UpsertJobSummary(5, mock.JobSummary(allocOtherNS.JobID)), "UpsertJobSummary") + allocs := []*structs.Allocation{allocDefaultNS, allocAltNS, allocOtherNS} + assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 6, allocs), "UpsertAllocs") + + // Create the namespace policy and tokens + validDefaultToken := mock.CreatePolicyAndToken(t, state, 1001, "test-default-valid", mock.NodePolicy(acl.PolicyRead)+ + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) + validNoNSToken := mock.CreatePolicyAndToken(t, state, 1003, "test-alt-valid", mock.NodePolicy(acl.PolicyRead)) + invalidToken := mock.CreatePolicyAndToken(t, state, 1004, "test-invalid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) + + // Lookup the node without a token and expect failure + req := &structs.NodeSpecificRequest{ + NodeID: node.ID, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + { + var resp structs.NodeAllocsResponse + err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp) + assert.NotNil(err, "RPC") + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with a valid token for the default namespace + req.AuthToken = validDefaultToken.SecretID + { + var resp structs.NodeAllocsResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC") + assert.Len(resp.Allocs, 1) + assert.Equal(allocDefaultNS.ID, resp.Allocs[0].ID) + } + + // Try with a valid token for a namespace with no allocs on this node + req.AuthToken = validNoNSToken.SecretID + { + var resp structs.NodeAllocsResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC") + assert.Len(resp.Allocs, 0) + } + + // Try with a invalid token + req.AuthToken = invalidToken.SecretID + { + var resp structs.NodeAllocsResponse + err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp) + assert.NotNil(err, "RPC") + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with a root token + req.AuthToken = root.SecretID + { + var resp structs.NodeAllocsResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC") + assert.Len(resp.Allocs, 3) + for _, alloc := range resp.Allocs { + switch alloc.ID { + case allocDefaultNS.ID, allocAltNS.ID, allocOtherNS.ID: + // expected + default: + t.Errorf("unexpected alloc %q for namespace %q", alloc.ID, alloc.Namespace) + } + } + } +} + func TestClientEndpoint_GetClientAllocs(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/search_endpoint.go b/nomad/search_endpoint.go index 3e5eea504aba..478c73b84553 100644 --- a/nomad/search_endpoint.go +++ b/nomad/search_endpoint.go @@ -31,6 +31,7 @@ var ( structs.Deployments, structs.Plugins, structs.Volumes, + structs.Namespaces, } ) @@ -67,6 +68,8 @@ func (s *Search) getMatches(iter memdb.ResultIterator, prefix string) ([]string, id = t.ID case *structs.CSIVolume: id = t.ID + case *structs.Namespace: + id = t.Name default: matchID, ok := getEnterpriseMatch(raw) if !ok { @@ -105,11 +108,28 @@ func getResourceIter(context structs.Context, aclObj *acl.ACL, namespace, prefix return state.CSIPluginsByIDPrefix(ws, prefix) case structs.Volumes: return state.CSIVolumesByIDPrefix(ws, namespace, prefix) + case structs.Namespaces: + iter, err := state.NamespacesByNamePrefix(ws, prefix) + if err != nil { + return nil, err + } + if aclObj == nil { + return iter, nil + } + return memdb.NewFilterIterator(iter, namespaceFilter(aclObj)), nil default: return getEnterpriseResourceIter(context, aclObj, namespace, prefix, ws, state) } } +// namespaceFilter wraps a namespace iterator with a filter for removing +// namespaces the ACL can't access. +func namespaceFilter(aclObj *acl.ACL) memdb.FilterFunc { + return func(v interface{}) bool { + return !aclObj.AllowNamespace(v.(*structs.Namespace).Name) + } +} + // If the length of a prefix is odd, return a subset to the last even character // This only applies to UUIDs, jobs are excluded func roundUUIDDownIfOdd(prefix string, context structs.Context) string { diff --git a/nomad/search_endpoint_oss.go b/nomad/search_endpoint_oss.go index b4d80c63433e..e3f0fc0abcb8 100644 --- a/nomad/search_endpoint_oss.go +++ b/nomad/search_endpoint_oss.go @@ -43,6 +43,7 @@ func anySearchPerms(aclObj *acl.ACL, namespace string, context structs.Context) } nodeRead := aclObj.AllowNodeRead() + allowNS := aclObj.AllowNamespace(namespace) jobRead := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob) allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIListVolume, acl.NamespaceCapabilityCSIReadVolume, @@ -50,7 +51,7 @@ func anySearchPerms(aclObj *acl.ACL, namespace string, context structs.Context) acl.NamespaceCapabilityReadJob) volRead := allowVolume(aclObj, namespace) - if !nodeRead && !jobRead && !volRead { + if !nodeRead && !jobRead && !volRead && !allowNS { return false } @@ -60,6 +61,10 @@ func anySearchPerms(aclObj *acl.ACL, namespace string, context structs.Context) if !nodeRead && context == structs.Nodes { return false } + if !allowNS && context == structs.Namespaces { + return false + } + if !jobRead { switch context { case structs.Allocs, structs.Deployments, structs.Evals, structs.Jobs: @@ -106,6 +111,10 @@ func searchContexts(aclObj *acl.ACL, namespace string, context structs.Context) if jobRead { available = append(available, c) } + case structs.Namespaces: + if aclObj.AllowNamespace(namespace) { + available = append(available, c) + } case structs.Nodes: if aclObj.AllowNodeRead() { available = append(available, c) diff --git a/nomad/search_endpoint_test.go b/nomad/search_endpoint_test.go index 3d705179ee95..f9a0cdbd1f9c 100644 --- a/nomad/search_endpoint_test.go +++ b/nomad/search_endpoint_test.go @@ -823,3 +823,169 @@ func TestSearch_PrefixSearch_CSIVolume(t *testing.T) { assert.Equal(id, resp.Matches[structs.Volumes][0]) assert.Equal(resp.Truncations[structs.Volumes], false) } + +func TestSearch_PrefixSearch_Namespace(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s, cleanup := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + + defer cleanup() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + ns := mock.Namespace() + assert.Nil(s.fsm.State().UpsertNamespaces(2000, []*structs.Namespace{ns})) + + prefix := ns.Name[:len(ns.Name)-2] + + req := &structs.SearchRequest{ + Prefix: prefix, + Context: structs.Namespaces, + QueryOptions: structs.QueryOptions{ + Region: "global", + }, + } + + var resp structs.SearchResponse + if err := msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + assert.Equal(1, len(resp.Matches[structs.Namespaces])) + assert.Equal(ns.Name, resp.Matches[structs.Namespaces][0]) + assert.Equal(resp.Truncations[structs.Namespaces], false) + + assert.Equal(uint64(2000), resp.Index) +} + +func TestSearch_PrefixSearch_Namespace_ACL(t *testing.T) { + t.Parallel() + assert := assert.New(t) + s, root, cleanup := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + defer cleanup() + + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + state := s.fsm.State() + + ns := mock.Namespace() + assert.Nil(state.UpsertNamespaces(500, []*structs.Namespace{ns})) + + job1 := mock.Job() + assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 502, job1)) + + job2 := mock.Job() + job2.Namespace = ns.Name + assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 504, job2)) + + assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1001, mock.Node())) + + req := &structs.SearchRequest{ + Prefix: "", + Context: structs.Jobs, + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: job1.Namespace, + }, + } + + // Try without a token and expect failure + { + var resp structs.SearchResponse + err := msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with an invalid token and expect failure + { + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs})) + req.AuthToken = invalidToken.SecretID + var resp structs.SearchResponse + err := msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with a node:read token and expect failure due to Namespaces being the context + { + validToken := mock.CreatePolicyAndToken(t, state, 1005, "test-invalid2", mock.NodePolicy(acl.PolicyRead)) + req.Context = structs.Namespaces + req.AuthToken = validToken.SecretID + var resp structs.SearchResponse + err := msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with a node:read token and expect success due to All context + { + validToken := mock.CreatePolicyAndToken(t, state, 1007, "test-valid", mock.NodePolicy(acl.PolicyRead)) + req.Context = structs.All + req.AuthToken = validToken.SecretID + var resp structs.SearchResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp)) + assert.Equal(uint64(1001), resp.Index) + assert.Len(resp.Matches[structs.Nodes], 1) + + // Jobs filtered out since token only has access to node:read + assert.Len(resp.Matches[structs.Jobs], 0) + } + + // Try with a valid token for non-default namespace:read-job + { + validToken := mock.CreatePolicyAndToken(t, state, 1009, "test-valid2", + mock.NamespacePolicy(job2.Namespace, "", []string{acl.NamespaceCapabilityReadJob})) + req.Context = structs.All + req.AuthToken = validToken.SecretID + req.Namespace = job2.Namespace + var resp structs.SearchResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp)) + assert.Len(resp.Matches[structs.Jobs], 1) + assert.Equal(job2.ID, resp.Matches[structs.Jobs][0]) + assert.Len(resp.Matches[structs.Namespaces], 1) + + // Index of job - not node - because node context is filtered out + assert.Equal(uint64(504), resp.Index) + + // Nodes filtered out since token only has access to namespace:read-job + assert.Len(resp.Matches[structs.Nodes], 0) + } + + // Try with a valid token for node:read and default namespace:read-job + { + validToken := mock.CreatePolicyAndToken(t, state, 1011, "test-valid3", strings.Join([]string{ + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}), + mock.NodePolicy(acl.PolicyRead), + }, "\n")) + req.Context = structs.All + req.AuthToken = validToken.SecretID + req.Namespace = structs.DefaultNamespace + var resp structs.SearchResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp)) + assert.Len(resp.Matches[structs.Jobs], 1) + assert.Equal(job1.ID, resp.Matches[structs.Jobs][0]) + assert.Len(resp.Matches[structs.Nodes], 1) + assert.Equal(uint64(1001), resp.Index) + assert.Len(resp.Matches[structs.Namespaces], 1) + } + + // Try with a management token + { + req.Context = structs.All + req.AuthToken = root.SecretID + req.Namespace = structs.DefaultNamespace + var resp structs.SearchResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp)) + assert.Equal(uint64(1001), resp.Index) + assert.Len(resp.Matches[structs.Jobs], 1) + assert.Equal(job1.ID, resp.Matches[structs.Jobs][0]) + assert.Len(resp.Matches[structs.Nodes], 1) + assert.Len(resp.Matches[structs.Namespaces], 2) + } +} diff --git a/nomad/server.go b/nomad/server.go index fb744d5136ee..05735fe4ab39 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -276,6 +276,7 @@ type endpoints struct { Scaling *Scaling Enterprise *EnterpriseEndpoints Event *Event + Namespace *Namespace // Client endpoints ClientStats *ClientStats @@ -1149,6 +1150,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { s.staticEndpoints.Status = &Status{srv: s, logger: s.logger.Named("status")} s.staticEndpoints.System = &System{srv: s, logger: s.logger.Named("system")} s.staticEndpoints.Search = &Search{srv: s, logger: s.logger.Named("search")} + s.staticEndpoints.Namespace = &Namespace{srv: s} s.staticEndpoints.Enterprise = NewEnterpriseEndpoints(s) // Client endpoints @@ -1166,6 +1168,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { s.staticEndpoints.Event = &Event{srv: s} s.staticEndpoints.Event.register() + } // Register the static handlers @@ -1190,6 +1193,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { server.Register(s.staticEndpoints.ClientCSI) server.Register(s.staticEndpoints.FileSystem) server.Register(s.staticEndpoints.Agent) + server.Register(s.staticEndpoints.Namespace) // Create new dynamic endpoints and add them to the RPC server. node := &Node{srv: s, ctx: ctx, logger: s.logger.Named("client")} diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 3d1308859ae9..00ea753e026a 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -9,6 +9,10 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + TableNamespaces = "namespaces" +) + var ( schemaFactories SchemaFactories factoriesLock sync.Mutex @@ -52,6 +56,7 @@ func init() { csiPluginTableSchema, scalingPolicyTableSchema, scalingEventTableSchema, + namespaceTableSchema, }...) } @@ -900,3 +905,28 @@ func scalingEventTableSchema() *memdb.TableSchema { }, } } + +// namespaceTableSchema returns the MemDB schema for the namespace table. +func namespaceTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: TableNamespaces, + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "Name", + }, + }, + "quota": { + Name: "quota", + AllowMissing: true, + Unique: false, + Indexer: &memdb.StringFieldIndex{ + Field: "Quota", + }, + }, + }, + } +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index d5b2f1d3ffa9..97a5761d3b11 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -104,8 +104,8 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges) } - // Initialize the state store with required enterprise objects - if err := s.enterpriseInit(); err != nil { + // Initialize the state store with the default namespace. + if err := s.namespaceInit(); err != nil { return nil, fmt.Errorf("enterprise state store initialization failed: %v", err) } @@ -119,6 +119,25 @@ func (s *StateStore) EventBroker() (*stream.EventBroker, error) { return s.db.publisher, nil } +// namespaceInit ensures the default namespace exists. +func (s *StateStore) namespaceInit() error { + // Create the default namespace. This is safe to do every time we create the + // state store. There are two main cases, a brand new cluster in which case + // each server will have the same default namespace object, or a new cluster + // in which case if the default namespace has been modified, it will be + // overridden by the restore code path. + defaultNs := &structs.Namespace{ + Name: structs.DefaultNamespace, + Description: structs.DefaultNamespaceDescription, + } + + if err := s.UpsertNamespaces(1, []*structs.Namespace{defaultNs}); err != nil { + return fmt.Errorf("inserting default namespace failed: %v", err) + } + + return nil +} + // Config returns the state store configuration. func (s *StateStore) Config() *StateStoreConfig { return s.config @@ -5456,6 +5475,206 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s return nil } +// NamespaceByName is used to lookup a namespace by name +func (s *StateStore) NamespaceByName(ws memdb.WatchSet, name string) (*structs.Namespace, error) { + txn := s.db.ReadTxn() + return s.namespaceByNameImpl(ws, txn, name) +} + +// namespaceByNameImpl is used to lookup a namespace by name +func (s *StateStore) namespaceByNameImpl(ws memdb.WatchSet, txn *txn, name string) (*structs.Namespace, error) { + watchCh, existing, err := txn.FirstWatch(TableNamespaces, "id", name) + if err != nil { + return nil, fmt.Errorf("namespace lookup failed: %v", err) + } + ws.Add(watchCh) + + if existing != nil { + return existing.(*structs.Namespace), nil + } + return nil, nil +} + +// namespaceExists returns whether a namespace exists +func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) { + if namespace == structs.DefaultNamespace { + return true, nil + } + + existing, err := txn.First(TableNamespaces, "id", namespace) + if err != nil { + return false, fmt.Errorf("namespace lookup failed: %v", err) + } + + return existing != nil, nil +} + +// NamespacesByNamePrefix is used to lookup namespaces by prefix +func (s *StateStore) NamespacesByNamePrefix(ws memdb.WatchSet, namePrefix string) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + iter, err := txn.Get(TableNamespaces, "id_prefix", namePrefix) + if err != nil { + return nil, fmt.Errorf("namespaces lookup failed: %v", err) + } + ws.Add(iter.WatchCh()) + + return iter, nil +} + +// Namespaces returns an iterator over all the namespaces +func (s *StateStore) Namespaces(ws memdb.WatchSet) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + // Walk the entire namespace table + iter, err := txn.Get(TableNamespaces, "id") + if err != nil { + return nil, err + } + ws.Add(iter.WatchCh()) + return iter, nil +} + +func (s *StateStore) NamespaceNames() ([]string, error) { + it, err := s.Namespaces(nil) + if err != nil { + return nil, err + } + + nses := []string{} + for { + next := it.Next() + if next == nil { + break + } + ns := next.(*structs.Namespace) + nses = append(nses, ns.Name) + } + + return nses, nil +} + +// UpsertNamespace is used to register or update a set of namespaces +func (s *StateStore) UpsertNamespaces(index uint64, namespaces []*structs.Namespace) error { + txn := s.db.WriteTxn(index) + defer txn.Abort() + + for _, ns := range namespaces { + if err := s.upsertNamespaceImpl(index, txn, ns); err != nil { + return err + } + } + + if err := txn.Insert("index", &IndexEntry{TableNamespaces, index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + txn.Commit() + return nil +} + +// upsertNamespaceImpl is used to upsert a namespace +func (s *StateStore) upsertNamespaceImpl(index uint64, txn *txn, namespace *structs.Namespace) error { + // Ensure the namespace hash is non-nil. This should be done outside the state store + // for performance reasons, but we check here for defense in depth. + ns := namespace + if len(ns.Hash) == 0 { + ns.SetHash() + } + + // Check if the namespace already exists + existing, err := txn.First(TableNamespaces, "id", ns.Name) + if err != nil { + return fmt.Errorf("namespace lookup failed: %v", err) + } + + // Setup the indexes correctly and determine which quotas need to be + // reconciled + var oldQuota string + if existing != nil { + exist := existing.(*structs.Namespace) + ns.CreateIndex = exist.CreateIndex + ns.ModifyIndex = index + + // Grab the old quota on the namespace + oldQuota = exist.Quota + } else { + ns.CreateIndex = index + ns.ModifyIndex = index + } + + // Validate that the quota on the new namespace exists + if ns.Quota != "" { + exists, err := s.quotaSpecExists(txn, ns.Quota) + if err != nil { + return fmt.Errorf("looking up namespace quota %q failed: %v", ns.Quota, err) + } else if !exists { + return fmt.Errorf("namespace %q using non-existent quota %q", ns.Name, ns.Quota) + } + } + + // Insert the namespace + if err := txn.Insert(TableNamespaces, ns); err != nil { + return fmt.Errorf("namespace insert failed: %v", err) + } + + // Reconcile changed quotas + return s.quotaReconcile(index, txn, ns.Quota, oldQuota) +} + +// DeleteNamespaces is used to remove a set of namespaces +func (s *StateStore) DeleteNamespaces(index uint64, names []string) error { + txn := s.db.WriteTxn(index) + defer txn.Abort() + + for _, name := range names { + // Lookup the namespace + existing, err := txn.First(TableNamespaces, "id", name) + if err != nil { + return fmt.Errorf("namespace lookup failed: %v", err) + } + if existing == nil { + return fmt.Errorf("namespace not found") + } + + ns := existing.(*structs.Namespace) + if ns.Name == structs.DefaultNamespace { + return fmt.Errorf("default namespace can not be deleted") + } + + // Ensure that the namespace doesn't have any non-terminal jobs + iter, err := s.jobsByNamespaceImpl(nil, name, txn) + if err != nil { + return err + } + + for { + raw := iter.Next() + if raw == nil { + break + } + job := raw.(*structs.Job) + + if job.Status != structs.JobStatusDead { + return fmt.Errorf("namespace %q contains at least one non-terminal job %q. "+ + "All jobs must be terminal in namespace before it can be deleted", name, job.ID) + } + } + + // Delete the namespace + if err := txn.Delete(TableNamespaces, existing); err != nil { + return fmt.Errorf("namespace deletion failed: %v", err) + } + } + + if err := txn.Insert("index", &IndexEntry{TableNamespaces, index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + txn.Commit() + return nil +} + func (s *StateStore) DeleteScalingPolicies(index uint64, ids []string) error { txn := s.db.WriteTxn(index) defer txn.Abort() @@ -5891,3 +6110,11 @@ func (r *StateRestore) ScalingEventsRestore(jobEvents *structs.JobScalingEvents) } return nil } + +// NamespaceRestore is used to restore a namespace +func (r *StateRestore) NamespaceRestore(ns *structs.Namespace) error { + if err := r.txn.Insert(TableNamespaces, ns); err != nil { + return fmt.Errorf("namespace insert failed: %v", err) + } + return nil +} diff --git a/nomad/state/state_store_oss.go b/nomad/state/state_store_oss.go index 487f8421378e..c161caf815e7 100644 --- a/nomad/state/state_store_oss.go +++ b/nomad/state/state_store_oss.go @@ -6,15 +6,13 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -// enterpriseInit is used to initialize the state store with enterprise -// objects. -func (s *StateStore) enterpriseInit() error { - return nil +// quotaSpecExists on returns whether the quota exists +func (s *StateStore) quotaSpecExists(txn *txn, name string) (bool, error) { + return false, nil } -// namespaceExists returns whether a namespace exists -func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) { - return namespace == structs.DefaultNamespace, nil +func (s *StateStore) quotaReconcile(index uint64, txn *txn, newQuota, oldQuota string) error { + return nil } // updateEntWithAlloc is used to update Nomad Enterprise objects when an allocation is @@ -22,7 +20,3 @@ func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) { func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *txn) error { return nil } - -func (s *StateStore) NamespaceNames() ([]string, error) { - return []string{structs.DefaultNamespace}, nil -} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 44f9b1f9c411..9a54bae3cbab 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -56,47 +56,51 @@ type MessageType uint8 // note: new raft message types need to be added to the end of this // list of contents const ( - NodeRegisterRequestType MessageType = iota - NodeDeregisterRequestType - NodeUpdateStatusRequestType - NodeUpdateDrainRequestType - JobRegisterRequestType - JobDeregisterRequestType - EvalUpdateRequestType - EvalDeleteRequestType - AllocUpdateRequestType - AllocClientUpdateRequestType - ReconcileJobSummariesRequestType - VaultAccessorRegisterRequestType - VaultAccessorDeregisterRequestType - ApplyPlanResultsRequestType - DeploymentStatusUpdateRequestType - DeploymentPromoteRequestType - DeploymentAllocHealthRequestType - DeploymentDeleteRequestType - JobStabilityRequestType - ACLPolicyUpsertRequestType - ACLPolicyDeleteRequestType - ACLTokenUpsertRequestType - ACLTokenDeleteRequestType - ACLTokenBootstrapRequestType - AutopilotRequestType - UpsertNodeEventsType - JobBatchDeregisterRequestType - AllocUpdateDesiredTransitionRequestType - NodeUpdateEligibilityRequestType - BatchNodeUpdateDrainRequestType - SchedulerConfigRequestType - NodeBatchDeregisterRequestType - ClusterMetadataRequestType - ServiceIdentityAccessorRegisterRequestType - ServiceIdentityAccessorDeregisterRequestType - CSIVolumeRegisterRequestType - CSIVolumeDeregisterRequestType - CSIVolumeClaimRequestType - ScalingEventRegisterRequestType - CSIVolumeClaimBatchRequestType - CSIPluginDeleteRequestType + NodeRegisterRequestType MessageType = 0 + NodeDeregisterRequestType MessageType = 1 + NodeUpdateStatusRequestType MessageType = 2 + NodeUpdateDrainRequestType MessageType = 3 + JobRegisterRequestType MessageType = 4 + JobDeregisterRequestType MessageType = 5 + EvalUpdateRequestType MessageType = 6 + EvalDeleteRequestType MessageType = 7 + AllocUpdateRequestType MessageType = 8 + AllocClientUpdateRequestType MessageType = 9 + ReconcileJobSummariesRequestType MessageType = 10 + VaultAccessorRegisterRequestType MessageType = 11 + VaultAccessorDeregisterRequestType MessageType = 12 + ApplyPlanResultsRequestType MessageType = 13 + DeploymentStatusUpdateRequestType MessageType = 14 + DeploymentPromoteRequestType MessageType = 15 + DeploymentAllocHealthRequestType MessageType = 16 + DeploymentDeleteRequestType MessageType = 17 + JobStabilityRequestType MessageType = 18 + ACLPolicyUpsertRequestType MessageType = 19 + ACLPolicyDeleteRequestType MessageType = 20 + ACLTokenUpsertRequestType MessageType = 21 + ACLTokenDeleteRequestType MessageType = 22 + ACLTokenBootstrapRequestType MessageType = 23 + AutopilotRequestType MessageType = 24 + UpsertNodeEventsType MessageType = 25 + JobBatchDeregisterRequestType MessageType = 26 + AllocUpdateDesiredTransitionRequestType MessageType = 27 + NodeUpdateEligibilityRequestType MessageType = 28 + BatchNodeUpdateDrainRequestType MessageType = 29 + SchedulerConfigRequestType MessageType = 30 + NodeBatchDeregisterRequestType MessageType = 31 + ClusterMetadataRequestType MessageType = 32 + ServiceIdentityAccessorRegisterRequestType MessageType = 33 + ServiceIdentityAccessorDeregisterRequestType MessageType = 34 + CSIVolumeRegisterRequestType MessageType = 35 + CSIVolumeDeregisterRequestType MessageType = 36 + CSIVolumeClaimRequestType MessageType = 37 + ScalingEventRegisterRequestType MessageType = 38 + CSIVolumeClaimBatchRequestType MessageType = 39 + CSIPluginDeleteRequestType MessageType = 40 + + // Namespace types were moved from enterprise and therefore start at 64 + NamespaceUpsertRequestType MessageType = 64 + NamespaceDeleteRequestType MessageType = 65 ) const ( @@ -148,6 +152,9 @@ const ( // to indicate that endpoints must search in all namespaces AllNamespacesSentinel = "*" + // maxNamespaceDescriptionLength limits a namespace description length + maxNamespaceDescriptionLength = 256 + // JitterFraction is a the limit to the amount of jitter we apply // to a user specified MaxQueryTime. We divide the specified time by // the fraction. So 16 == 6.25% limit of jitter. This jitter is also @@ -173,6 +180,11 @@ const ( DefaultBlockingRPCQueryTime = 300 * time.Second ) +var ( + // validNamespaceName is used to validate a namespace name + validNamespaceName = regexp.MustCompile("^[a-zA-Z0-9-]{1,128}$") +) + // Context defines the scope in which a search for Nomad object operates, and // is also used to query the matching index value for this context type Context string @@ -4698,6 +4710,119 @@ type MultiregionRegion struct { Meta map[string]string } +// Namespace allows logically grouping jobs and their associated objects. +type Namespace struct { + // Name is the name of the namespace + Name string + + // Description is a human readable description of the namespace + Description string + + // Quota is the quota specification that the namespace should account + // against. + Quota string + + // Hash is the hash of the namespace which is used to efficiently replicate + // cross-regions. + Hash []byte + + // Raft Indexes + CreateIndex uint64 + ModifyIndex uint64 +} + +func (n *Namespace) Validate() error { + var mErr multierror.Error + + // Validate the name and description + if !validNamespaceName.MatchString(n.Name) { + err := fmt.Errorf("invalid name %q. Must match regex %s", n.Name, validNamespaceName) + mErr.Errors = append(mErr.Errors, err) + } + if len(n.Description) > maxNamespaceDescriptionLength { + err := fmt.Errorf("description longer than %d", maxNamespaceDescriptionLength) + mErr.Errors = append(mErr.Errors, err) + } + + return mErr.ErrorOrNil() +} + +// SetHash is used to compute and set the hash of the namespace +func (n *Namespace) SetHash() []byte { + // Initialize a 256bit Blake2 hash (32 bytes) + hash, err := blake2b.New256(nil) + if err != nil { + panic(err) + } + + // Write all the user set fields + hash.Write([]byte(n.Name)) + hash.Write([]byte(n.Description)) + hash.Write([]byte(n.Quota)) + + // Finalize the hash + hashVal := hash.Sum(nil) + + // Set and return the hash + n.Hash = hashVal + return hashVal +} + +func (n *Namespace) Copy() *Namespace { + nc := new(Namespace) + *nc = *n + nc.Hash = make([]byte, len(n.Hash)) + copy(nc.Hash, n.Hash) + return nc +} + +// NamespaceListRequest is used to request a list of namespaces +type NamespaceListRequest struct { + QueryOptions +} + +// NamespaceListResponse is used for a list request +type NamespaceListResponse struct { + Namespaces []*Namespace + QueryMeta +} + +// NamespaceSpecificRequest is used to query a specific namespace +type NamespaceSpecificRequest struct { + Name string + QueryOptions +} + +// SingleNamespaceResponse is used to return a single namespace +type SingleNamespaceResponse struct { + Namespace *Namespace + QueryMeta +} + +// NamespaceSetRequest is used to query a set of namespaces +type NamespaceSetRequest struct { + Namespaces []string + QueryOptions +} + +// NamespaceSetResponse is used to return a set of namespaces +type NamespaceSetResponse struct { + Namespaces map[string]*Namespace // Keyed by namespace Name + QueryMeta +} + +// NamespaceDeleteRequest is used to delete a set of namespaces +type NamespaceDeleteRequest struct { + Namespaces []string + WriteRequest +} + +// NamespaceUpsertRequest is used to upsert a set of namespaces +type NamespaceUpsertRequest struct { + Namespaces []*Namespace + WriteRequest +} + const ( // PeriodicSpecCron is used for a cron spec. PeriodicSpecCron = "cron" diff --git a/website/pages/api-docs/index.mdx b/website/pages/api-docs/index.mdx index 04ef968c98b7..f3b306205067 100644 --- a/website/pages/api-docs/index.mdx +++ b/website/pages/api-docs/index.mdx @@ -92,7 +92,10 @@ $ curl \ ## Namespaces -Nomad Enterprise has support for namespaces, which allow jobs and their associated objects to be segmented from each other and other users of the cluster. When using non-default namespace, the API request must pass the target namespace as an API query parameter. +Nomad has support for namespaces, which allow jobs and their associated objects +to be segmented from each other and other users of the cluster. When using +non-default namespace, the API request must pass the target namespace as an API +query parameter. Prior to Nomad 1.0 namespaces were Enterprise-only. Here is an example using curl: diff --git a/website/pages/api-docs/json-jobs.mdx b/website/pages/api-docs/json-jobs.mdx index 36d454696db6..0f639106b5c6 100644 --- a/website/pages/api-docs/json-jobs.mdx +++ b/website/pages/api-docs/json-jobs.mdx @@ -185,7 +185,7 @@ The `Job` object supports the following keys: - `Meta` - Annotates the job with opaque metadata. - `Namespace` - The namespace to execute the job in, defaults to "default". - Values other than default are not allowed in non-Enterprise versions of Nomad. + Prior to Nomad 1.0 namespaces were Enterprise-only. - `ParameterizedJob` - Specifies the job as a parameterized job such that it can be dispatched against. The `ParameterizedJob` object supports the following diff --git a/website/pages/api-docs/namespaces.mdx b/website/pages/api-docs/namespaces.mdx index 8173ddb14c03..91648f742b1c 100644 --- a/website/pages/api-docs/namespaces.mdx +++ b/website/pages/api-docs/namespaces.mdx @@ -9,8 +9,7 @@ description: The /namespace endpoints are used to query for and interact with na The `/namespace` endpoints are used to query for and interact with namespaces. -~> **Enterprise Only!** This API endpoint and functionality only exists in -Nomad Enterprise. This is not present in the open source version of Nomad. +~> Prior to Nomad 1.0 Namespaces were an Enterprise-only feature. ## List Namespaces diff --git a/website/pages/api-docs/search.mdx b/website/pages/api-docs/search.mdx index cac8ff382176..d739b400e94c 100644 --- a/website/pages/api-docs/search.mdx +++ b/website/pages/api-docs/search.mdx @@ -9,8 +9,9 @@ description: The /search endpoint is used to search for Nomad objects The `/search` endpoint returns matches for a given prefix and context, where a context can be jobs, allocations, evaluations, nodes, deployments, plugins, -or volumes. When using Nomad Enterprise, the allowed contexts include quotas -and namespaces. Additionally, a prefix can be searched for within every context. +namespaces, or volumes. When using Nomad Enterprise, the allowed contexts +include quotas. Additionally, a prefix can be searched for within every +context. | Method | Path | Produces | | ------ | ------------ | ------------------ | diff --git a/website/pages/docs/commands/namespace/apply.mdx b/website/pages/docs/commands/namespace/apply.mdx index e3417386b871..849c4572bbc5 100644 --- a/website/pages/docs/commands/namespace/apply.mdx +++ b/website/pages/docs/commands/namespace/apply.mdx @@ -10,8 +10,8 @@ description: | The `namespace apply` command is used create or update a namespace. -~> Namespace commands are new in Nomad 0.7 and are only available with Nomad -Enterprise. +~> Namespaces are open source in Nomad 1.0. Namespaces were Enterprise-only + when introduced in Nomad 0.7. ## Usage diff --git a/website/pages/docs/commands/namespace/delete.mdx b/website/pages/docs/commands/namespace/delete.mdx index d25485ecdf62..66ee82b3fd98 100644 --- a/website/pages/docs/commands/namespace/delete.mdx +++ b/website/pages/docs/commands/namespace/delete.mdx @@ -10,8 +10,8 @@ description: | The `namespace delete` command is used delete a namespace. -~> Namespace commands are new in Nomad 0.7 and are only available with Nomad -Enterprise. +~> Namespaces are open source in Nomad 1.0. Namespaces were Enterprise-only + when introduced in Nomad 0.7. ## Usage diff --git a/website/pages/docs/commands/namespace/index.mdx b/website/pages/docs/commands/namespace/index.mdx index 7a8b064ae677..1e0d5ebb19fd 100644 --- a/website/pages/docs/commands/namespace/index.mdx +++ b/website/pages/docs/commands/namespace/index.mdx @@ -10,8 +10,8 @@ description: | The `namespace` command is used to interact with namespaces. -~> Namespace commands are new in Nomad 0.7 and are only available with Nomad -Enterprise. +~> Namespaces are open source in Nomad 1.0. Namespaces were Enterprise-only + when introduced in Nomad 0.7. ## Usage diff --git a/website/pages/docs/commands/namespace/inspect.mdx b/website/pages/docs/commands/namespace/inspect.mdx index b85906184826..1b242816ad03 100644 --- a/website/pages/docs/commands/namespace/inspect.mdx +++ b/website/pages/docs/commands/namespace/inspect.mdx @@ -12,8 +12,8 @@ description: > The `namespace inspect` command is used to view raw information about a particular namespace. -~> Namespace commands are new in Nomad 0.7 and are only available with Nomad -Enterprise. +~> Namespaces are open source in Nomad 1.0. Namespaces were Enterprise-only + when introduced in Nomad 0.7. ## Usage diff --git a/website/pages/docs/commands/namespace/list.mdx b/website/pages/docs/commands/namespace/list.mdx index dc03ce9a0b64..8e3fbc2244c8 100644 --- a/website/pages/docs/commands/namespace/list.mdx +++ b/website/pages/docs/commands/namespace/list.mdx @@ -10,8 +10,8 @@ description: | The `namespace list` command is used list available namespaces. -~> Namespace commands are new in Nomad 0.7 and are only available with Nomad -Enterprise. +~> Namespaces are open source in Nomad 1.0. Namespaces were Enterprise-only + when introduced in Nomad 0.7. ## Usage diff --git a/website/pages/docs/commands/namespace/status.mdx b/website/pages/docs/commands/namespace/status.mdx index 3ee41fbefe04..ef3857719ea2 100644 --- a/website/pages/docs/commands/namespace/status.mdx +++ b/website/pages/docs/commands/namespace/status.mdx @@ -12,8 +12,8 @@ description: > The `namespace status` command is used to view the status of a particular namespace. -~> Namespace commands are new in Nomad 0.7 and are only available with Nomad -Enterprise. +~> Namespaces are open source in Nomad 1.0. Namespaces were Enterprise-only + when introduced in Nomad 0.7. ## Usage diff --git a/website/pages/docs/enterprise.mdx b/website/pages/docs/enterprise.mdx index 776d153f0445..3472fa5b9b0a 100644 --- a/website/pages/docs/enterprise.mdx +++ b/website/pages/docs/enterprise.mdx @@ -6,8 +6,7 @@ description: >- Nomad Enterprise adds operations, collaboration, and governance capabilities to Nomad. - Features include Namespaces, Resource Quotas, Sentinel Policies, and Advanced - Autopilot. + Features include Resource Quotas, Sentinel Policies, and Advanced Autopilot. --- # Nomad Enterprise @@ -62,7 +61,7 @@ See the [Vault Integration documentation](/docs/integrations/vault-integration#e ## Governance & Policy -Governance & Policy features are part of an add-on module that enables an organization to securely operate Nomad at scale across multiple teams through features such as Audit Logging, Namespaces, Resource Quotas, Sentinel Policies, and Cross-Namespace Queries. +Governance & Policy features are part of an add-on module that enables an organization to securely operate Nomad at scale across multiple teams through features such as Audit Logging, Resource Quotas, and Sentinel Policies. ### Audit Logging @@ -72,14 +71,6 @@ With Audit Logging, enterprises can now proactively identify access anomalies, e See the [Audit Logging Documentation](/docs/configuration/audit) for a thorough overview. -### Namespaces - -Namespaces enable multiple teams to safely use a shared multi-region Nomad environment and reduce cluster fleet size. In Nomad Enterprise, a shared cluster can be partitioned into multiple namespaces which allow jobs and their associated objects to be isolated from each other and other users of the cluster. - -Namespaces enhance the usability of a shared cluster by isolating teams from the jobs of others, by providing fine grain access control to jobs when coupled with ACLs, and by preventing bad actors from negatively impacting the whole cluster. - -See the [Namespaces Guide](https://learn.hashicorp.com/tutorials/nomad/namespaces) for a thorough overview. - ### Resource Quotas Resource Quotas enable an operator to limit resource consumption across teams or projects to reduce waste and align budgets. In Nomad Enterprise, operators can define quota specifications and apply them to namespaces. When a quota is attached to a namespace, the jobs within the namespace may not consume more resources than the quota specification allows. @@ -94,12 +85,6 @@ In Nomad Enterprise, operators can create Sentinel policies for fine-grained pol See the [Sentinel Policies Guide](https://learn.hashicorp.com/tutorials/nomad/sentinel) for a thorough overview. -### Cross-Namespace Queries - -Cross-Namespace Queries allows operators to query jobs and allocations across all namespaces for faster operator debugging and visibility in multi-tenant clusters. This enterprise feature accelerates and simplifies the debugging process by allowing operators to easily locate faulty jobs and allocations from developers. - -See the individual Nomad CLI command documentation (e.g., [alloc exec](/docs/commands/alloc/exec#namespace)) for more information. - ## Multi-Cluster & Efficiency Multi-Cluster & Efficiency features are part of an add-on module that enables diff --git a/website/pages/docs/job-specification/job.mdx b/website/pages/docs/job-specification/job.mdx index 2e2ebde2dc7a..c12b83320dbc 100644 --- a/website/pages/docs/job-specification/job.mdx +++ b/website/pages/docs/job-specification/job.mdx @@ -15,8 +15,7 @@ description: |- The `job` stanza is the top-most configuration option in the job specification. A job is a declarative specification of tasks that Nomad should run. Jobs have one or more task groups, which are themselves collections of one or more tasks. -Job names are unique per [region][region] or [namespace][namespace] (if Nomad -Enterprise is used). +Job names are unique per [region][region] or [namespace][namespace]. ```hcl job "docs" { @@ -96,7 +95,7 @@ job "docs" { applied. Only service jobs with a count greater than 1 support migrate stanzas. - `namespace` `(string: "default")` - The namespace in which to execute the job. - Values other than default are not allowed in non-Enterprise versions of Nomad. + Prior to Nomad 1.0 namespaces were Enterprise-only. - `parameterized` ([Parameterized][parameterized]: nil) - Specifies the job as a parameterized job such that it can be dispatched against.