diff --git a/command/agent/http.go b/command/agent/http.go index 582cae0a2de1..6dee1a4f58f1 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -328,6 +328,10 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/event/stream", s.wrap(s.EventStream)) + s.mux.HandleFunc("/v1/namespaces", s.wrap(s.NamespacesRequest)) + s.mux.HandleFunc("/v1/namespace", s.wrap(s.NamespaceCreateRequest)) + s.mux.HandleFunc("/v1/namespace/", s.wrap(s.NamespaceSpecificRequest)) + if uiEnabled { s.mux.Handle("/ui/", http.StripPrefix("/ui/", s.handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()})))) } else { diff --git a/command/agent/http_oss.go b/command/agent/http_oss.go index 61de2934742f..49056510b1b1 100644 --- a/command/agent/http_oss.go +++ b/command/agent/http_oss.go @@ -8,10 +8,6 @@ import ( // registerEnterpriseHandlers is a no-op for the oss release func (s *HTTPServer) registerEnterpriseHandlers() { - s.mux.HandleFunc("/v1/namespaces", s.wrap(s.entOnly)) - s.mux.HandleFunc("/v1/namespace", s.wrap(s.entOnly)) - s.mux.HandleFunc("/v1/namespace/", s.wrap(s.entOnly)) - s.mux.HandleFunc("/v1/sentinel/policies", s.wrap(s.entOnly)) s.mux.HandleFunc("/v1/sentinel/policy/", s.wrap(s.entOnly)) diff --git a/command/agent/namespace_endpoint.go b/command/agent/namespace_endpoint.go new file mode 100644 index 000000000000..64f9a20ab0a8 --- /dev/null +++ b/command/agent/namespace_endpoint.go @@ -0,0 +1,119 @@ +package agent + +import ( + "net/http" + "strings" + + "github.com/hashicorp/nomad/nomad/structs" +) + +func (s *HTTPServer) NamespacesRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "GET" { + return nil, CodedError(405, ErrInvalidMethod) + } + + args := structs.NamespaceListRequest{} + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var out structs.NamespaceListResponse + if err := s.agent.RPC("Namespace.ListNamespaces", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + if out.Namespaces == nil { + out.Namespaces = make([]*structs.Namespace, 0) + } + return out.Namespaces, nil +} + +func (s *HTTPServer) NamespaceSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + name := strings.TrimPrefix(req.URL.Path, "/v1/namespace/") + if len(name) == 0 { + return nil, CodedError(400, "Missing Namespace Name") + } + switch req.Method { + case "GET": + return s.namespaceQuery(resp, req, name) + case "PUT", "POST": + return s.namespaceUpdate(resp, req, name) + case "DELETE": + return s.namespaceDelete(resp, req, name) + default: + return nil, CodedError(405, ErrInvalidMethod) + } +} + +func (s *HTTPServer) NamespaceCreateRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "PUT" && req.Method != "POST" { + return nil, CodedError(405, ErrInvalidMethod) + } + + return s.namespaceUpdate(resp, req, "") +} + +func (s *HTTPServer) namespaceQuery(resp http.ResponseWriter, req *http.Request, + namespaceName string) (interface{}, error) { + args := structs.NamespaceSpecificRequest{ + Name: namespaceName, + } + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var out structs.SingleNamespaceResponse + if err := s.agent.RPC("Namespace.GetNamespace", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + if out.Namespace == nil { + return nil, CodedError(404, "Namespace not found") + } + return out.Namespace, nil +} + +func (s *HTTPServer) namespaceUpdate(resp http.ResponseWriter, req *http.Request, + namespaceName string) (interface{}, error) { + // Parse the namespace + var namespace structs.Namespace + if err := decodeBody(req, &namespace); err != nil { + return nil, CodedError(500, err.Error()) + } + + // Ensure the namespace name matches + if namespaceName != "" && namespace.Name != namespaceName { + return nil, CodedError(400, "Namespace name does not match request path") + } + + // Format the request + args := structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{&namespace}, + } + s.parseWriteRequest(req, &args.WriteRequest) + + var out structs.GenericResponse + if err := s.agent.RPC("Namespace.UpsertNamespaces", &args, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + return nil, nil +} + +func (s *HTTPServer) namespaceDelete(resp http.ResponseWriter, req *http.Request, + namespaceName string) (interface{}, error) { + + args := structs.NamespaceDeleteRequest{ + Namespaces: []string{namespaceName}, + } + s.parseWriteRequest(req, &args.WriteRequest) + + var out structs.GenericResponse + if err := s.agent.RPC("Namespace.DeleteNamespaces", &args, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + return nil, nil +} diff --git a/command/agent/namespace_endpoint_test.go b/command/agent/namespace_endpoint_test.go new file mode 100644 index 000000000000..9e9a0fd01326 --- /dev/null +++ b/command/agent/namespace_endpoint_test.go @@ -0,0 +1,172 @@ +// +build ent + +package agent + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/assert" +) + +func TestHTTP_NamespaceList(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + ns1 := mock.Namespace() + ns2 := mock.Namespace() + ns3 := mock.Namespace() + args := structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1, ns2, ns3}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(s.Agent.RPC("Namespace.UpsertNamespaces", &args, &resp)) + + // Make the HTTP request + req, err := http.NewRequest("GET", "/v1/namespaces", nil) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespacesRequest(respW, req) + assert.Nil(err) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + assert.Equal("true", respW.HeaderMap.Get("X-Nomad-KnownLeader")) + assert.NotZero(respW.HeaderMap.Get("X-Nomad-LastContact")) + + // Check the output (the 3 we register + default) + assert.Len(obj.([]*structs.Namespace), 4) + }) +} + +func TestHTTP_NamespaceQuery(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + ns1 := mock.Namespace() + args := structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(s.Agent.RPC("Namespace.UpsertNamespaces", &args, &resp)) + + // Make the HTTP request + req, err := http.NewRequest("GET", "/v1/namespace/"+ns1.Name, nil) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespaceSpecificRequest(respW, req) + assert.Nil(err) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + assert.Equal("true", respW.HeaderMap.Get("X-Nomad-KnownLeader")) + assert.NotZero(respW.HeaderMap.Get("X-Nomad-LastContact")) + + // Check the output + assert.Equal(ns1.Name, obj.(*structs.Namespace).Name) + }) +} + +func TestHTTP_NamespaceCreate(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + // Make the HTTP request + ns1 := mock.Namespace() + buf := encodeReq(ns1) + req, err := http.NewRequest("PUT", "/v1/namespace", buf) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespaceCreateRequest(respW, req) + assert.Nil(err) + assert.Nil(obj) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + + // Check policy was created + state := s.Agent.server.State() + out, err := state.NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + ns1.CreateIndex, ns1.ModifyIndex = out.CreateIndex, out.ModifyIndex + assert.Equal(ns1.Name, out.Name) + assert.Equal(ns1, out) + }) +} + +func TestHTTP_NamespaceUpdate(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + // Make the HTTP request + ns1 := mock.Namespace() + buf := encodeReq(ns1) + req, err := http.NewRequest("PUT", "/v1/namespace/"+ns1.Name, buf) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespaceSpecificRequest(respW, req) + assert.Nil(err) + assert.Nil(obj) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + + // Check policy was created + state := s.Agent.server.State() + out, err := state.NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + ns1.CreateIndex, ns1.ModifyIndex = out.CreateIndex, out.ModifyIndex + assert.Equal(ns1.Name, out.Name) + assert.Equal(ns1, out) + }) +} + +func TestHTTP_NamespaceDelete(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + ns1 := mock.Namespace() + args := structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(s.Agent.RPC("Namespace.UpsertNamespaces", &args, &resp)) + + // Make the HTTP request + req, err := http.NewRequest("DELETE", "/v1/namespace/"+ns1.Name, nil) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespaceSpecificRequest(respW, req) + assert.Nil(err) + assert.Nil(obj) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + + // Check policy was created + state := s.Agent.server.State() + out, err := state.NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.Nil(out) + }) +} diff --git a/nomad/fsm.go b/nomad/fsm.go index 063269a2b7e3..f1d7f0f20cd5 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -32,26 +32,29 @@ const ( type SnapshotType byte const ( - NodeSnapshot SnapshotType = iota - JobSnapshot - IndexSnapshot - EvalSnapshot - AllocSnapshot - TimeTableSnapshot - PeriodicLaunchSnapshot - JobSummarySnapshot - VaultAccessorSnapshot - JobVersionSnapshot - DeploymentSnapshot - ACLPolicySnapshot - ACLTokenSnapshot - SchedulerConfigSnapshot - ClusterMetadataSnapshot - ServiceIdentityTokenAccessorSnapshot - ScalingPolicySnapshot - CSIPluginSnapshot - CSIVolumeSnapshot - ScalingEventsSnapshot + NodeSnapshot SnapshotType = 0 + JobSnapshot SnapshotType = 1 + IndexSnapshot SnapshotType = 2 + EvalSnapshot SnapshotType = 3 + AllocSnapshot SnapshotType = 4 + TimeTableSnapshot SnapshotType = 5 + PeriodicLaunchSnapshot SnapshotType = 6 + JobSummarySnapshot SnapshotType = 7 + VaultAccessorSnapshot SnapshotType = 8 + JobVersionSnapshot SnapshotType = 9 + DeploymentSnapshot SnapshotType = 10 + ACLPolicySnapshot SnapshotType = 11 + ACLTokenSnapshot SnapshotType = 12 + SchedulerConfigSnapshot SnapshotType = 13 + ClusterMetadataSnapshot SnapshotType = 14 + ServiceIdentityTokenAccessorSnapshot SnapshotType = 15 + ScalingPolicySnapshot SnapshotType = 16 + CSIPluginSnapshot SnapshotType = 17 + CSIVolumeSnapshot SnapshotType = 18 + ScalingEventsSnapshot SnapshotType = 19 + + // Namespace appliers were moved from enterprise and therefore start at 64 + NamespaceSnapshot SnapshotType = 64 ) // LogApplier is the definition of a function that can apply a Raft log @@ -286,6 +289,10 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyCSIVolumeBatchClaim(buf[1:], log.Index) case structs.CSIPluginDeleteRequestType: return n.applyCSIPluginDelete(buf[1:], log.Index) + case structs.NamespaceUpsertRequestType: + return n.applyNamespaceUpsert(buf[1:], log.Index) + case structs.NamespaceDeleteRequestType: + return n.applyNamespaceDelete(buf[1:], log.Index) } // Check enterprise only message types. @@ -1248,6 +1255,58 @@ func (n *nomadFSM) applyCSIPluginDelete(buf []byte, index uint64) interface{} { return nil } +// applyNamespaceUpsert is used to upsert a set of namespaces +func (n *nomadFSM) applyNamespaceUpsert(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_namespace_upsert"}, time.Now()) + var req structs.NamespaceUpsertRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + var trigger []string + for _, ns := range req.Namespaces { + old, err := n.state.NamespaceByName(nil, ns.Name) + if err != nil { + n.logger.Error("namespace lookup failed", "error", err) + return err + } + + // If we are changing the quota on a namespace trigger evals for the + // older quota. + if old != nil && old.Quota != "" && old.Quota != ns.Quota { + trigger = append(trigger, old.Quota) + } + } + + if err := n.state.UpsertNamespaces(index, req.Namespaces); err != nil { + n.logger.Error("UpsertNamespaces failed", "error", err) + return err + } + + // Send the unblocks + for _, quota := range trigger { + n.blockedEvals.UnblockQuota(quota, index) + } + + return nil +} + +// applyNamespaceDelete is used to delete a set of namespaces +func (n *nomadFSM) applyNamespaceDelete(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_namespace_delete"}, time.Now()) + var req structs.NamespaceDeleteRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.DeleteNamespaces(index, req.Namespaces); err != nil { + n.logger.Error("DeleteNamespaces failed", "error", err) + return err + } + + return nil +} + func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) { // Create a new snapshot snap, err := n.state.Snapshot() @@ -1514,6 +1573,16 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := restore.CSIVolumeRestore(plugin); err != nil { return err } + + case NamespaceSnapshot: + namespace := new(structs.Namespace) + if err := dec.Decode(namespace); err != nil { + return err + } + if err := restore.NamespaceRestore(namespace); err != nil { + return err + } + default: // Check if this is an enterprise only object being restored restorer, ok := n.enterpriseRestorers[snapType] @@ -1816,6 +1885,9 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + if err := s.persistNamespaces(sink, encoder); err != nil { + return err + } if err := s.persistEnterpriseTables(sink, encoder); err != nil { sink.Cancel() return err @@ -2177,6 +2249,34 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink, return nil } +// persistNamespaces persists all the namespaces. +func (s *nomadSnapshot) persistNamespaces(sink raft.SnapshotSink, encoder *codec.Encoder) error { + // Get all the jobs + ws := memdb.NewWatchSet() + namespaces, err := s.snap.Namespaces(ws) + if err != nil { + return err + } + + for { + // Get the next item + raw := namespaces.Next() + if raw == nil { + break + } + + // Prepare the request struct + namespace := raw.(*structs.Namespace) + + // Write out a namespace registration + sink.Write([]byte{byte(NamespaceSnapshot)}) + if err := encoder.Encode(namespace); err != nil { + return err + } + } + return nil +} + func (s *nomadSnapshot) persistSchedulerConfig(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get scheduler config diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index f2f49126a079..f83824fa36fa 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -3201,3 +3201,104 @@ func TestFSM_ClusterMetadata(t *testing.T) { r.Equal(clusterID, storedMetadata.ClusterID) r.Equal(now, storedMetadata.CreateTime) } + +func TestFSM_SnapshotRestore_Events_NoDurability(t *testing.T) { + t.Parallel() + fsm := testFSM(t) + // Enable event publisher with durable event count of zero + fsm.config.EnableEventBroker = true + fsm.config.DurableEventCount = 0 + + state := fsm.State() + + e1 := mock.Events(1000) + e2 := mock.Events(1001) + + require.NoError(t, state.UpsertEvents(1000, e1)) + require.NoError(t, state.UpsertEvents(1001, e2)) + + // Verify the contents + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + // ws := memdb.NewWatchSet() + out, err := state2.LatestEventsReverse(nil) + require.NoError(t, err) + + raw := out.Next() + require.Nil(t, raw) +} + +func TestFSM_UpsertNamespaces(t *testing.T) { + assert := assert.New(t) + t.Parallel() + fsm := testFSM(t) + + ns1 := mock.Namespace() + ns2 := mock.Namespace() + req := structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1, ns2}, + } + buf, err := structs.Encode(structs.NamespaceUpsertRequestType, req) + assert.Nil(err) + assert.Nil(fsm.Apply(makeLog(buf))) + + // Verify we are registered + ws := memdb.NewWatchSet() + out, err := fsm.State().NamespaceByName(ws, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + out, err = fsm.State().NamespaceByName(ws, ns2.Name) + assert.Nil(err) + assert.NotNil(out) +} + +func TestFSM_DeleteNamespaces(t *testing.T) { + assert := assert.New(t) + t.Parallel() + fsm := testFSM(t) + + ns1 := mock.Namespace() + ns2 := mock.Namespace() + assert.Nil(fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2})) + + req := structs.NamespaceDeleteRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + } + buf, err := structs.Encode(structs.NamespaceDeleteRequestType, req) + assert.Nil(err) + assert.Nil(fsm.Apply(makeLog(buf))) + + // Verify we are NOT registered + ws := memdb.NewWatchSet() + out, err := fsm.State().NamespaceByName(ws, ns1.Name) + assert.Nil(err) + assert.Nil(out) + + out, err = fsm.State().NamespaceByName(ws, ns2.Name) + assert.Nil(err) + assert.Nil(out) +} + +func TestFSM_SnapshotRestore_Namespaces(t *testing.T) { + t.Parallel() + // Add some state + fsm := testFSM(t) + state := fsm.State() + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state.UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Verify the contents + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + ws := memdb.NewWatchSet() + out1, _ := state2.NamespaceByName(ws, ns1.Name) + out2, _ := state2.NamespaceByName(ws, ns2.Name) + if !reflect.DeepEqual(ns1, out1) { + t.Fatalf("bad: \n%#v\n%#v", out1, ns1) + } + if !reflect.DeepEqual(ns2, out2) { + t.Fatalf("bad: \n%#v\n%#v", out2, ns2) + } +} diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index dc9e859b7811..6202a3ebffe9 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -12,6 +12,7 @@ import ( msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/raft" "github.com/kr/pretty" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/hashicorp/nomad/acl" @@ -2130,6 +2131,83 @@ func evalUpdateFromRaft(t *testing.T, s *Server, evalID string) *structs.Evaluat return nil } +func TestJobEndpoint_Register_ACL_Namespace(t *testing.T) { + t.Parallel() + s1, _, cleanupS1 := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Policy with read on default namespace and write on non default + policy := &structs.ACLPolicy{ + Name: fmt.Sprintf("policy-%s", uuid.Generate()), + Description: "Super cool policy!", + Rules: ` + namespace "default" { + policy = "read" + } + namespace "test" { + policy = "write" + } + node { + policy = "read" + } + agent { + policy = "read" + } + `, + CreateIndex: 10, + ModifyIndex: 20, + } + policy.SetHash() + + assert := assert.New(t) + + // Upsert policy and token + token := mock.ACLToken() + token.Policies = []string{policy.Name} + err := s1.State().UpsertACLPolicies(100, []*structs.ACLPolicy{policy}) + assert.Nil(err) + + err = s1.State().UpsertACLTokens(110, []*structs.ACLToken{token}) + assert.Nil(err) + + // Upsert namespace + ns := mock.Namespace() + ns.Name = "test" + err = s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns}) + assert.Nil(err) + + // Create the register request + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + req.AuthToken = token.SecretID + // Use token without write access to default namespace, expect failure + var resp structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + assert.NotNil(err, "expected permission denied") + + req.Namespace = "test" + job.Namespace = "test" + + // Use token with write access to default namespace, expect success + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + assert.Nil(err, "unexpected err: %v", err) + assert.NotEqual(resp.Index, 0, "bad index: %d", resp.Index) + + // Check for the node in the FSM + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.Namespace, job.ID) + assert.Nil(err) + assert.NotNil(out, "expected job") +} + func TestJobEndpoint_Revert(t *testing.T) { t.Parallel() @@ -3478,8 +3556,9 @@ func TestJobEndpoint_Deregister_EvalCreation_Modern(t *testing.T) { }) } -// TestJobEndpoint_Register_EvalCreation_Legacy asserts that job deregister creates an eval -// atomically with the registration, but handle legacy clients by adding a new eval update +// TestJobEndpoint_Deregister_EvalCreation_Legacy asserts that job deregister +// creates an eval atomically with the registration, but handle legacy clients +// by adding a new eval update func TestJobEndpoint_Deregister_EvalCreation_Legacy(t *testing.T) { t.Parallel() diff --git a/nomad/leader.go b/nomad/leader.go index 8576219ddcac..1ec7bb5e388a 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -325,6 +325,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { if s.config.ACLEnabled && s.config.Region != s.config.AuthoritativeRegion { go s.replicateACLPolicies(stopCh) go s.replicateACLTokens(stopCh) + go s.replicateNamespaces(stopCh) } // Setup any enterprise systems required. @@ -345,6 +346,146 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { return nil } +// replicateNamespaces is used to replicate namespaces from the authoritative +// region to this region. +func (s *Server) replicateNamespaces(stopCh chan struct{}) { + req := structs.NamespaceListRequest{ + QueryOptions: structs.QueryOptions{ + Region: s.config.AuthoritativeRegion, + AllowStale: true, + }, + } + limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit)) + s.logger.Debug("starting namespace replication from authoritative region", "region", req.Region) + +START: + for { + select { + case <-stopCh: + return + default: + } + + // Rate limit how often we attempt replication + limiter.Wait(context.Background()) + + // Fetch the list of namespaces + var resp structs.NamespaceListResponse + req.AuthToken = s.ReplicationToken() + err := s.forwardRegion(s.config.AuthoritativeRegion, "Namespace.ListNamespaces", &req, &resp) + if err != nil { + s.logger.Error("failed to fetch namespaces from authoritative region", "error", err) + goto ERR_WAIT + } + + // Perform a two-way diff + delete, update := diffNamespaces(s.State(), req.MinQueryIndex, resp.Namespaces) + + // Delete namespaces that should not exist + if len(delete) > 0 { + args := &structs.NamespaceDeleteRequest{ + Namespaces: delete, + } + _, _, err := s.raftApply(structs.NamespaceDeleteRequestType, args) + if err != nil { + s.logger.Error("failed to delete namespaces", "error", err) + goto ERR_WAIT + } + } + + // Fetch any outdated namespaces + var fetched []*structs.Namespace + if len(update) > 0 { + req := structs.NamespaceSetRequest{ + Namespaces: update, + QueryOptions: structs.QueryOptions{ + Region: s.config.AuthoritativeRegion, + AuthToken: s.ReplicationToken(), + AllowStale: true, + MinQueryIndex: resp.Index - 1, + }, + } + var reply structs.NamespaceSetResponse + if err := s.forwardRegion(s.config.AuthoritativeRegion, "Namespace.GetNamespaces", &req, &reply); err != nil { + s.logger.Error("failed to fetch namespaces from authoritative region", "error", err) + goto ERR_WAIT + } + for _, namespace := range reply.Namespaces { + fetched = append(fetched, namespace) + } + } + + // Update local namespaces + if len(fetched) > 0 { + args := &structs.NamespaceUpsertRequest{ + Namespaces: fetched, + } + _, _, err := s.raftApply(structs.NamespaceUpsertRequestType, args) + if err != nil { + s.logger.Error("failed to update namespaces", "error", err) + goto ERR_WAIT + } + } + + // Update the minimum query index, blocks until there is a change. + req.MinQueryIndex = resp.Index + } + +ERR_WAIT: + select { + case <-time.After(s.config.ReplicationBackoff): + goto START + case <-stopCh: + return + } +} + +// diffNamespaces is used to perform a two-way diff between the local namespaces +// and the remote namespaces to determine which namespaces need to be deleted or +// updated. +func diffNamespaces(state *state.StateStore, minIndex uint64, remoteList []*structs.Namespace) (delete []string, update []string) { + // Construct a set of the local and remote namespaces + local := make(map[string][]byte) + remote := make(map[string]struct{}) + + // Add all the local namespaces + iter, err := state.Namespaces(nil) + if err != nil { + panic("failed to iterate local namespaces") + } + for { + raw := iter.Next() + if raw == nil { + break + } + namespace := raw.(*structs.Namespace) + local[namespace.Name] = namespace.Hash + } + + // Iterate over the remote namespaces + for _, rns := range remoteList { + remote[rns.Name] = struct{}{} + + // Check if the namespace is missing locally + if localHash, ok := local[rns.Name]; !ok { + update = append(update, rns.Name) + + // Check if the namespace is newer remotely and there is a hash + // mis-match. + } else if rns.ModifyIndex > minIndex && !bytes.Equal(localHash, rns.Hash) { + update = append(update, rns.Name) + } + } + + // Check if namespaces should be deleted + for lns := range local { + if _, ok := remote[lns]; !ok { + delete = append(delete, lns) + } + } + return +} + // restoreEvals is used to restore pending evaluations into the eval broker and // blocked evaluations into the blocked eval tracker. The broker and blocked // eval tracker is maintained only by the leader, so it must be restored anytime diff --git a/nomad/leader_test.go b/nomad/leader_test.go index ba784684d084..997854173982 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -3,6 +3,7 @@ package nomad import ( "errors" "fmt" + "sort" "strconv" "testing" "time" @@ -1457,6 +1458,86 @@ func TestServer_ReconcileMember(t *testing.T) { } } +func TestLeader_ReplicateNamespaces(t *testing.T) { + t.Parallel() + assert := assert.New(t) + s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { + c.Region = "region1" + c.AuthoritativeRegion = "region1" + c.ACLEnabled = true + }) + defer cleanupS1() + s2, _, cleanupS2 := TestACLServer(t, func(c *Config) { + c.Region = "region2" + c.AuthoritativeRegion = "region1" + c.ACLEnabled = true + c.ReplicationBackoff = 20 * time.Millisecond + c.ReplicationToken = root.SecretID + }) + defer cleanupS2() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + // Write a namespace to the authoritative region + ns1 := mock.Namespace() + assert.Nil(s1.State().UpsertNamespaces(100, []*structs.Namespace{ns1})) + + // Wait for the namespace to replicate + testutil.WaitForResult(func() (bool, error) { + state := s2.State() + out, err := state.NamespaceByName(nil, ns1.Name) + return out != nil, err + }, func(err error) { + t.Fatalf("should replicate namespace") + }) + + // Delete the namespace at the authoritative region + assert.Nil(s1.State().DeleteNamespaces(200, []string{ns1.Name})) + + // Wait for the namespace deletion to replicate + testutil.WaitForResult(func() (bool, error) { + state := s2.State() + out, err := state.NamespaceByName(nil, ns1.Name) + return out == nil, err + }, func(err error) { + t.Fatalf("should replicate namespace deletion") + }) +} + +func TestLeader_DiffNamespaces(t *testing.T) { + t.Parallel() + + state := state.TestStateStore(t) + + // Populate the local state + ns1 := mock.Namespace() + ns2 := mock.Namespace() + ns3 := mock.Namespace() + assert.Nil(t, state.UpsertNamespaces(100, []*structs.Namespace{ns1, ns2, ns3})) + + // Simulate a remote list + rns2 := ns2.Copy() + rns2.ModifyIndex = 50 // Ignored, same index + rns3 := ns3.Copy() + rns3.ModifyIndex = 100 // Updated, higher index + rns3.Hash = []byte{0, 1, 2, 3} + ns4 := mock.Namespace() + remoteList := []*structs.Namespace{ + rns2, + rns3, + ns4, + } + delete, update := diffNamespaces(state, 50, remoteList) + sort.Strings(delete) + + // ns1 does not exist on the remote side, should delete + assert.Equal(t, []string{structs.DefaultNamespace, ns1.Name}, delete) + + // ns2 is un-modified - ignore. ns3 modified, ns4 new. + assert.Equal(t, []string{ns3.Name, ns4.Name}, update) +} + // waitForStableLeadership waits until a leader is elected and all servers // get promoted as voting members, returns the leader func waitForStableLeadership(t *testing.T, servers []*Server) *Server { diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index a2bda9e57255..927abef35266 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -1505,3 +1505,14 @@ func AllocNetworkStatus() *structs.AllocNetworkStatus { }, } } + +func Namespace() *structs.Namespace { + ns := &structs.Namespace{ + Name: fmt.Sprintf("team-%s", uuid.Generate()), + Description: "test namespace", + CreateIndex: 100, + ModifyIndex: 200, + } + ns.SetHash() + return ns +} diff --git a/nomad/namespace_endpoint.go b/nomad/namespace_endpoint.go new file mode 100644 index 000000000000..7701801a15ac --- /dev/null +++ b/nomad/namespace_endpoint.go @@ -0,0 +1,371 @@ +package nomad + +import ( + "fmt" + "time" + + metrics "github.com/armon/go-metrics" + memdb "github.com/hashicorp/go-memdb" + multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" +) + +// Namespace endpoint is used for manipulating namespaces +type Namespace struct { + srv *Server +} + +// UpsertNamespaces is used to upsert a set of namespaces +func (n *Namespace) UpsertNamespaces(args *structs.NamespaceUpsertRequest, + reply *structs.GenericResponse) error { + args.Region = n.srv.config.AuthoritativeRegion + if done, err := n.srv.forward("Namespace.UpsertNamespaces", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "upsert_namespaces"}, time.Now()) + + // Check management permissions + if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.IsManagement() { + return structs.ErrPermissionDenied + } + + // Validate there is at least one namespace + if len(args.Namespaces) == 0 { + return fmt.Errorf("must specify at least one namespace") + } + + // Validate the namespaces and set the hash + for _, ns := range args.Namespaces { + if err := ns.Validate(); err != nil { + return fmt.Errorf("Invalid namespace %q: %v", ns.Name, err) + } + + ns.SetHash() + } + + // Update via Raft + out, index, err := n.srv.raftApply(structs.NamespaceUpsertRequestType, args) + if err != nil { + return err + } + + // Check if there was an error when applying. + if err, ok := out.(error); ok && err != nil { + return err + } + + // Update the index + reply.Index = index + return nil +} + +// DeleteNamespaces is used to delete a namespace +func (n *Namespace) DeleteNamespaces(args *structs.NamespaceDeleteRequest, reply *structs.GenericResponse) error { + args.Region = n.srv.config.AuthoritativeRegion + if done, err := n.srv.forward("Namespace.DeleteNamespaces", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "delete_namespaces"}, time.Now()) + + // Check management permissions + if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.IsManagement() { + return structs.ErrPermissionDenied + } + + // Validate at least one namespace + if len(args.Namespaces) == 0 { + return fmt.Errorf("must specify at least one namespace to delete") + } + + for _, ns := range args.Namespaces { + if ns == structs.DefaultNamespace { + return fmt.Errorf("can not delete default namespace") + } + } + + // Check that the deleting namespaces do not have non-terminal jobs in both + // this region and all federated regions + var mErr multierror.Error + for _, ns := range args.Namespaces { + nonTerminal, err := n.nonTerminalNamespaces(args.AuthToken, ns) + if err != nil { + multierror.Append(&mErr, err) + } else if len(nonTerminal) != 0 { + multierror.Append(&mErr, fmt.Errorf("namespace %q has non-terminal jobs in regions: %v", ns, nonTerminal)) + } + } + + if err := mErr.ErrorOrNil(); err != nil { + return err + } + + // Update via Raft + out, index, err := n.srv.raftApply(structs.NamespaceDeleteRequestType, args) + if err != nil { + return err + } + + // Check if there was an error when applying. + if err, ok := out.(error); ok && err != nil { + return err + } + + // Update the index + reply.Index = index + return nil +} + +// nonTerminalNamespaces returns whether the set of regions in which the +// namespaces contains non-terminal jobs, checking all federated regions +// including this one. +func (n *Namespace) nonTerminalNamespaces(authToken, namespace string) ([]string, error) { + regions := n.srv.Regions() + thisRegion := n.srv.Region() + terminal := make([]string, 0, len(regions)) + + // Check if this region is terminal + localTerminal, err := n.namespaceTerminalLocally(namespace) + if err != nil { + return nil, err + } + if !localTerminal { + terminal = append(terminal, thisRegion) + } + + for _, region := range regions { + if region == thisRegion { + continue + } + + remoteTerminal, err := n.namespaceTerminalInRegion(authToken, namespace, region) + if err != nil { + return nil, err + } + if !remoteTerminal { + terminal = append(terminal, region) + } + } + + return terminal, nil +} + +// namespaceTerminalLocally returns if the namespace contains only terminal jobs +// in the local region . +func (n *Namespace) namespaceTerminalLocally(namespace string) (bool, error) { + snap, err := n.srv.fsm.State().Snapshot() + if err != nil { + return false, err + } + + iter, err := snap.JobsByNamespace(nil, namespace) + if err != nil { + return false, err + } + + for { + raw := iter.Next() + if raw == nil { + break + } + + job := raw.(*structs.Job) + if job.Status != structs.JobStatusDead { + return false, nil + } + } + + return true, nil +} + +// namespaceTerminalInRegion returns if the namespace contains only terminal +// jobs in the given region . +func (n *Namespace) namespaceTerminalInRegion(authToken, namespace, region string) (bool, error) { + req := &structs.JobListRequest{ + QueryOptions: structs.QueryOptions{ + Region: region, + Namespace: namespace, + AllowStale: false, + AuthToken: authToken, + }, + } + + var resp structs.JobListResponse + done, err := n.srv.forward("Job.List", req, req, &resp) + if !done { + return false, fmt.Errorf("unexpectedly did not forward Job.List to region %q", region) + } else if err != nil { + return false, err + } + + for _, job := range resp.Jobs { + if job.Status != structs.JobStatusDead { + return false, nil + } + } + + return true, nil +} + +// ListNamespaces is used to list the namespaces +func (n *Namespace) ListNamespaces(args *structs.NamespaceListRequest, reply *structs.NamespaceListResponse) error { + if done, err := n.srv.forward("Namespace.ListNamespaces", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "list_namespace"}, time.Now()) + + // Resolve token to acl to filter namespace list + aclObj, err := n.srv.ResolveToken(args.AuthToken) + if err != nil { + return err + } + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, s *state.StateStore) error { + // Iterate over all the namespaces + var err error + var iter memdb.ResultIterator + if prefix := args.QueryOptions.Prefix; prefix != "" { + iter, err = s.NamespacesByNamePrefix(ws, prefix) + } else { + iter, err = s.Namespaces(ws) + } + if err != nil { + return err + } + + reply.Namespaces = nil + for { + raw := iter.Next() + if raw == nil { + break + } + ns := raw.(*structs.Namespace) + + // Only return namespaces allowed by acl + if aclObj == nil || aclObj.AllowNamespace(ns.Name) { + reply.Namespaces = append(reply.Namespaces, ns) + } + } + + // Use the last index that affected the namespace table + index, err := s.Index(state.TableNamespaces) + if err != nil { + return err + } + + // Ensure we never set the index to zero, otherwise a blocking query cannot be used. + // We floor the index at one, since realistically the first write must have a higher index. + if index == 0 { + index = 1 + } + reply.Index = index + return nil + }} + return n.srv.blockingRPC(&opts) +} + +// GetNamespace is used to get a specific namespace +func (n *Namespace) GetNamespace(args *structs.NamespaceSpecificRequest, reply *structs.SingleNamespaceResponse) error { + if done, err := n.srv.forward("Namespace.GetNamespace", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "get_namespace"}, time.Now()) + + // Check capabilities for the given namespace permissions + if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNamespace(args.Name) { + return structs.ErrPermissionDenied + } + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, s *state.StateStore) error { + // Look for the namespace + out, err := s.NamespaceByName(ws, args.Name) + if err != nil { + return err + } + + // Setup the output + reply.Namespace = out + if out != nil { + reply.Index = out.ModifyIndex + } else { + // Use the last index that affected the namespace table + index, err := s.Index(state.TableNamespaces) + if err != nil { + return err + } + + // Ensure we never set the index to zero, otherwise a blocking query cannot be used. + // We floor the index at one, since realistically the first write must have a higher index. + if index == 0 { + index = 1 + } + reply.Index = index + } + return nil + }} + return n.srv.blockingRPC(&opts) +} + +// GetNamespaces is used to get a set of namespaces +func (n *Namespace) GetNamespaces(args *structs.NamespaceSetRequest, reply *structs.NamespaceSetResponse) error { + if done, err := n.srv.forward("Namespace.GetNamespaces", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "get_namespaces"}, time.Now()) + + // Check management permissions + if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.IsManagement() { + return structs.ErrPermissionDenied + } + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, s *state.StateStore) error { + // Setup the output + reply.Namespaces = make(map[string]*structs.Namespace, len(args.Namespaces)) + + // Look for the namespace + for _, namespace := range args.Namespaces { + out, err := s.NamespaceByName(ws, namespace) + if err != nil { + return err + } + if out != nil { + reply.Namespaces[namespace] = out + } + } + + // Use the last index that affected the policy table + index, err := s.Index(state.TableNamespaces) + if err != nil { + return err + } + + // Ensure we never set the index to zero, otherwise a blocking query cannot be used. + // We floor the index at one, since realistically the first write must have a higher index. + if index == 0 { + index = 1 + } + reply.Index = index + return nil + }} + return n.srv.blockingRPC(&opts) +} diff --git a/nomad/namespace_endpoint_test.go b/nomad/namespace_endpoint_test.go new file mode 100644 index 000000000000..eec1c50bd003 --- /dev/null +++ b/nomad/namespace_endpoint_test.go @@ -0,0 +1,772 @@ +package nomad + +import ( + "fmt" + "testing" + "time" + + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" +) + +func TestNamespaceEndpoint_GetNamespace(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns}) + + // Lookup the namespace + get := &structs.NamespaceSpecificRequest{ + Name: ns.Name, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp structs.SingleNamespaceResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Equal(ns, resp.Namespace) + + // Lookup non-existing namespace + get.Name = uuid.Generate() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Nil(resp.Namespace) +} + +func TestNamespaceEndpoint_GetNamespace_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Create the policy and tokens + validToken := mock.CreatePolicyAndToken(t, state, 1002, "test-valid", + mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob})) + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy(ns2.Name, "", []string{acl.NamespaceCapabilityReadJob})) + + get := &structs.NamespaceSpecificRequest{ + Name: ns1.Name, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Lookup the namespace without a token and expect failure + { + var resp structs.SingleNamespaceResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with an invalid token + get.AuthToken = invalidToken.SecretID + { + var resp structs.SingleNamespaceResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with a valid token + get.AuthToken = validToken.SecretID + { + var resp structs.SingleNamespaceResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Equal(ns1, resp.Namespace) + } + + // Try with a root token + get.AuthToken = root.SecretID + { + var resp structs.SingleNamespaceResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Equal(ns1, resp.Namespace) + } +} + +func TestNamespaceEndpoint_GetNamespace_Blocking(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the namespaces + ns1 := mock.Namespace() + ns2 := mock.Namespace() + + // First create an namespace + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(100, []*structs.Namespace{ns1})) + }) + + // Upsert the namespace we are watching later + time.AfterFunc(200*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(200, []*structs.Namespace{ns2})) + }) + + // Lookup the namespace + req := &structs.NamespaceSpecificRequest{ + Name: ns2.Name, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 150, + }, + } + var resp structs.SingleNamespaceResponse + start := time.Now() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", req, &resp)) + assert.EqualValues(200, resp.Index) + assert.NotNil(resp.Namespace) + assert.Equal(ns2.Name, resp.Namespace.Name) + + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + + // Namespace delete triggers watches + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.DeleteNamespaces(300, []string{ns2.Name})) + }) + + req.QueryOptions.MinQueryIndex = 250 + var resp2 structs.SingleNamespaceResponse + start = time.Now() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", req, &resp2)) + assert.EqualValues(300, resp2.Index) + assert.Nil(resp2.Namespace) + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) + } +} + +func TestNamespaceEndpoint_GetNamespaces(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Lookup the namespace + get := &structs.NamespaceSetRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp structs.NamespaceSetResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 2) + assert.Contains(resp.Namespaces, ns1.Name) + assert.Contains(resp.Namespaces, ns2.Name) +} + +func TestNamespaceEndpoint_GetNamespaces_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + state.UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Create the policy and tokens + validToken := mock.CreatePolicyAndToken(t, state, 1002, "test-valid", + mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob})) + + // Lookup the namespace + get := &structs.NamespaceSetRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Lookup the namespaces without a token and expect a failure + { + var resp structs.NamespaceSetResponse + assert.NotNil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", get, &resp)) + } + + // Try with an non-management token + get.AuthToken = validToken.SecretID + { + var resp structs.NamespaceSetResponse + assert.NotNil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", get, &resp)) + } + + // Try with a root token + get.AuthToken = root.SecretID + { + var resp structs.NamespaceSetResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 2) + assert.Contains(resp.Namespaces, ns1.Name) + assert.Contains(resp.Namespaces, ns2.Name) + } +} + +func TestNamespaceEndpoint_GetNamespaces_Blocking(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the namespaces + ns1 := mock.Namespace() + ns2 := mock.Namespace() + + // First create an namespace + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(100, []*structs.Namespace{ns1})) + }) + + // Upsert the namespace we are watching later + time.AfterFunc(200*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(200, []*structs.Namespace{ns2})) + }) + + // Lookup the namespace + req := &structs.NamespaceSetRequest{ + Namespaces: []string{ns2.Name}, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 150, + }, + } + var resp structs.NamespaceSetResponse + start := time.Now() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", req, &resp)) + assert.EqualValues(200, resp.Index) + assert.Len(resp.Namespaces, 1) + assert.Contains(resp.Namespaces, ns2.Name) + + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + + // Namespace delete triggers watches + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.DeleteNamespaces(300, []string{ns2.Name})) + }) + + req.QueryOptions.MinQueryIndex = 250 + var resp2 structs.NamespaceSetResponse + start = time.Now() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", req, &resp2)) + assert.EqualValues(300, resp2.Index) + assert.Empty(resp2.Namespaces) + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) + } +} + +func TestNamespaceEndpoint_List(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + + ns1.Name = "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9" + ns2.Name = "aaaabbbb-3350-4b4b-d185-0e1992ed43e9" + assert.Nil(s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2})) + + // Lookup the namespaces + get := &structs.NamespaceListRequest{ + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 3) + + // Lookup the namespaces by prefix + get = &structs.NamespaceListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Prefix: "aaaabb", + }, + } + var resp2 structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp2)) + assert.EqualValues(1000, resp2.Index) + assert.Len(resp2.Namespaces, 1) +} + +func TestNamespaceEndpoint_List_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + + ns1.Name = "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9" + ns2.Name = "bbbbbbbb-3350-4b4b-d185-0e1992ed43e9" + assert.Nil(s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2})) + + validDefToken := mock.CreatePolicyAndToken(t, state, 1001, "test-def-valid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS})) + validMultiToken := mock.CreatePolicyAndToken(t, state, 1002, "test-multi-valid", fmt.Sprintf("%s\n%s", + mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob}), + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))) + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy("invalid-namespace", "", []string{acl.NamespaceCapabilityReadJob})) + + get := &structs.NamespaceListRequest{ + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Lookup the namespaces without a token and expect a failure + { + var resp structs.NamespaceListResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp) + assert.Nil(err) + assert.Len(resp.Namespaces, 0) + } + + // Try with an invalid token + get.AuthToken = invalidToken.SecretID + { + var resp structs.NamespaceListResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp) + assert.Nil(err) + assert.Len(resp.Namespaces, 0) + } + + // Try with a valid token for one + get.AuthToken = validDefToken.SecretID + { + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 1) + } + + // Try with a valid token for two + get.AuthToken = validMultiToken.SecretID + { + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 2) + } + + // Try with a root token + get.AuthToken = root.SecretID + { + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 3) + } +} + +func TestNamespaceEndpoint_List_Blocking(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the namespace + ns := mock.Namespace() + + // Upsert namespace triggers watches + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(200, []*structs.Namespace{ns})) + }) + + req := &structs.NamespaceListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 150, + }, + } + start := time.Now() + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", req, &resp)) + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + assert.EqualValues(200, resp.Index) + assert.Len(resp.Namespaces, 2) + + // Namespace deletion triggers watches + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.DeleteNamespaces(300, []string{ns.Name})) + }) + + req.MinQueryIndex = 200 + start = time.Now() + var resp2 structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", req, &resp2)) + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) + } + assert.EqualValues(300, resp2.Index) + assert.Len(resp2.Namespaces, 1) +} + +func TestNamespaceEndpoint_DeleteNamespaces(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Lookup the namespaces + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp)) + assert.NotEqual(uint64(0), resp.Index) +} + +func TestNamespaceEndpoint_DeleteNamespaces_NonTerminal_Local(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Create a job in one + j := mock.Job() + j.Namespace = ns1.Name + assert.Nil(s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1001, j)) + + // Lookup the namespaces + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp) + if assert.NotNil(err) { + assert.Contains(err.Error(), "has non-terminal jobs") + } +} + +func TestNamespaceEndpoint_DeleteNamespaces_NonTerminal_Federated_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { + c.Region = "region1" + c.AuthoritativeRegion = "region1" + c.ACLEnabled = true + }) + defer cleanupS1() + s2, _, cleanupS2 := TestACLServer(t, func(c *Config) { + c.Region = "region2" + c.AuthoritativeRegion = "region1" + c.ACLEnabled = true + c.ReplicationBackoff = 20 * time.Millisecond + c.ReplicationToken = root.SecretID + }) + defer cleanupS2() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + codec := rpcClient(t, s1) + + // Create the register request + ns1 := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1}) + + testutil.WaitForResult(func() (bool, error) { + state := s2.State() + out, err := state.NamespaceByName(nil, ns1.Name) + return out != nil, err + }, func(err error) { + t.Fatalf("should replicate namespace") + }) + + // Create a job in the namespace on the non-authority + j := mock.Job() + j.Namespace = ns1.Name + assert.Nil(s2.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1001, j)) + + // Delete the namespaces without the correct permissions + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{ns1.Name}, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp) + if assert.NotNil(err) { + assert.EqualError(err, structs.ErrPermissionDenied.Error()) + } + + // Try with a auth token + req.AuthToken = root.SecretID + var resp2 structs.GenericResponse + err = msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp2) + if assert.NotNil(err) { + assert.Contains(err.Error(), "has non-terminal jobs") + } +} + +func TestNamespaceEndpoint_DeleteNamespaces_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Create the policy and tokens + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) + + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Delete namespaces without a token and expect failure + { + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + + // Check we did not delete the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.NotNil(out) + } + + // Try with an invalid token + req.AuthToken = invalidToken.SecretID + { + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + + // Check we did not delete the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.NotNil(out) + } + + // Try with a root token + req.AuthToken = root.SecretID + { + var resp structs.GenericResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp)) + assert.NotEqual(uint64(0), resp.Index) + + // Check we deleted the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.Nil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.Nil(out) + } +} + +func TestNamespaceEndpoint_DeleteNamespaces_Default(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Delete the default namespace + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{structs.DefaultNamespace}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.NotNil(msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp)) +} + +func TestNamespaceEndpoint_UpsertNamespaces(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + + // Lookup the namespaces + req := &structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1, ns2}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", req, &resp)) + assert.NotEqual(uint64(0), resp.Index) + + // Check we created the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.NotNil(out) +} + +func TestNamespaceEndpoint_UpsertNamespaces_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + + // Create the policy and tokens + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) + + // Create the register request + req := &structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1, ns2}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Upsert the namespace without a token and expect failure + { + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + + // Check we did not create the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.Nil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.Nil(out) + } + + // Try with an invalid token + req.AuthToken = invalidToken.SecretID + { + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + + // Check we did not create the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.Nil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.Nil(out) + } + + // Try with a root token + req.AuthToken = root.SecretID + { + var resp structs.GenericResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", req, &resp)) + assert.NotEqual(uint64(0), resp.Index) + + // Check we created the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.NotNil(out) + } +} diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 1ef24e401478..b1bc89c356c5 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1646,6 +1646,102 @@ func TestClientEndpoint_GetAllocs_ACL_Basic(t *testing.T) { } } +func TestClientEndpoint_GetAllocs_ACL_Namespaces(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + assert := assert.New(t) + + // Create the namespaces + ns1 := mock.Namespace() + ns2 := mock.Namespace() + ns1.Name = "altnamespace" + ns2.Name = "should-only-be-displayed-for-root-ns" + + // Create the allocs + allocDefaultNS := mock.Alloc() + allocAltNS := mock.Alloc() + allocAltNS.Namespace = ns1.Name + allocOtherNS := mock.Alloc() + allocOtherNS.Namespace = ns2.Name + + node := mock.Node() + allocDefaultNS.NodeID = node.ID + allocAltNS.NodeID = node.ID + allocOtherNS.NodeID = node.ID + state := s1.fsm.State() + assert.Nil(state.UpsertNamespaces(1, []*structs.Namespace{ns1, ns2}), "UpsertNamespaces") + assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 2, node), "UpsertNode") + assert.Nil(state.UpsertJobSummary(3, mock.JobSummary(allocDefaultNS.JobID)), "UpsertJobSummary") + assert.Nil(state.UpsertJobSummary(4, mock.JobSummary(allocAltNS.JobID)), "UpsertJobSummary") + assert.Nil(state.UpsertJobSummary(5, mock.JobSummary(allocOtherNS.JobID)), "UpsertJobSummary") + allocs := []*structs.Allocation{allocDefaultNS, allocAltNS, allocOtherNS} + assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 6, allocs), "UpsertAllocs") + + // Create the namespace policy and tokens + validDefaultToken := mock.CreatePolicyAndToken(t, state, 1001, "test-default-valid", mock.NodePolicy(acl.PolicyRead)+ + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) + validNoNSToken := mock.CreatePolicyAndToken(t, state, 1003, "test-alt-valid", mock.NodePolicy(acl.PolicyRead)) + invalidToken := mock.CreatePolicyAndToken(t, state, 1004, "test-invalid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) + + // Lookup the node without a token and expect failure + req := &structs.NodeSpecificRequest{ + NodeID: node.ID, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + { + var resp structs.NodeAllocsResponse + err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp) + assert.NotNil(err, "RPC") + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with a valid token for the default namespace + req.AuthToken = validDefaultToken.SecretID + { + var resp structs.NodeAllocsResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC") + assert.Len(resp.Allocs, 1) + assert.Equal(allocDefaultNS.ID, resp.Allocs[0].ID) + } + + // Try with a valid token for a namespace with no allocs on this node + req.AuthToken = validNoNSToken.SecretID + { + var resp structs.NodeAllocsResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC") + assert.Len(resp.Allocs, 0) + } + + // Try with a invalid token + req.AuthToken = invalidToken.SecretID + { + var resp structs.NodeAllocsResponse + err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp) + assert.NotNil(err, "RPC") + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with a root token + req.AuthToken = root.SecretID + { + var resp structs.NodeAllocsResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC") + assert.Len(resp.Allocs, 3) + for _, alloc := range resp.Allocs { + switch alloc.ID { + case allocDefaultNS.ID, allocAltNS.ID, allocOtherNS.ID: + // expected + default: + t.Errorf("unexpected alloc %q for namespace %q", alloc.ID, alloc.Namespace) + } + } + } +} + func TestClientEndpoint_GetClientAllocs(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/search_endpoint.go b/nomad/search_endpoint.go index 3e5eea504aba..478c73b84553 100644 --- a/nomad/search_endpoint.go +++ b/nomad/search_endpoint.go @@ -31,6 +31,7 @@ var ( structs.Deployments, structs.Plugins, structs.Volumes, + structs.Namespaces, } ) @@ -67,6 +68,8 @@ func (s *Search) getMatches(iter memdb.ResultIterator, prefix string) ([]string, id = t.ID case *structs.CSIVolume: id = t.ID + case *structs.Namespace: + id = t.Name default: matchID, ok := getEnterpriseMatch(raw) if !ok { @@ -105,11 +108,28 @@ func getResourceIter(context structs.Context, aclObj *acl.ACL, namespace, prefix return state.CSIPluginsByIDPrefix(ws, prefix) case structs.Volumes: return state.CSIVolumesByIDPrefix(ws, namespace, prefix) + case structs.Namespaces: + iter, err := state.NamespacesByNamePrefix(ws, prefix) + if err != nil { + return nil, err + } + if aclObj == nil { + return iter, nil + } + return memdb.NewFilterIterator(iter, namespaceFilter(aclObj)), nil default: return getEnterpriseResourceIter(context, aclObj, namespace, prefix, ws, state) } } +// namespaceFilter wraps a namespace iterator with a filter for removing +// namespaces the ACL can't access. +func namespaceFilter(aclObj *acl.ACL) memdb.FilterFunc { + return func(v interface{}) bool { + return !aclObj.AllowNamespace(v.(*structs.Namespace).Name) + } +} + // If the length of a prefix is odd, return a subset to the last even character // This only applies to UUIDs, jobs are excluded func roundUUIDDownIfOdd(prefix string, context structs.Context) string { diff --git a/nomad/search_endpoint_oss.go b/nomad/search_endpoint_oss.go index b4d80c63433e..e3f0fc0abcb8 100644 --- a/nomad/search_endpoint_oss.go +++ b/nomad/search_endpoint_oss.go @@ -43,6 +43,7 @@ func anySearchPerms(aclObj *acl.ACL, namespace string, context structs.Context) } nodeRead := aclObj.AllowNodeRead() + allowNS := aclObj.AllowNamespace(namespace) jobRead := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob) allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIListVolume, acl.NamespaceCapabilityCSIReadVolume, @@ -50,7 +51,7 @@ func anySearchPerms(aclObj *acl.ACL, namespace string, context structs.Context) acl.NamespaceCapabilityReadJob) volRead := allowVolume(aclObj, namespace) - if !nodeRead && !jobRead && !volRead { + if !nodeRead && !jobRead && !volRead && !allowNS { return false } @@ -60,6 +61,10 @@ func anySearchPerms(aclObj *acl.ACL, namespace string, context structs.Context) if !nodeRead && context == structs.Nodes { return false } + if !allowNS && context == structs.Namespaces { + return false + } + if !jobRead { switch context { case structs.Allocs, structs.Deployments, structs.Evals, structs.Jobs: @@ -106,6 +111,10 @@ func searchContexts(aclObj *acl.ACL, namespace string, context structs.Context) if jobRead { available = append(available, c) } + case structs.Namespaces: + if aclObj.AllowNamespace(namespace) { + available = append(available, c) + } case structs.Nodes: if aclObj.AllowNodeRead() { available = append(available, c) diff --git a/nomad/search_endpoint_test.go b/nomad/search_endpoint_test.go index 3d705179ee95..f9a0cdbd1f9c 100644 --- a/nomad/search_endpoint_test.go +++ b/nomad/search_endpoint_test.go @@ -823,3 +823,169 @@ func TestSearch_PrefixSearch_CSIVolume(t *testing.T) { assert.Equal(id, resp.Matches[structs.Volumes][0]) assert.Equal(resp.Truncations[structs.Volumes], false) } + +func TestSearch_PrefixSearch_Namespace(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s, cleanup := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + + defer cleanup() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + ns := mock.Namespace() + assert.Nil(s.fsm.State().UpsertNamespaces(2000, []*structs.Namespace{ns})) + + prefix := ns.Name[:len(ns.Name)-2] + + req := &structs.SearchRequest{ + Prefix: prefix, + Context: structs.Namespaces, + QueryOptions: structs.QueryOptions{ + Region: "global", + }, + } + + var resp structs.SearchResponse + if err := msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + assert.Equal(1, len(resp.Matches[structs.Namespaces])) + assert.Equal(ns.Name, resp.Matches[structs.Namespaces][0]) + assert.Equal(resp.Truncations[structs.Namespaces], false) + + assert.Equal(uint64(2000), resp.Index) +} + +func TestSearch_PrefixSearch_Namespace_ACL(t *testing.T) { + t.Parallel() + assert := assert.New(t) + s, root, cleanup := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + defer cleanup() + + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + state := s.fsm.State() + + ns := mock.Namespace() + assert.Nil(state.UpsertNamespaces(500, []*structs.Namespace{ns})) + + job1 := mock.Job() + assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 502, job1)) + + job2 := mock.Job() + job2.Namespace = ns.Name + assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 504, job2)) + + assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1001, mock.Node())) + + req := &structs.SearchRequest{ + Prefix: "", + Context: structs.Jobs, + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: job1.Namespace, + }, + } + + // Try without a token and expect failure + { + var resp structs.SearchResponse + err := msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with an invalid token and expect failure + { + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs})) + req.AuthToken = invalidToken.SecretID + var resp structs.SearchResponse + err := msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with a node:read token and expect failure due to Namespaces being the context + { + validToken := mock.CreatePolicyAndToken(t, state, 1005, "test-invalid2", mock.NodePolicy(acl.PolicyRead)) + req.Context = structs.Namespaces + req.AuthToken = validToken.SecretID + var resp structs.SearchResponse + err := msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with a node:read token and expect success due to All context + { + validToken := mock.CreatePolicyAndToken(t, state, 1007, "test-valid", mock.NodePolicy(acl.PolicyRead)) + req.Context = structs.All + req.AuthToken = validToken.SecretID + var resp structs.SearchResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp)) + assert.Equal(uint64(1001), resp.Index) + assert.Len(resp.Matches[structs.Nodes], 1) + + // Jobs filtered out since token only has access to node:read + assert.Len(resp.Matches[structs.Jobs], 0) + } + + // Try with a valid token for non-default namespace:read-job + { + validToken := mock.CreatePolicyAndToken(t, state, 1009, "test-valid2", + mock.NamespacePolicy(job2.Namespace, "", []string{acl.NamespaceCapabilityReadJob})) + req.Context = structs.All + req.AuthToken = validToken.SecretID + req.Namespace = job2.Namespace + var resp structs.SearchResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp)) + assert.Len(resp.Matches[structs.Jobs], 1) + assert.Equal(job2.ID, resp.Matches[structs.Jobs][0]) + assert.Len(resp.Matches[structs.Namespaces], 1) + + // Index of job - not node - because node context is filtered out + assert.Equal(uint64(504), resp.Index) + + // Nodes filtered out since token only has access to namespace:read-job + assert.Len(resp.Matches[structs.Nodes], 0) + } + + // Try with a valid token for node:read and default namespace:read-job + { + validToken := mock.CreatePolicyAndToken(t, state, 1011, "test-valid3", strings.Join([]string{ + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}), + mock.NodePolicy(acl.PolicyRead), + }, "\n")) + req.Context = structs.All + req.AuthToken = validToken.SecretID + req.Namespace = structs.DefaultNamespace + var resp structs.SearchResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp)) + assert.Len(resp.Matches[structs.Jobs], 1) + assert.Equal(job1.ID, resp.Matches[structs.Jobs][0]) + assert.Len(resp.Matches[structs.Nodes], 1) + assert.Equal(uint64(1001), resp.Index) + assert.Len(resp.Matches[structs.Namespaces], 1) + } + + // Try with a management token + { + req.Context = structs.All + req.AuthToken = root.SecretID + req.Namespace = structs.DefaultNamespace + var resp structs.SearchResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp)) + assert.Equal(uint64(1001), resp.Index) + assert.Len(resp.Matches[structs.Jobs], 1) + assert.Equal(job1.ID, resp.Matches[structs.Jobs][0]) + assert.Len(resp.Matches[structs.Nodes], 1) + assert.Len(resp.Matches[structs.Namespaces], 2) + } +} diff --git a/nomad/server.go b/nomad/server.go index fb744d5136ee..05735fe4ab39 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -276,6 +276,7 @@ type endpoints struct { Scaling *Scaling Enterprise *EnterpriseEndpoints Event *Event + Namespace *Namespace // Client endpoints ClientStats *ClientStats @@ -1149,6 +1150,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { s.staticEndpoints.Status = &Status{srv: s, logger: s.logger.Named("status")} s.staticEndpoints.System = &System{srv: s, logger: s.logger.Named("system")} s.staticEndpoints.Search = &Search{srv: s, logger: s.logger.Named("search")} + s.staticEndpoints.Namespace = &Namespace{srv: s} s.staticEndpoints.Enterprise = NewEnterpriseEndpoints(s) // Client endpoints @@ -1166,6 +1168,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { s.staticEndpoints.Event = &Event{srv: s} s.staticEndpoints.Event.register() + } // Register the static handlers @@ -1190,6 +1193,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { server.Register(s.staticEndpoints.ClientCSI) server.Register(s.staticEndpoints.FileSystem) server.Register(s.staticEndpoints.Agent) + server.Register(s.staticEndpoints.Namespace) // Create new dynamic endpoints and add them to the RPC server. node := &Node{srv: s, ctx: ctx, logger: s.logger.Named("client")} diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 3d1308859ae9..00ea753e026a 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -9,6 +9,10 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + TableNamespaces = "namespaces" +) + var ( schemaFactories SchemaFactories factoriesLock sync.Mutex @@ -52,6 +56,7 @@ func init() { csiPluginTableSchema, scalingPolicyTableSchema, scalingEventTableSchema, + namespaceTableSchema, }...) } @@ -900,3 +905,28 @@ func scalingEventTableSchema() *memdb.TableSchema { }, } } + +// namespaceTableSchema returns the MemDB schema for the namespace table. +func namespaceTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: TableNamespaces, + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "Name", + }, + }, + "quota": { + Name: "quota", + AllowMissing: true, + Unique: false, + Indexer: &memdb.StringFieldIndex{ + Field: "Quota", + }, + }, + }, + } +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index d5b2f1d3ffa9..97a5761d3b11 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -104,8 +104,8 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges) } - // Initialize the state store with required enterprise objects - if err := s.enterpriseInit(); err != nil { + // Initialize the state store with the default namespace. + if err := s.namespaceInit(); err != nil { return nil, fmt.Errorf("enterprise state store initialization failed: %v", err) } @@ -119,6 +119,25 @@ func (s *StateStore) EventBroker() (*stream.EventBroker, error) { return s.db.publisher, nil } +// namespaceInit ensures the default namespace exists. +func (s *StateStore) namespaceInit() error { + // Create the default namespace. This is safe to do every time we create the + // state store. There are two main cases, a brand new cluster in which case + // each server will have the same default namespace object, or a new cluster + // in which case if the default namespace has been modified, it will be + // overridden by the restore code path. + defaultNs := &structs.Namespace{ + Name: structs.DefaultNamespace, + Description: structs.DefaultNamespaceDescription, + } + + if err := s.UpsertNamespaces(1, []*structs.Namespace{defaultNs}); err != nil { + return fmt.Errorf("inserting default namespace failed: %v", err) + } + + return nil +} + // Config returns the state store configuration. func (s *StateStore) Config() *StateStoreConfig { return s.config @@ -5456,6 +5475,206 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s return nil } +// NamespaceByName is used to lookup a namespace by name +func (s *StateStore) NamespaceByName(ws memdb.WatchSet, name string) (*structs.Namespace, error) { + txn := s.db.ReadTxn() + return s.namespaceByNameImpl(ws, txn, name) +} + +// namespaceByNameImpl is used to lookup a namespace by name +func (s *StateStore) namespaceByNameImpl(ws memdb.WatchSet, txn *txn, name string) (*structs.Namespace, error) { + watchCh, existing, err := txn.FirstWatch(TableNamespaces, "id", name) + if err != nil { + return nil, fmt.Errorf("namespace lookup failed: %v", err) + } + ws.Add(watchCh) + + if existing != nil { + return existing.(*structs.Namespace), nil + } + return nil, nil +} + +// namespaceExists returns whether a namespace exists +func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) { + if namespace == structs.DefaultNamespace { + return true, nil + } + + existing, err := txn.First(TableNamespaces, "id", namespace) + if err != nil { + return false, fmt.Errorf("namespace lookup failed: %v", err) + } + + return existing != nil, nil +} + +// NamespacesByNamePrefix is used to lookup namespaces by prefix +func (s *StateStore) NamespacesByNamePrefix(ws memdb.WatchSet, namePrefix string) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + iter, err := txn.Get(TableNamespaces, "id_prefix", namePrefix) + if err != nil { + return nil, fmt.Errorf("namespaces lookup failed: %v", err) + } + ws.Add(iter.WatchCh()) + + return iter, nil +} + +// Namespaces returns an iterator over all the namespaces +func (s *StateStore) Namespaces(ws memdb.WatchSet) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + // Walk the entire namespace table + iter, err := txn.Get(TableNamespaces, "id") + if err != nil { + return nil, err + } + ws.Add(iter.WatchCh()) + return iter, nil +} + +func (s *StateStore) NamespaceNames() ([]string, error) { + it, err := s.Namespaces(nil) + if err != nil { + return nil, err + } + + nses := []string{} + for { + next := it.Next() + if next == nil { + break + } + ns := next.(*structs.Namespace) + nses = append(nses, ns.Name) + } + + return nses, nil +} + +// UpsertNamespace is used to register or update a set of namespaces +func (s *StateStore) UpsertNamespaces(index uint64, namespaces []*structs.Namespace) error { + txn := s.db.WriteTxn(index) + defer txn.Abort() + + for _, ns := range namespaces { + if err := s.upsertNamespaceImpl(index, txn, ns); err != nil { + return err + } + } + + if err := txn.Insert("index", &IndexEntry{TableNamespaces, index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + txn.Commit() + return nil +} + +// upsertNamespaceImpl is used to upsert a namespace +func (s *StateStore) upsertNamespaceImpl(index uint64, txn *txn, namespace *structs.Namespace) error { + // Ensure the namespace hash is non-nil. This should be done outside the state store + // for performance reasons, but we check here for defense in depth. + ns := namespace + if len(ns.Hash) == 0 { + ns.SetHash() + } + + // Check if the namespace already exists + existing, err := txn.First(TableNamespaces, "id", ns.Name) + if err != nil { + return fmt.Errorf("namespace lookup failed: %v", err) + } + + // Setup the indexes correctly and determine which quotas need to be + // reconciled + var oldQuota string + if existing != nil { + exist := existing.(*structs.Namespace) + ns.CreateIndex = exist.CreateIndex + ns.ModifyIndex = index + + // Grab the old quota on the namespace + oldQuota = exist.Quota + } else { + ns.CreateIndex = index + ns.ModifyIndex = index + } + + // Validate that the quota on the new namespace exists + if ns.Quota != "" { + exists, err := s.quotaSpecExists(txn, ns.Quota) + if err != nil { + return fmt.Errorf("looking up namespace quota %q failed: %v", ns.Quota, err) + } else if !exists { + return fmt.Errorf("namespace %q using non-existent quota %q", ns.Name, ns.Quota) + } + } + + // Insert the namespace + if err := txn.Insert(TableNamespaces, ns); err != nil { + return fmt.Errorf("namespace insert failed: %v", err) + } + + // Reconcile changed quotas + return s.quotaReconcile(index, txn, ns.Quota, oldQuota) +} + +// DeleteNamespaces is used to remove a set of namespaces +func (s *StateStore) DeleteNamespaces(index uint64, names []string) error { + txn := s.db.WriteTxn(index) + defer txn.Abort() + + for _, name := range names { + // Lookup the namespace + existing, err := txn.First(TableNamespaces, "id", name) + if err != nil { + return fmt.Errorf("namespace lookup failed: %v", err) + } + if existing == nil { + return fmt.Errorf("namespace not found") + } + + ns := existing.(*structs.Namespace) + if ns.Name == structs.DefaultNamespace { + return fmt.Errorf("default namespace can not be deleted") + } + + // Ensure that the namespace doesn't have any non-terminal jobs + iter, err := s.jobsByNamespaceImpl(nil, name, txn) + if err != nil { + return err + } + + for { + raw := iter.Next() + if raw == nil { + break + } + job := raw.(*structs.Job) + + if job.Status != structs.JobStatusDead { + return fmt.Errorf("namespace %q contains at least one non-terminal job %q. "+ + "All jobs must be terminal in namespace before it can be deleted", name, job.ID) + } + } + + // Delete the namespace + if err := txn.Delete(TableNamespaces, existing); err != nil { + return fmt.Errorf("namespace deletion failed: %v", err) + } + } + + if err := txn.Insert("index", &IndexEntry{TableNamespaces, index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + txn.Commit() + return nil +} + func (s *StateStore) DeleteScalingPolicies(index uint64, ids []string) error { txn := s.db.WriteTxn(index) defer txn.Abort() @@ -5891,3 +6110,11 @@ func (r *StateRestore) ScalingEventsRestore(jobEvents *structs.JobScalingEvents) } return nil } + +// NamespaceRestore is used to restore a namespace +func (r *StateRestore) NamespaceRestore(ns *structs.Namespace) error { + if err := r.txn.Insert(TableNamespaces, ns); err != nil { + return fmt.Errorf("namespace insert failed: %v", err) + } + return nil +} diff --git a/nomad/state/state_store_oss.go b/nomad/state/state_store_oss.go index 487f8421378e..c161caf815e7 100644 --- a/nomad/state/state_store_oss.go +++ b/nomad/state/state_store_oss.go @@ -6,15 +6,13 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -// enterpriseInit is used to initialize the state store with enterprise -// objects. -func (s *StateStore) enterpriseInit() error { - return nil +// quotaSpecExists on returns whether the quota exists +func (s *StateStore) quotaSpecExists(txn *txn, name string) (bool, error) { + return false, nil } -// namespaceExists returns whether a namespace exists -func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) { - return namespace == structs.DefaultNamespace, nil +func (s *StateStore) quotaReconcile(index uint64, txn *txn, newQuota, oldQuota string) error { + return nil } // updateEntWithAlloc is used to update Nomad Enterprise objects when an allocation is @@ -22,7 +20,3 @@ func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) { func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *txn) error { return nil } - -func (s *StateStore) NamespaceNames() ([]string, error) { - return []string{structs.DefaultNamespace}, nil -} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 44f9b1f9c411..9a54bae3cbab 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -56,47 +56,51 @@ type MessageType uint8 // note: new raft message types need to be added to the end of this // list of contents const ( - NodeRegisterRequestType MessageType = iota - NodeDeregisterRequestType - NodeUpdateStatusRequestType - NodeUpdateDrainRequestType - JobRegisterRequestType - JobDeregisterRequestType - EvalUpdateRequestType - EvalDeleteRequestType - AllocUpdateRequestType - AllocClientUpdateRequestType - ReconcileJobSummariesRequestType - VaultAccessorRegisterRequestType - VaultAccessorDeregisterRequestType - ApplyPlanResultsRequestType - DeploymentStatusUpdateRequestType - DeploymentPromoteRequestType - DeploymentAllocHealthRequestType - DeploymentDeleteRequestType - JobStabilityRequestType - ACLPolicyUpsertRequestType - ACLPolicyDeleteRequestType - ACLTokenUpsertRequestType - ACLTokenDeleteRequestType - ACLTokenBootstrapRequestType - AutopilotRequestType - UpsertNodeEventsType - JobBatchDeregisterRequestType - AllocUpdateDesiredTransitionRequestType - NodeUpdateEligibilityRequestType - BatchNodeUpdateDrainRequestType - SchedulerConfigRequestType - NodeBatchDeregisterRequestType - ClusterMetadataRequestType - ServiceIdentityAccessorRegisterRequestType - ServiceIdentityAccessorDeregisterRequestType - CSIVolumeRegisterRequestType - CSIVolumeDeregisterRequestType - CSIVolumeClaimRequestType - ScalingEventRegisterRequestType - CSIVolumeClaimBatchRequestType - CSIPluginDeleteRequestType + NodeRegisterRequestType MessageType = 0 + NodeDeregisterRequestType MessageType = 1 + NodeUpdateStatusRequestType MessageType = 2 + NodeUpdateDrainRequestType MessageType = 3 + JobRegisterRequestType MessageType = 4 + JobDeregisterRequestType MessageType = 5 + EvalUpdateRequestType MessageType = 6 + EvalDeleteRequestType MessageType = 7 + AllocUpdateRequestType MessageType = 8 + AllocClientUpdateRequestType MessageType = 9 + ReconcileJobSummariesRequestType MessageType = 10 + VaultAccessorRegisterRequestType MessageType = 11 + VaultAccessorDeregisterRequestType MessageType = 12 + ApplyPlanResultsRequestType MessageType = 13 + DeploymentStatusUpdateRequestType MessageType = 14 + DeploymentPromoteRequestType MessageType = 15 + DeploymentAllocHealthRequestType MessageType = 16 + DeploymentDeleteRequestType MessageType = 17 + JobStabilityRequestType MessageType = 18 + ACLPolicyUpsertRequestType MessageType = 19 + ACLPolicyDeleteRequestType MessageType = 20 + ACLTokenUpsertRequestType MessageType = 21 + ACLTokenDeleteRequestType MessageType = 22 + ACLTokenBootstrapRequestType MessageType = 23 + AutopilotRequestType MessageType = 24 + UpsertNodeEventsType MessageType = 25 + JobBatchDeregisterRequestType MessageType = 26 + AllocUpdateDesiredTransitionRequestType MessageType = 27 + NodeUpdateEligibilityRequestType MessageType = 28 + BatchNodeUpdateDrainRequestType MessageType = 29 + SchedulerConfigRequestType MessageType = 30 + NodeBatchDeregisterRequestType MessageType = 31 + ClusterMetadataRequestType MessageType = 32 + ServiceIdentityAccessorRegisterRequestType MessageType = 33 + ServiceIdentityAccessorDeregisterRequestType MessageType = 34 + CSIVolumeRegisterRequestType MessageType = 35 + CSIVolumeDeregisterRequestType MessageType = 36 + CSIVolumeClaimRequestType MessageType = 37 + ScalingEventRegisterRequestType MessageType = 38 + CSIVolumeClaimBatchRequestType MessageType = 39 + CSIPluginDeleteRequestType MessageType = 40 + + // Namespace types were moved from enterprise and therefore start at 64 + NamespaceUpsertRequestType MessageType = 64 + NamespaceDeleteRequestType MessageType = 65 ) const ( @@ -148,6 +152,9 @@ const ( // to indicate that endpoints must search in all namespaces AllNamespacesSentinel = "*" + // maxNamespaceDescriptionLength limits a namespace description length + maxNamespaceDescriptionLength = 256 + // JitterFraction is a the limit to the amount of jitter we apply // to a user specified MaxQueryTime. We divide the specified time by // the fraction. So 16 == 6.25% limit of jitter. This jitter is also @@ -173,6 +180,11 @@ const ( DefaultBlockingRPCQueryTime = 300 * time.Second ) +var ( + // validNamespaceName is used to validate a namespace name + validNamespaceName = regexp.MustCompile("^[a-zA-Z0-9-]{1,128}$") +) + // Context defines the scope in which a search for Nomad object operates, and // is also used to query the matching index value for this context type Context string @@ -4698,6 +4710,119 @@ type MultiregionRegion struct { Meta map[string]string } +// Namespace allows logically grouping jobs and their associated objects. +type Namespace struct { + // Name is the name of the namespace + Name string + + // Description is a human readable description of the namespace + Description string + + // Quota is the quota specification that the namespace should account + // against. + Quota string + + // Hash is the hash of the namespace which is used to efficiently replicate + // cross-regions. + Hash []byte + + // Raft Indexes + CreateIndex uint64 + ModifyIndex uint64 +} + +func (n *Namespace) Validate() error { + var mErr multierror.Error + + // Validate the name and description + if !validNamespaceName.MatchString(n.Name) { + err := fmt.Errorf("invalid name %q. Must match regex %s", n.Name, validNamespaceName) + mErr.Errors = append(mErr.Errors, err) + } + if len(n.Description) > maxNamespaceDescriptionLength { + err := fmt.Errorf("description longer than %d", maxNamespaceDescriptionLength) + mErr.Errors = append(mErr.Errors, err) + } + + return mErr.ErrorOrNil() +} + +// SetHash is used to compute and set the hash of the namespace +func (n *Namespace) SetHash() []byte { + // Initialize a 256bit Blake2 hash (32 bytes) + hash, err := blake2b.New256(nil) + if err != nil { + panic(err) + } + + // Write all the user set fields + hash.Write([]byte(n.Name)) + hash.Write([]byte(n.Description)) + hash.Write([]byte(n.Quota)) + + // Finalize the hash + hashVal := hash.Sum(nil) + + // Set and return the hash + n.Hash = hashVal + return hashVal +} + +func (n *Namespace) Copy() *Namespace { + nc := new(Namespace) + *nc = *n + nc.Hash = make([]byte, len(n.Hash)) + copy(nc.Hash, n.Hash) + return nc +} + +// NamespaceListRequest is used to request a list of namespaces +type NamespaceListRequest struct { + QueryOptions +} + +// NamespaceListResponse is used for a list request +type NamespaceListResponse struct { + Namespaces []*Namespace + QueryMeta +} + +// NamespaceSpecificRequest is used to query a specific namespace +type NamespaceSpecificRequest struct { + Name string + QueryOptions +} + +// SingleNamespaceResponse is used to return a single namespace +type SingleNamespaceResponse struct { + Namespace *Namespace + QueryMeta +} + +// NamespaceSetRequest is used to query a set of namespaces +type NamespaceSetRequest struct { + Namespaces []string + QueryOptions +} + +// NamespaceSetResponse is used to return a set of namespaces +type NamespaceSetResponse struct { + Namespaces map[string]*Namespace // Keyed by namespace Name + QueryMeta +} + +// NamespaceDeleteRequest is used to delete a set of namespaces +type NamespaceDeleteRequest struct { + Namespaces []string + WriteRequest +} + +// NamespaceUpsertRequest is used to upsert a set of namespaces +type NamespaceUpsertRequest struct { + Namespaces []*Namespace + WriteRequest +} + const ( // PeriodicSpecCron is used for a cron spec. PeriodicSpecCron = "cron"