Skip to content

Commit

Permalink
api: return sorted results in certain list endpoints
Browse files Browse the repository at this point in the history
These API endpoints now return results in chronological order. They
can return results in reverse chronological order by setting the
query parameter ascending=true.

- Eval.List
- Deployment.List
  • Loading branch information
shoenig committed Feb 14, 2022
1 parent 282eb10 commit aaa0ccb
Show file tree
Hide file tree
Showing 19 changed files with 534 additions and 100 deletions.
3 changes: 3 additions & 0 deletions .changelog/12054.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
api: sort return values of evaluation and deployment list api endpoints by creation index
```
3 changes: 3 additions & 0 deletions command/agent/deployment_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ func (s *HTTPServer) DeploymentsRequest(resp http.ResponseWriter, req *http.Requ
return nil, nil
}

query := req.URL.Query()
args.OrderAscending = query.Get("ascending") == "true"

var out structs.DeploymentListResponse
if err := s.agent.RPC("Deployment.List", &args, &out); err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions command/agent/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func (s *HTTPServer) EvalsRequest(resp http.ResponseWriter, req *http.Request) (
query := req.URL.Query()
args.FilterEvalStatus = query.Get("status")
args.FilterJobID = query.Get("job")
args.OrderAscending = query.Get("ascending") == "true"

var out structs.EvalListResponse
if err := s.agent.RPC("Eval.List", &args, &out); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions helper/raftutil/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ func StateAsMap(state *state.StateStore) map[string][]interface{} {
"Allocs": toArray(state.Allocs(nil)),
"CSIPlugins": toArray(state.CSIPlugins(nil)),
"CSIVolumes": toArray(state.CSIVolumes(nil)),
"Deployments": toArray(state.Deployments(nil)),
"Evals": toArray(state.Evals(nil)),
"Deployments": toArray(state.Deployments(nil, false)),
"Evals": toArray(state.Evals(nil, false)),
"Indexes": toArray(state.Indexes()),
"JobSummaries": toArray(state.JobSummaries(nil)),
"JobVersions": toArray(state.JobVersions(nil)),
Expand Down
4 changes: 2 additions & 2 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string)
func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// Iterate over the evaluations
ws := memdb.NewWatchSet()
iter, err := c.snap.Evals(ws)
iter, err := c.snap.Evals(ws, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -545,7 +545,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err
func (c *CoreScheduler) deploymentGC(eval *structs.Evaluation) error {
// Iterate over the deployments
ws := memdb.NewWatchSet()
iter, err := c.snap.Deployments(ws)
iter, err := c.snap.Deployments(ws, false)
if err != nil {
return err
}
Expand Down
17 changes: 13 additions & 4 deletions nomad/deployment_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,17 +407,26 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De
// Capture all the deployments
var err error
var iter memdb.ResultIterator
namespace := args.RequestNamespace()

if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.DeploymentsByIDPrefix(ws, args.RequestNamespace(), prefix)
} else if args.RequestNamespace() == structs.AllNamespacesSentinel {
iter, err = store.Deployments(ws)
iter, err = store.DeploymentsByIDPrefix(ws, namespace, prefix)
} else if namespace != structs.AllNamespacesSentinel {
iter, err = store.DeploymentsByNamespaceOrdered(ws, namespace, args.OrderAscending)
} else {
iter, err = store.DeploymentsByNamespace(ws, args.RequestNamespace())
iter, err = store.Deployments(ws, args.OrderAscending)
}
if err != nil {
return err
}

iter = memdb.NewFilterIterator(iter, func(raw interface{}) bool {
if deployment := raw.(*structs.Deployment); deployment != nil {
return args.ShouldBeFiltered(deployment)
}
return false
})

var deploys []*structs.Deployment
paginator := state.NewPaginator(iter, args.QueryOptions,
func(raw interface{}) {
Expand Down
111 changes: 101 additions & 10 deletions nomad/deployment_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
Expand Down Expand Up @@ -1031,6 +1032,95 @@ func TestDeploymentEndpoint_List(t *testing.T) {
assert.Len(resp.Deployments, 2, "Deployments")
}

func TestDeploymentEndpoint_List_order(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create register requests
uuid1 := uuid.Generate()
dep1 := mock.Deployment()
dep1.ID = uuid1

uuid2 := uuid.Generate()
dep2 := mock.Deployment()
dep2.ID = uuid2

uuid3 := uuid.Generate()
dep3 := mock.Deployment()
dep3.ID = uuid3

err := s1.fsm.State().UpsertDeployment(1000, dep1)
require.NoError(t, err)

err = s1.fsm.State().UpsertDeployment(1001, dep2)
require.NoError(t, err)

err = s1.fsm.State().UpsertDeployment(1002, dep3)
require.NoError(t, err)

// update dep2 again so we can later assert create index order did not change
err = s1.fsm.State().UpsertDeployment(1003, dep2)
require.NoError(t, err)

t.Run("ascending", func(t *testing.T) {
// Lookup the deployments in chronological order (oldest first)
get := &structs.DeploymentListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: true,
}

var resp structs.DeploymentListResponse
err = msgpackrpc.CallWithCodec(codec, "Deployment.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Deployments, 3)

// Assert returned order is by CreateIndex (ascending)
require.Equal(t, uint64(1000), resp.Deployments[0].CreateIndex)
require.Equal(t, uuid1, resp.Deployments[0].ID)

require.Equal(t, uint64(1001), resp.Deployments[1].CreateIndex)
require.Equal(t, uuid2, resp.Deployments[1].ID)

require.Equal(t, uint64(1002), resp.Deployments[2].CreateIndex)
require.Equal(t, uuid3, resp.Deployments[2].ID)
})

t.Run("descending", func(t *testing.T) {
// Lookup the deployments in reverse chronological order (newest first)
get := &structs.DeploymentListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: false,
}

var resp structs.DeploymentListResponse
err = msgpackrpc.CallWithCodec(codec, "Deployment.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Deployments, 3)

// Assert returned order is by CreateIndex (descending)
require.Equal(t, uint64(1002), resp.Deployments[0].CreateIndex)
require.Equal(t, uuid3, resp.Deployments[0].ID)

require.Equal(t, uint64(1001), resp.Deployments[1].CreateIndex)
require.Equal(t, uuid2, resp.Deployments[1].ID)

require.Equal(t, uint64(1000), resp.Deployments[2].CreateIndex)
require.Equal(t, uuid1, resp.Deployments[2].ID)
})
}

func TestDeploymentEndpoint_List_ACL(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1174,23 +1264,23 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) {
jobID string
status string
}{
{id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"},
{id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9"}, // 0
{id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9"}, // 1
{id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"}, // 2
{id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"}, // 3
{id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}, // 4
{id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}, // 5
{id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9"}, // 6
}

state := s1.fsm.State()
index := uint64(1000)

for _, m := range mocks {
index++
for i, m := range mocks {
index := 1000 + uint64(i)
deployment := mock.Deployment()
deployment.Status = structs.DeploymentStatusCancelled
deployment.ID = m.id
deployment.CreateIndex = index
if m.namespace != "" { // defaults to "default"
deployment.Namespace = m.namespace
}
Expand Down Expand Up @@ -1262,6 +1352,7 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
req := &structs.DeploymentListRequest{
OrderAscending: true, // counting up is easier to think about
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: tc.namespace,
Expand Down
2 changes: 1 addition & 1 deletion nomad/deploymentwatcher/deployments_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (w *Watcher) getDeploys(ctx context.Context, minIndex uint64) ([]*structs.D
// getDeploysImpl retrieves all deployments from the passed state store.
func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {

iter, err := state.Deployments(ws)
iter, err := state.Deployments(ws, false)
if err != nil {
return nil, 0, err
}
Expand Down
16 changes: 9 additions & 7 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,7 @@ func (e *Eval) Reap(args *structs.EvalDeleteRequest,
}

// List is used to get a list of the evaluations in the system
func (e *Eval) List(args *structs.EvalListRequest,
reply *structs.EvalListResponse) error {
func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListResponse) error {
if done, err := e.srv.forward("Eval.List", args, args, reply); done {
return err
}
Expand All @@ -404,12 +403,15 @@ func (e *Eval) List(args *structs.EvalListRequest,
// Scan all the evaluations
var err error
var iter memdb.ResultIterator
if args.RequestNamespace() == structs.AllNamespacesSentinel {
iter, err = store.Evals(ws)
} else if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.EvalsByIDPrefix(ws, args.RequestNamespace(), prefix)

namespace := args.RequestNamespace()

if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.EvalsByIDPrefix(ws, namespace, prefix)
} else if namespace != structs.AllNamespacesSentinel {
iter, err = store.EvalsByNamespaceOrdered(ws, namespace, args.OrderAscending)
} else {
iter, err = store.EvalsByNamespace(ws, args.RequestNamespace())
iter, err = store.Evals(ws, args.OrderAscending)
}
if err != nil {
return err
Expand Down
Loading

0 comments on commit aaa0ccb

Please sign in to comment.