Skip to content

Commit

Permalink
api: return sorted results in certain list endpoints
Browse files Browse the repository at this point in the history
These API endpoints now return results in chronological order. They
can return results in reverse chronological order by setting the
query parameter ascending=true.

- Eval.List
- Deployment.List
  • Loading branch information
shoenig committed Feb 15, 2022
1 parent 282eb10 commit b432f37
Show file tree
Hide file tree
Showing 20 changed files with 487 additions and 120 deletions.
3 changes: 3 additions & 0 deletions .changelog/12054.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
api: sort return values of evaluation and deployment list api endpoints by creation index
```
8 changes: 8 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ type QueryOptions struct {
// previous response.
NextToken string

// Ascending is used to have results sorted in ascending chronological order.
//
// Currently only supported by evaluations.List and deployments.list endpoints.
Ascending bool

// ctx is an optional context pass through to the underlying HTTP
// request layer. Use Context() and WithContext() to manage this.
ctx context.Context
Expand Down Expand Up @@ -587,6 +592,9 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.NextToken != "" {
r.params.Set("next_token", q.NextToken)
}
if q.Ascending {
r.params.Set("ascending", "true")
}
for k, v := range q.Params {
r.params.Set(k, v)
}
Expand Down
32 changes: 15 additions & 17 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,27 +181,25 @@ func TestSetQueryOptions(t *testing.T) {
WaitIndex: 1000,
WaitTime: 100 * time.Second,
AuthToken: "foobar",
Ascending: true,
}
r.setQueryOptions(q)

if r.params.Get("region") != "foo" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("namespace") != "bar" {
t.Fatalf("bad: %v", r.params)
}
if _, ok := r.params["stale"]; !ok {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("index") != "1000" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("wait") != "100000ms" {
t.Fatalf("bad: %v", r.params)
}
if r.token != "foobar" {
t.Fatalf("bad: %v", r.token)
try := func(key, exp string) {
result := r.params.Get(key)
require.Equal(t, exp, result)
}

// Check auth token is set
require.Equal(t, "foobar", r.token)

// Check query parameters are set
try("region", "foo")
try("namespace", "bar")
try("stale", "") // should not be present
try("index", "1000")
try("wait", "100000ms")
try("ascending", "true")
}

func TestQueryOptionsContext(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions command/agent/deployment_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ func (s *HTTPServer) DeploymentsRequest(resp http.ResponseWriter, req *http.Requ
return nil, nil
}

query := req.URL.Query()
args.OrderAscending = query.Get("ascending") == "true"

var out structs.DeploymentListResponse
if err := s.agent.RPC("Deployment.List", &args, &out); err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions command/agent/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func (s *HTTPServer) EvalsRequest(resp http.ResponseWriter, req *http.Request) (
query := req.URL.Query()
args.FilterEvalStatus = query.Get("status")
args.FilterJobID = query.Get("job")
args.OrderAscending = query.Get("ascending") == "true"

var out structs.EvalListResponse
if err := s.agent.RPC("Eval.List", &args, &out); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions helper/raftutil/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ func StateAsMap(state *state.StateStore) map[string][]interface{} {
"Allocs": toArray(state.Allocs(nil)),
"CSIPlugins": toArray(state.CSIPlugins(nil)),
"CSIVolumes": toArray(state.CSIVolumes(nil)),
"Deployments": toArray(state.Deployments(nil)),
"Evals": toArray(state.Evals(nil)),
"Deployments": toArray(state.Deployments(nil, false)),
"Evals": toArray(state.Evals(nil, false)),
"Indexes": toArray(state.Indexes()),
"JobSummaries": toArray(state.JobSummaries(nil)),
"JobVersions": toArray(state.JobVersions(nil)),
Expand Down
4 changes: 2 additions & 2 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string)
func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// Iterate over the evaluations
ws := memdb.NewWatchSet()
iter, err := c.snap.Evals(ws)
iter, err := c.snap.Evals(ws, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -545,7 +545,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err
func (c *CoreScheduler) deploymentGC(eval *structs.Evaluation) error {
// Iterate over the deployments
ws := memdb.NewWatchSet()
iter, err := c.snap.Deployments(ws)
iter, err := c.snap.Deployments(ws, false)
if err != nil {
return err
}
Expand Down
14 changes: 8 additions & 6 deletions nomad/deployment_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"

"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -391,11 +390,13 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De
}
defer metrics.MeasureSince([]string{"nomad", "deployment", "list"}, time.Now())

namespace := args.RequestNamespace()

// Check namespace read-job permissions against request namespace since
// results are filtered by request namespace.
if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
} else if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}

Expand All @@ -407,12 +408,13 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De
// Capture all the deployments
var err error
var iter memdb.ResultIterator

if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.DeploymentsByIDPrefix(ws, args.RequestNamespace(), prefix)
} else if args.RequestNamespace() == structs.AllNamespacesSentinel {
iter, err = store.Deployments(ws)
iter, err = store.DeploymentsByIDPrefix(ws, namespace, prefix)
} else if namespace != structs.AllNamespacesSentinel {
iter, err = store.DeploymentsByNamespaceOrdered(ws, namespace, args.OrderAscending)
} else {
iter, err = store.DeploymentsByNamespace(ws, args.RequestNamespace())
iter, err = store.Deployments(ws, args.OrderAscending)
}
if err != nil {
return err
Expand Down
111 changes: 101 additions & 10 deletions nomad/deployment_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
Expand Down Expand Up @@ -1031,6 +1032,95 @@ func TestDeploymentEndpoint_List(t *testing.T) {
assert.Len(resp.Deployments, 2, "Deployments")
}

func TestDeploymentEndpoint_List_order(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create register requests
uuid1 := uuid.Generate()
dep1 := mock.Deployment()
dep1.ID = uuid1

uuid2 := uuid.Generate()
dep2 := mock.Deployment()
dep2.ID = uuid2

uuid3 := uuid.Generate()
dep3 := mock.Deployment()
dep3.ID = uuid3

err := s1.fsm.State().UpsertDeployment(1000, dep1)
require.NoError(t, err)

err = s1.fsm.State().UpsertDeployment(1001, dep2)
require.NoError(t, err)

err = s1.fsm.State().UpsertDeployment(1002, dep3)
require.NoError(t, err)

// update dep2 again so we can later assert create index order did not change
err = s1.fsm.State().UpsertDeployment(1003, dep2)
require.NoError(t, err)

t.Run("ascending", func(t *testing.T) {
// Lookup the deployments in chronological order (oldest first)
get := &structs.DeploymentListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: true,
}

var resp structs.DeploymentListResponse
err = msgpackrpc.CallWithCodec(codec, "Deployment.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Deployments, 3)

// Assert returned order is by CreateIndex (ascending)
require.Equal(t, uint64(1000), resp.Deployments[0].CreateIndex)
require.Equal(t, uuid1, resp.Deployments[0].ID)

require.Equal(t, uint64(1001), resp.Deployments[1].CreateIndex)
require.Equal(t, uuid2, resp.Deployments[1].ID)

require.Equal(t, uint64(1002), resp.Deployments[2].CreateIndex)
require.Equal(t, uuid3, resp.Deployments[2].ID)
})

t.Run("descending", func(t *testing.T) {
// Lookup the deployments in reverse chronological order (newest first)
get := &structs.DeploymentListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: false,
}

var resp structs.DeploymentListResponse
err = msgpackrpc.CallWithCodec(codec, "Deployment.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Deployments, 3)

// Assert returned order is by CreateIndex (descending)
require.Equal(t, uint64(1002), resp.Deployments[0].CreateIndex)
require.Equal(t, uuid3, resp.Deployments[0].ID)

require.Equal(t, uint64(1001), resp.Deployments[1].CreateIndex)
require.Equal(t, uuid2, resp.Deployments[1].ID)

require.Equal(t, uint64(1000), resp.Deployments[2].CreateIndex)
require.Equal(t, uuid1, resp.Deployments[2].ID)
})
}

func TestDeploymentEndpoint_List_ACL(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1174,23 +1264,23 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) {
jobID string
status string
}{
{id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"},
{id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9"}, // 0
{id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9"}, // 1
{id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"}, // 2
{id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"}, // 3
{id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}, // 4
{id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}, // 5
{id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9"}, // 6
}

state := s1.fsm.State()
index := uint64(1000)

for _, m := range mocks {
index++
for i, m := range mocks {
index := 1000 + uint64(i)
deployment := mock.Deployment()
deployment.Status = structs.DeploymentStatusCancelled
deployment.ID = m.id
deployment.CreateIndex = index
if m.namespace != "" { // defaults to "default"
deployment.Namespace = m.namespace
}
Expand Down Expand Up @@ -1262,6 +1352,7 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
req := &structs.DeploymentListRequest{
OrderAscending: true, // counting up is easier to think about
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: tc.namespace,
Expand Down
2 changes: 1 addition & 1 deletion nomad/deploymentwatcher/deployments_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (w *Watcher) getDeploys(ctx context.Context, minIndex uint64) ([]*structs.D
// getDeploysImpl retrieves all deployments from the passed state store.
func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {

iter, err := state.Deployments(ws)
iter, err := state.Deployments(ws, false)
if err != nil {
return nil, 0, err
}
Expand Down
18 changes: 10 additions & 8 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,17 +382,18 @@ func (e *Eval) Reap(args *structs.EvalDeleteRequest,
}

// List is used to get a list of the evaluations in the system
func (e *Eval) List(args *structs.EvalListRequest,
reply *structs.EvalListResponse) error {
func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListResponse) error {
if done, err := e.srv.forward("Eval.List", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "eval", "list"}, time.Now())

namespace := args.RequestNamespace()

// Check for read-job permissions
if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
} else if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}

Expand All @@ -404,12 +405,13 @@ func (e *Eval) List(args *structs.EvalListRequest,
// Scan all the evaluations
var err error
var iter memdb.ResultIterator
if args.RequestNamespace() == structs.AllNamespacesSentinel {
iter, err = store.Evals(ws)
} else if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.EvalsByIDPrefix(ws, args.RequestNamespace(), prefix)

if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.EvalsByIDPrefix(ws, namespace, prefix)
} else if namespace != structs.AllNamespacesSentinel {
iter, err = store.EvalsByNamespaceOrdered(ws, namespace, args.OrderAscending)
} else {
iter, err = store.EvalsByNamespace(ws, args.RequestNamespace())
iter, err = store.Evals(ws, args.OrderAscending)
}
if err != nil {
return err
Expand Down
Loading

0 comments on commit b432f37

Please sign in to comment.