Skip to content

Commit

Permalink
feat(server): archivedWf add namePrefix search. Fixes #6743 (#6801)
Browse files Browse the repository at this point in the history
Signed-off-by: Tianchu Zhao <evantczhao@gmail.com>
  • Loading branch information
tczhao authored Oct 7, 2021
1 parent 689ad68 commit b548097
Show file tree
Hide file tree
Showing 16 changed files with 237 additions and 69 deletions.
5 changes: 5 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@
"description": "The continue option should be set when retrieving more results from the server. Since this value is\nserver defined, clients may only use the continue value from a previous query result with identical\nquery parameters (except for the value of continue) and the server may reject a continue value it\ndoes not recognize. If the specified continue value is no longer valid whether due to expiration\n(generally five to fifteen minutes) or a configuration change on the server, the server will\nrespond with a 410 ResourceExpired error together with a continue token. If the client needs a\nconsistent list, it must restart their list without the continue field. Otherwise, the client may\nsend another list request with the token received with the 410 error, the server will respond with\na list starting from the next key, but from the latest snapshot, which is inconsistent from the\nprevious list results - objects that are created, modified, or deleted after the first list request\nwill be included in the response, as long as their keys are after the \"next key\".\n\nThis field is not supported when watch is true. Clients may start a watch from the last\nresourceVersion value returned by the server and not miss any modifications.",
"name": "listOptions.continue",
"in": "query"
},
{
"type": "string",
"name": "namePrefix",
"in": "query"
}
],
"responses": {
Expand Down
2 changes: 2 additions & 0 deletions persist/sqldb/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ func (m migrate) Exec(ctx context.Context) (err error) {
ansiSQLChange(`create index ` + m.tableName + `_i1 on ` + m.tableName + ` (clustername,namespace,updatedat)`),
// index to find records that need deleting, this omits namespaces as this might be null
ansiSQLChange(`create index argo_archived_workflows_i2 on argo_archived_workflows (clustername,instanceid,finishedat)`),
// add argo_archived_workflows name index for prefix searching performance
ansiSQLChange(`create index argo_archived_workflows_i3 on argo_archived_workflows (clustername,instanceid,name)`),
} {
err := m.applyChange(ctx, changeSchemaVersion, change)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions persist/sqldb/mocks/WorkflowArchive.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion persist/sqldb/null_workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (r *nullWorkflowArchive) ArchiveWorkflow(*wfv1.Workflow) error {
return nil
}

func (r *nullWorkflowArchive) ListWorkflows(string, string, time.Time, time.Time, labels.Requirements, int, int) (wfv1.Workflows, error) {
func (r *nullWorkflowArchive) ListWorkflows(string, string, string, time.Time, time.Time, labels.Requirements, int, int) (wfv1.Workflows, error) {
return wfv1.Workflows{}, nil
}

Expand Down
13 changes: 11 additions & 2 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type archivedWorkflowLabelRecord struct {
type WorkflowArchive interface {
ArchiveWorkflow(wf *wfv1.Workflow) error
// list workflows, with the most recently started workflows at the beginning (i.e. index 0 is the most recent)
ListWorkflows(namespace string, name string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements, limit, offset int) (wfv1.Workflows, error)
ListWorkflows(namespace string, name string, namePrefix string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements, limit, offset int) (wfv1.Workflows, error)
GetWorkflow(uid string) (*wfv1.Workflow, error)
DeleteWorkflow(uid string) error
DeleteExpiredWorkflows(ttl time.Duration) error
Expand Down Expand Up @@ -136,7 +136,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
})
}

func (r *workflowArchive) ListWorkflows(namespace string, name string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements, limit int, offset int) (wfv1.Workflows, error) {
func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefix string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements, limit int, offset int) (wfv1.Workflows, error) {
var archivedWfs []archivedWorkflowMetadata
clause, err := labelsClause(r.dbType, labelRequirements)
if err != nil {
Expand All @@ -156,6 +156,7 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, minStarte
Where(r.clusterManagedNamespaceAndInstanceID()).
And(namespaceEqual(namespace)).
And(nameEqual(name)).
And(namePrefixClause(namePrefix)).
And(startedAtClause(minStartedAt, maxStartedAt)).
And(clause).
OrderBy("-startedat").
Expand Down Expand Up @@ -219,6 +220,14 @@ func nameEqual(name string) db.Cond {
}
}

func namePrefixClause(namePrefix string) db.Cond {
if namePrefix == "" {
return db.Cond{}
} else {
return db.Cond{"name LIKE ": namePrefix + "%"}
}
}

func (r *workflowArchive) GetWorkflow(uid string) (*wfv1.Workflow, error) {
archivedWf := &archivedWorkflowRecord{}
err := r.session.
Expand Down
125 changes: 89 additions & 36 deletions pkg/apiclient/workflowarchive/workflow-archive.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apiclient/workflowarchive/workflow-archive.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package workflowarchive;

message ListArchivedWorkflowsRequest {
k8s.io.apimachinery.pkg.apis.meta.v1.ListOptions listOptions = 1;
string namePrefix = 2;
}
message GetArchivedWorkflowRequest {
string uid = 1;
Expand Down
3 changes: 2 additions & 1 deletion server/workflowarchive/archived_workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive) workflowarchivepk

func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req *workflowarchivepkg.ListArchivedWorkflowsRequest) (*wfv1.WorkflowList, error) {
options := req.ListOptions
namePrefix := req.NamePrefix
if options == nil {
options = &metav1.ListOptions{}
}
Expand Down Expand Up @@ -98,7 +99,7 @@ func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req
limitWithMore = limit + 1
}

items, err := w.wfArchive.ListWorkflows(namespace, name, minStartedAt, maxStartedAt, requirements, limitWithMore, offset)
items, err := w.wfArchive.ListWorkflows(namespace, name, namePrefix, minStartedAt, maxStartedAt, requirements, limitWithMore, offset)
if err != nil {
return nil, err
}
Expand Down
20 changes: 16 additions & 4 deletions server/workflowarchive/archived_workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ func Test_archivedWorkflowServer(t *testing.T) {
}, nil
})
// two pages of results for limit 1
repo.On("ListWorkflows", "", "", time.Time{}, time.Time{}, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}, {}}, nil)
repo.On("ListWorkflows", "", "", time.Time{}, time.Time{}, labels.Requirements(nil), 2, 1).Return(wfv1.Workflows{{}}, nil)
repo.On("ListWorkflows", "", "", "", time.Time{}, time.Time{}, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}, {}}, nil)
repo.On("ListWorkflows", "", "", "", time.Time{}, time.Time{}, labels.Requirements(nil), 2, 1).Return(wfv1.Workflows{{}}, nil)
minStartAt, _ := time.Parse(time.RFC3339, "2020-01-01T00:00:00Z")
maxStartAt, _ := time.Parse(time.RFC3339, "2020-01-02T00:00:00Z")
repo.On("ListWorkflows", "", "", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil)
repo.On("ListWorkflows", "", "my-name", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil)
repo.On("ListWorkflows", "", "", "", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil)
repo.On("ListWorkflows", "", "my-name", "", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil)
repo.On("ListWorkflows", "", "", "my-", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil)
repo.On("ListWorkflows", "", "my-name", "my-", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil)
repo.On("GetWorkflow", "").Return(nil, nil)
repo.On("GetWorkflow", "my-uid").Return(&wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{Name: "my-name"},
Expand Down Expand Up @@ -101,6 +103,16 @@ func Test_archivedWorkflowServer(t *testing.T) {
assert.Len(t, resp.Items, 1)
assert.Empty(t, resp.Continue)
}
resp, err = w.ListArchivedWorkflows(ctx, &workflowarchivepkg.ListArchivedWorkflowsRequest{ListOptions: &metav1.ListOptions{FieldSelector: "spec.startedAt>2020-01-01T00:00:00Z,spec.startedAt<2020-01-02T00:00:00Z", Limit: 1}, NamePrefix: "my-"})
if assert.NoError(t, err) {
assert.Len(t, resp.Items, 1)
assert.Empty(t, resp.Continue)
}
resp, err = w.ListArchivedWorkflows(ctx, &workflowarchivepkg.ListArchivedWorkflowsRequest{ListOptions: &metav1.ListOptions{FieldSelector: "metadata.name=my-name,spec.startedAt>2020-01-01T00:00:00Z,spec.startedAt<2020-01-02T00:00:00Z", Limit: 1}, NamePrefix: "my-"})
if assert.NoError(t, err) {
assert.Len(t, resp.Items, 1)
assert.Empty(t, resp.Continue)
}
})
t.Run("GetArchivedWorkflow", func(t *testing.T) {
allowed = false
Expand Down
Loading

0 comments on commit b548097

Please sign in to comment.