Skip to content

Commit

Permalink
fix running job cancellation when not fetching
Browse files Browse the repository at this point in the history
Cancellation of running jobs relied on a channel that was only being
received when in the job fetch routine, meaning that jobs which were
cancelled would not be cancelled until the next scheduled fetch.

This was fixed by also receiving from the job cancellation channel when
in the main producer loop, even if no fetches are happening.
  • Loading branch information
bgentry committed Nov 15, 2024
1 parent ed88f73 commit f0d65ab
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Cancellation of running jobs relied on a channel that was only being received when in the job fetch routine, meaning that jobs which were cancelled would not be cancelled until the next scheduled fetch. This was fixed by also receiving from the job cancellation channel when in the main producer loop, even if no fetches are happening. [PR #XXX](https://github.com/riverqueue/river/pull/XXX).

## [0.14.1] - 2024-11-04

### Fixed
Expand Down
22 changes: 18 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,12 @@ func Test_Client(t *testing.T) {
// _outside of_ a transaction. The exact same test logic applies to each case,
// the only difference is a different cancelFunc provided by the specific
// subtest.
cancelRunningJobTestHelper := func(t *testing.T, cancelFunc func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error)) { //nolint:thelper
client, bundle := setup(t)
cancelRunningJobTestHelper := func(t *testing.T, config *Config, cancelFunc func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error)) { //nolint:thelper
defaultConfig, bundle := setupConfig(t)
if config == nil {
config = defaultConfig
}
client := newTestClient(t, bundle.dbPool, config)

jobStartedChan := make(chan int64)

Expand Down Expand Up @@ -492,15 +496,25 @@ func Test_Client(t *testing.T) {
t.Run("CancelRunningJob", func(t *testing.T) {
t.Parallel()

cancelRunningJobTestHelper(t, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
cancelRunningJobTestHelper(t, nil, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
return client.JobCancel(ctx, jobID)
})
})

t.Run("CancelRunningJobWithLongPollInterval", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, nil)
config.FetchPollInterval = 60 * time.Second
cancelRunningJobTestHelper(t, config, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
return client.JobCancel(ctx, jobID)
})
})

t.Run("CancelRunningJobInTx", func(t *testing.T) {
t.Parallel()

cancelRunningJobTestHelper(t, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
cancelRunningJobTestHelper(t, nil, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
var (
job *rivertype.JobRow
err error
Expand Down
2 changes: 2 additions & 0 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
default:
p.Logger.DebugContext(workCtx, p.Name+": Unknown queue control action", "action", msg.Action)
}
case jobID := <-p.cancelCh:
p.maybeCancelJob(jobID)
case <-fetchLimiter.C():
if p.paused {
continue
Expand Down

0 comments on commit f0d65ab

Please sign in to comment.