Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add capacity to list jobs by ID + make default #307

Merged
merged 1 commit into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- **Breaking change:** JobList/JobListTx now support querying Jobs by a list of Job Kinds and States. Also allows for filtering by specific timestamp values. Thank you Jos Kraaijeveld (@thatjos)! 🙏🏻 [PR #236](https://github.com/riverqueue/river/pull/236).
- **Breaking change:** There are a number of small breaking changes in the job list API using `JobList`/`JobListTx`:
- Now support querying jobs by a list of Job Kinds and States. Also allows for filtering by specific timestamp values. Thank you Jos Kraaijeveld (@thatjos)! 🙏🏻 [PR #236](https://github.com/riverqueue/river/pull/236).
- Job listing now defaults to ordering by job ID (`JobListOrderByID`) instead of a job timestamp dependent on on requested job state. The previous ordering behavior is still available with `NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc)`. [PR #XXX](https://github.com/riverqueue/river/pull/XXX).
- The function `JobListCursorFromJob` no longer needs a sort order parameter. Instead, sort order is determined based on the job list parameters that the cursor is subsequently used with. [PR #XXX](https://github.com/riverqueue/river/pull/XXX).
- **Breaking change:** Client `Insert` and `InsertTx` functions now return a `JobInsertResult` struct instead of a `JobRow`. This allows the result to include metadata like the new `UniqueSkippedAsDuplicate` property, so callers can tell whether an inserted job was skipped due to unique constraint. [PR #292](https://github.com/riverqueue/river/pull/292).
- **Breaking change:** Client `InsertMany` and `InsertManyTx` now return number of jobs inserted as `int` instead of `int64`. This change was made to make the type in use a little more idiomatic. [PR #293](https://github.com/riverqueue/river/pull/293).
- **Breaking change:** `river.JobState*` type aliases have been removed. All job state constants should be accessed through `rivertype.JobState*` instead. [PR #300](https://github.com/riverqueue/river/pull/300).
Expand Down
4 changes: 2 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1489,7 +1489,7 @@ func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) (*JobL
}
res := &JobListResult{Jobs: jobs}
if len(jobs) > 0 {
res.LastCursor = JobListCursorFromJob(jobs[len(jobs)-1], params.sortField)
res.LastCursor = jobListCursorFromJobAndParams(jobs[len(jobs)-1], params)
}
return res, nil
}
Expand Down Expand Up @@ -1519,7 +1519,7 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
}
res := &JobListResult{Jobs: jobs}
if len(jobs) > 0 {
res.LastCursor = JobListCursorFromJob(jobs[len(jobs)-1], params.sortField)
res.LastCursor = jobListCursorFromJobAndParams(jobs[len(jobs)-1], params)
}
return res, nil
}
Expand Down
109 changes: 91 additions & 18 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1500,7 +1500,24 @@ func Test_Client_JobList(t *testing.T) {
require.Equal(t, []int64{job3.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("SortsAvailableRetryableAndScheduledJobsByScheduledAt", func(t *testing.T) {
t.Run("DefaultsToOrderingByID", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})

res, err := client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc))
require.NoError(t, err)
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderDesc))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("OrderByTimeSortsAvailableRetryableAndScheduledJobsByScheduledAt", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)
Expand All @@ -1516,7 +1533,7 @@ func Test_Client_JobList(t *testing.T) {
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(state), ScheduledAt: &now})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(state), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))})

res, err := client.JobList(ctx, NewJobListParams().States(state))
res, err := client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc).States(state))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

Expand All @@ -1526,7 +1543,7 @@ func Test_Client_JobList(t *testing.T) {
}
})

t.Run("SortsCancelledCompletedAndDiscardedJobsByFinalizedAt", func(t *testing.T) {
t.Run("OrderByTimeSortsCancelledCompletedAndDiscardedJobsByFinalizedAt", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)
Expand All @@ -1542,7 +1559,7 @@ func Test_Client_JobList(t *testing.T) {
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(state), FinalizedAt: ptrutil.Ptr(now.Add(-10 * time.Second))})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(state), FinalizedAt: ptrutil.Ptr(now.Add(-15 * time.Second))})

res, err := client.JobList(ctx, NewJobListParams().States(state))
res, err := client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc).States(state))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

Expand All @@ -1552,7 +1569,7 @@ func Test_Client_JobList(t *testing.T) {
}
})

t.Run("SortsRunningJobsByAttemptedAt", func(t *testing.T) {
t.Run("OrderByTimeSortsRunningJobsByAttemptedAt", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)
Expand All @@ -1561,7 +1578,7 @@ func Test_Client_JobList(t *testing.T) {
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: &now})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(now.Add(-5 * time.Second))})

res, err := client.JobList(ctx, NewJobListParams().States(rivertype.JobStateRunning))
res, err := client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc).States(rivertype.JobStateRunning))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

Expand All @@ -1583,11 +1600,70 @@ func Test_Client_JobList(t *testing.T) {

res, err := client.JobList(ctx, nil)
require.NoError(t, err)
// sort order is switched by ScheduledAt values:
require.Equal(t, []int64{job2.ID, job3.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
// sort order defaults to ID
require.Equal(t, []int64{job1.ID, job2.ID, job3.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("PaginatesWithAfter_JobListOrderByID", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})
job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})

res, err := client.JobList(ctx, NewJobListParams().After(JobListCursorFromJob(job1)))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job3.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByID, res.LastCursor.sortField)
require.Equal(t, job3.ID, res.LastCursor.id)

// No more results
res, err = client.JobList(ctx, NewJobListParams().After(JobListCursorFromJob(job3)))
require.NoError(t, err)
require.Equal(t, []int64{}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Nil(t, res.LastCursor)

// Descending
res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByID, SortOrderDesc).After(JobListCursorFromJob(job3)))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByID, res.LastCursor.sortField)
require.Equal(t, job1.ID, res.LastCursor.id)
})

t.Run("PaginatesWithAfter", func(t *testing.T) {
t.Run("PaginatesWithAfter_JobListOrderByScheduledAt", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

now := time.Now().UTC()
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ScheduledAt: &now})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ScheduledAt: ptrutil.Ptr(now.Add(1 * time.Second))})
job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ScheduledAt: ptrutil.Ptr(now.Add(2 * time.Second))})

res, err := client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByScheduledAt, SortOrderAsc).After(JobListCursorFromJob(job1)))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job3.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByScheduledAt, res.LastCursor.sortField)
require.Equal(t, job3.ID, res.LastCursor.id)

// No more results
res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByScheduledAt, SortOrderAsc).After(JobListCursorFromJob(job3)))
require.NoError(t, err)
require.Equal(t, []int64{}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Nil(t, res.LastCursor)

// Descending
res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByScheduledAt, SortOrderDesc).After(JobListCursorFromJob(job3)))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByScheduledAt, res.LastCursor.sortField)
require.Equal(t, job1.ID, res.LastCursor.id)
})

t.Run("PaginatesWithAfter_JobListOrderByTime", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)
Expand All @@ -1600,26 +1676,23 @@ func Test_Client_JobList(t *testing.T) {
job5 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), ScheduledAt: ptrutil.Ptr(now.Add(-7 * time.Second)), FinalizedAt: ptrutil.Ptr(now.Add(-5 * time.Second))})
job6 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), ScheduledAt: ptrutil.Ptr(now.Add(-7 * time.Second)), FinalizedAt: &now})

res, err := client.JobList(ctx, NewJobListParams().States(rivertype.JobStateAvailable).After(JobListCursorFromJob(job1, JobListOrderByTime)))
res, err := client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc).States(rivertype.JobStateAvailable).After(JobListCursorFromJob(job1)))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByTime, res.LastCursor.sortField)
require.Equal(t, job2.ID, res.LastCursor.id)

res, err = client.JobList(ctx, NewJobListParams().States(rivertype.JobStateRunning).After(JobListCursorFromJob(job3, JobListOrderByTime)))
res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc).States(rivertype.JobStateRunning).After(JobListCursorFromJob(job3)))
require.NoError(t, err)
require.Equal(t, []int64{job4.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByTime, res.LastCursor.sortField)
require.Equal(t, job4.ID, res.LastCursor.id)

res, err = client.JobList(ctx, NewJobListParams().States(rivertype.JobStateCompleted).After(JobListCursorFromJob(job5, JobListOrderByTime)))
res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc).States(rivertype.JobStateCompleted).After(JobListCursorFromJob(job5)))
require.NoError(t, err)
require.Equal(t, []int64{job6.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByTime, res.LastCursor.sortField)
require.Equal(t, job6.ID, res.LastCursor.id)

res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByScheduledAt, SortOrderAsc).After(JobListCursorFromJob(job4, JobListOrderByScheduledAt)))
require.NoError(t, err)
require.Equal(t, []int64{job1.ID, job3.ID, job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByScheduledAt, res.LastCursor.sortField)
require.Equal(t, job2.ID, res.LastCursor.id)
})

t.Run("MetadataOnly", func(t *testing.T) {
Expand Down
Loading
Loading