From 631892b0ce99436165fbb5351ba0402156468347 Mon Sep 17 00:00:00 2001 From: Phil Constantinou Date: Mon, 26 Feb 2024 00:16:21 -0800 Subject: [PATCH 1/6] Refactor unit tests --- backends/memory/memory_backend.go | 2 +- backends/memory/memory_backend_test.go | 12 +- backends/redis/redis_backend_test.go | 4 +- backends/suite_test.go | 372 ------------------------- 4 files changed, 13 insertions(+), 377 deletions(-) diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index be15bbc..ca973b0 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -67,7 +67,7 @@ func Backend(_ context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, e } // Enqueue queues jobs to be executed asynchronously -func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job, jobOptions ...neoq.JobOption) (jobID string, err error) { +func (m *MemBackend) Enqueue(_ context.Context, job *jobs.Job, jobOptions ...neoq.JobOption) (jobID string, err error) { options := neoq.JobOptions{} for _, opt := range jobOptions { opt(&options) diff --git a/backends/memory/memory_backend_test.go b/backends/memory/memory_backend_test.go index 5ffd9ad..afa3ff5 100644 --- a/backends/memory/memory_backend_test.go +++ b/backends/memory/memory_backend_test.go @@ -19,6 +19,7 @@ import ( "github.com/acaloiaro/neoq/testutils" "github.com/pkg/errors" "github.com/robfig/cron" + "github.com/stretchr/testify/suite" "golang.org/x/exp/slog" ) @@ -426,11 +427,16 @@ result_loop: } } -func TestSuite(t *testing.T) { +func initQueue() (neoq.Neoq, error) { ctx := context.Background() - n, err := neoq.New(ctx, neoq.WithBackend(memory.Backend), neoq.WithLogLevel(logging.LogLevelDebug)) + return neoq.New(ctx, neoq.WithBackend(memory.Backend), neoq.WithLogLevel(logging.LogLevelDebug)) //nolint: error +} + +func TestSuite(t *testing.T) { + n, err := initQueue() if err != nil { t.Fatal(err) } - backends.NewNeoQTestSuite(n).Run(t) + s := backends.NewNeoQTestSuite(n) + suite.Run(t, s) } diff --git a/backends/redis/redis_backend_test.go b/backends/redis/redis_backend_test.go index fc5dac5..83fbb1a 100644 --- a/backends/redis/redis_backend_test.go +++ b/backends/redis/redis_backend_test.go @@ -16,6 +16,7 @@ import ( "github.com/acaloiaro/neoq/logging" "github.com/acaloiaro/neoq/testutils" "github.com/hibiken/asynq" + "github.com/stretchr/testify/suite" ) const ( @@ -462,5 +463,6 @@ func TestSuite(t *testing.T) { t.Fatal(err) } - backends.NewNeoQTestSuite(nq).Run(t) + s := backends.NewNeoQTestSuite(nq) + suite.Run(t, s) } diff --git a/backends/suite_test.go b/backends/suite_test.go index 14634b0..6c70ab4 100644 --- a/backends/suite_test.go +++ b/backends/suite_test.go @@ -1,373 +1 @@ package backends - -import ( - "context" - "errors" - "fmt" - "math/rand" - "testing" - "time" - - "github.com/acaloiaro/neoq" - "github.com/acaloiaro/neoq/backends/memory" - "github.com/acaloiaro/neoq/handler" - "github.com/acaloiaro/neoq/jobs" - "github.com/acaloiaro/neoq/logging" - "github.com/stretchr/testify/suite" -) - -const proceedKey = "proceed" - -// foundKey is populated in the payload if the test expects to find the job -const foundKey = "found" - -const messageKey = "messageKey" - -var testQueue = fmt.Sprintf("testqfinger-%d", time.Now().Unix()) - -const doneKey = "done" -const runAfterKey = "runAfter" -const fingerprint1 = "fp1" - -// TestSuite tests the backends independent of implementation -func TestSuite(t *testing.T) { - ctx := context.Background() - s := new(NeoQTestSuite) - var err error - s.NeoQ, err = neoq.New(ctx, neoq.WithBackend(memory.Backend), neoq.WithLogLevel(logging.LogLevelDebug)) - if err != nil { - t.Fatal(err) - } - suite.Run(t, s) -} - -// TestOverrideFingerprint provides a test case that works with all the backends that -// verifies that overriding jobs with a new fingerprint works. -// The test case is queues several jobs, some are designed to be overridden by subsequent jobs, some are not. -func (s *NeoQTestSuite) TestOverrideFingerprint() { - ctx := context.Background() - fingerprint1 := fmt.Sprintf("fingerprint-1-%d", rand.Int63()) - fingerprint2 := fmt.Sprintf("fingerprint-2-%d", rand.Int63()) - - var err error - jobsProcessed, jobsToDo := 0, 3 - - fingerPrints := make(map[string]int) - - done := make(chan bool) - // proceed channel ensure that jobs that are supposed to be processed are processed - proceed := make(chan bool) - - h1 := handler.New(testQueue, func(ctx context.Context) error { - j, err := jobs.FromContext(ctx) - s.NoError(err) - s.NotEmpty(j) - message := j.Payload[messageKey].(string) - - found := j.Payload[foundKey].(bool) - s.Truef(found, "Found job that should not be found: %s", message) - - if _, ok := j.Payload[proceedKey]; ok { - proceed <- true - } - - fingerPrints[j.Fingerprint]++ - jobsProcessed++ - s.T().Logf("Handled job %d with fingerprint %s and ID %d Payload: %s", jobsProcessed, j.Fingerprint, j.ID, message) - if jobsToDo == jobsProcessed { - done <- true - } - return err - }) - - s.NoError(s.NeoQ.Start(ctx, h1)) - - go func() { - now := time.Now() - _, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ - Queue: testQueue, - Payload: map[string]any{ - messageKey: "first queued item. we'll wait until this is processed", - foundKey: true, - proceedKey: true, - }, - RunAfter: now, - Fingerprint: fingerprint1, - }) - s.NoError(err, "job was not enqueued.") - <-proceed - - runAt := now.Add(5 * time.Second) - _, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ - Queue: testQueue, - Payload: map[string]any{ - messageKey: "should insert, since the prior key has been processed.", - foundKey: true, - proceedKey: true, - }, - RunAfter: runAt, - Fingerprint: fingerprint2, - }) - s.NoError(err) - _, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ - Queue: testQueue, - Payload: map[string]any{ - messageKey: "should not be queued - conflicting key", - foundKey: false, - proceedKey: true, - }, - RunAfter: runAt, - Fingerprint: fingerprint2, - }) - s.Error(err, "should not insert") - s.True(errors.Is(err, jobs.ErrJobFingerprintConflict)) - _, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ - Queue: testQueue, - Payload: map[string]any{ - messageKey: "(2) the item that overwrites may be found", - proceedKey: true, - foundKey: true}, - RunAfter: now, - Fingerprint: fingerprint2, - }, neoq.WithOverrideMatchingFingerprint()) - - s.NoErrorf(err, "Should have returned nil but returned %v", err) - <-proceed - _, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ - Queue: testQueue, - Payload: map[string]interface{}{ - messageKey: "(3) the new item", - proceedKey: true, - foundKey: true}, - RunAfter: now, - Fingerprint: fingerprint2, - }) - s.NoErrorf(err, "job was not enqueued.%w", err) - <-proceed - }() - - timeoutTimer := time.After(time.Minute) -results_loop: - for { - select { - case <-timeoutTimer: - err = jobs.ErrJobTimeout - break results_loop - case <-done: - break results_loop - } - } - s.NoError(err) - s.Equalf(jobsToDo, jobsProcessed, - "handler should have handled %d jobs, but handled %d. %v", - jobsToDo, jobsProcessed, fingerPrints) -} - -func makeHandler(s *NeoQTestSuite, done chan bool) handler.Handler { - return handler.New(testQueue, func(ctx context.Context) (err error) { - var j *jobs.Job - j, err = jobs.FromContext(ctx) - if !s.NoError(err) && !s.NotNil(j) { - return fmt.Errorf("failed to get job from context %w", err) - } - message := j.Payload[messageKey].(string) - - found := j.Payload[foundKey].(bool) - s.Truef(found, "Found job that should not be found: %s", message) - - isDone, ok := j.Payload[doneKey] - if ok && isDone.(bool) { - done <- true - } - return nil - }) - -} - -// TestConflictingFingerprints verifies that that conflicting fingerprints can't override one another -func (s *NeoQTestSuite) TestConflictingFingerprints() { - ctx := context.Background() - fingerPrint := fmt.Sprintf("fingerprint-conflict-%d", rand.Int63()) - - done := make(chan bool) - h := makeHandler(s, done) - s.NoError(s.NeoQ.Start(ctx, h)) - runAt := time.Now().Add(time.Second * 2) - go func() { - _, err := s.NeoQ.Enqueue(ctx, &jobs.Job{ - Queue: testQueue, - Payload: map[string]any{ - messageKey: "(1) first queued item we'll wait until this is processed", - doneKey: true, - foundKey: true}, - RunAfter: runAt, - Fingerprint: fingerPrint, - }) - s.NoErrorf(err, "job was not enqueued.%w", err) - - _, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ - Queue: testQueue, - Payload: map[string]any{ - messageKey: "first queued item - should be overwritten", foundKey: false}, - RunAfter: runAt, - Fingerprint: fingerPrint, - }) - s.Errorf(err, "Job with fingerprint %s should not be queued", fingerPrint) - }() - - var err error - timeoutTimer := time.After(time.Minute) -results_loop: - for { - select { - case <-timeoutTimer: - err = jobs.ErrJobTimeout - break results_loop - case <-done: - break results_loop - } - } - s.NoError(err) -} - -// TestOverridingFingerprints verifies that when WithOverrideMatchingFingerprint is set, an overridding -// job can be written to the queue -func (s *NeoQTestSuite) TestOverridingFingerprints() { - ctx := context.Background() - fingerprint1 := fingerprint1 + time.Now().String() - done := make(chan bool) - - s.NoError(s.NeoQ.Start(ctx, makeHandler(s, done))) - - go func() { - _, err := s.NeoQ.Enqueue(ctx, &jobs.Job{ - Queue: testQueue, - Payload: map[string]any{ - messageKey: "job should not be found, it should be overridden", - doneKey: true, - foundKey: false}, - RunAfter: time.Now().Add(time.Second), - Fingerprint: fingerprint1, - }) - s.NoErrorf(err, "job was not enqueued.%w", err) - - _, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ - Queue: testQueue, - Payload: map[string]any{ - messageKey: "job should be found", - foundKey: true, - doneKey: true}, - RunAfter: time.Now().Add(time.Second), - Fingerprint: fingerprint1, - }, neoq.WithOverrideMatchingFingerprint()) - s.NoErrorf(err, "job was not enqueued.%w", err) - }() - - var err error - timeoutTimer := time.After(time.Minute) - for finished := false; !finished; { - select { - case <-timeoutTimer: - err = jobs.ErrJobTimeout - finished = true - case <-done: - finished = true - } - } - s.NoError(err) -} - -// TestOverridingFingerprints verifies that when WithOverrideMatchingFingerprint is set, an overridding -// job can be written to the queue -func (s *NeoQTestSuite) TestMultipleOverridingFingerprints() { - ctx := context.Background() - fingerPrint := "fingerprint1" + time.Now().String() - - done := make(chan bool) - - s.NoError(s.NeoQ.Start(ctx, makeHandler(s, done))) - - go func() { - last := 5 - for i := 1; i <= last; i++ { - finalEntry := i == last - runAfter := time.Now() - if !finalEntry { - runAfter = runAfter.Add(time.Hour) - } - _, err := s.NeoQ.Enqueue(ctx, &jobs.Job{ - Queue: testQueue, - Payload: map[string]any{ - messageKey: fmt.Sprintf("queue'd job %d. Is final? %v - run after %s", i, finalEntry, runAfter), - foundKey: finalEntry, - doneKey: finalEntry, - runAfterKey: runAfter, - }, - RunAfter: runAfter, - Fingerprint: fingerPrint, - }, neoq.WithOverrideMatchingFingerprint()) - - s.NoErrorf(err, "job was not enqueued.%w", err) - } - }() - var err error - timeoutTimer := time.After(time.Minute * 20) - for finished := false; !finished; { - select { - case <-timeoutTimer: - err = jobs.ErrJobTimeout - finished = true - case <-done: - finished = true - } - } - s.NoError(err) -} - -// // TestBasicJobProcessing tests that the memory backend is able to process the most basic jobs with the -// // most basic configuration. -// func (s *NeoQTestSuite) TestBasicJobProcessing() { -// queue := fmt.Sprintf("basic-queue-%d", rand.Int63()) -// numJobs := 1000 -// doneCnt := 0 -// done := make(chan bool) -// ctx := context.Background() -// timeoutTimer := time.After(5 * time.Second) - -// h := handler.New(queue, func(_ context.Context) (err error) { -// done <- true -// return -// }) - -// s.NoError(s.NeoQ.Start(ctx, h)) - -// go func() { -// for i := 0; i < numJobs; i++ { -// jid, err := s.NeoQ.Enqueue(ctx, &jobs.Job{ -// Queue: queue, -// Payload: map[string]interface{}{ -// "message": fmt.Sprintf("hello world: %d", i), -// }, -// }) -// s.NoError(err, "job was not enqueued.") -// s.NotEqual(jobs.DuplicateJobID, jid, "duplicate or this error caused it: %v , %s", jid, err) -// } -// }() -// var err error -// for { -// select { -// case <-timeoutTimer: -// err = jobs.ErrJobTimeout -// case <-done: -// doneCnt++ -// } - -// if doneCnt >= numJobs { -// break -// } - -// if err != nil { -// break -// } -// } -// } From bb62dfae0d48a5ebb9127c076118f3e787c704ac Mon Sep 17 00:00:00 2001 From: Phil Constantinou Date: Mon, 26 Feb 2024 00:17:28 -0800 Subject: [PATCH 2/6] Refactor unit tests --- backends/suite.go | 380 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 377 insertions(+), 3 deletions(-) diff --git a/backends/suite.go b/backends/suite.go index 10aef7d..d282afb 100644 --- a/backends/suite.go +++ b/backends/suite.go @@ -2,9 +2,17 @@ package backends import ( "context" + "errors" + "fmt" + "math/rand" "testing" + "time" "github.com/acaloiaro/neoq" + "github.com/acaloiaro/neoq/backends/postgres" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/jobs" + "github.com/acaloiaro/neoq/logging" "github.com/stretchr/testify/suite" ) @@ -21,10 +29,376 @@ func NewNeoQTestSuite(q neoq.Neoq) *NeoQTestSuite { return n } -func (s *NeoQTestSuite) Run(t *testing.T) { +func (s *NeoQTestSuite) TearDownSuite() { + s.NeoQ.Shutdown(context.Background()) +} + +var testQueue = fmt.Sprintf("testq-%d", time.Now().Unix()) + +const ( + proceedKey = "proceed" + // foundKey is populated in the payload if the test expects to find the job + foundKey = "found" + + messageKey = "messageKey" + + doneKey = "done" + runAfterKey = "runAfter" + fingerprint1 = "fp1" +) + +// TestSuite tests the backends independent of implementation +func TestSuite(t *testing.T) { + ctx := context.Background() + s := new(NeoQTestSuite) + var err error + s.NeoQ, err = neoq.New(ctx, neoq.WithBackend(postgres.Backend), neoq.WithLogLevel(logging.LogLevelDebug)) + if err != nil { + t.Fatal(err) + } suite.Run(t, s) } -func (s *NeoQTestSuite) TearDownSuite() { - s.NeoQ.Shutdown(context.Background()) +// TestOverrideFingerprint provides a test case that works with all the backends that +// verifies that overriding jobs with a new fingerprint works. +// The test case is queues several jobs, some are designed to be overridden by subsequent jobs, some are not. +func (s *NeoQTestSuite) TestOverrideFingerprint() { + ctx := context.Background() + fingerprint1 := fmt.Sprintf("fingerprint-1-%d", rand.Int63()) + fingerprint2 := fmt.Sprintf("fingerprint-2-%d", rand.Int63()) + + var err error + jobsProcessed, jobsToDo := 0, 3 + + fingerPrints := make(map[string]int) + + done := make(chan bool) + // proceed channel ensure that jobs that are supposed to be processed are processed + proceed := make(chan bool) + + h1 := handler.New(testQueue, func(ctx context.Context) error { + j, err := jobs.FromContext(ctx) + s.NoError(err) + s.NotEmpty(j) + message := j.Payload[messageKey].(string) + + found := j.Payload[foundKey].(bool) + s.Truef(found, "Found job that should not be found: %s", message) + + if _, ok := j.Payload[proceedKey]; ok { + proceed <- true + } + + fingerPrints[j.Fingerprint]++ + jobsProcessed++ + s.T().Logf("Handled job %d with fingerprint %s and ID %d Payload: %s", jobsProcessed, j.Fingerprint, j.ID, message) + if jobsToDo == jobsProcessed { + done <- true + } + return err + }) + + s.NoError(s.NeoQ.Start(ctx, h1)) + + go func() { + now := time.Now() + _, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ + Queue: testQueue, + Payload: map[string]any{ + messageKey: "first queued item. we'll wait until this is processed", + foundKey: true, + proceedKey: true, + }, + RunAfter: now, + Fingerprint: fingerprint1, + }) + s.NoError(err, "job was not enqueued.") + <-proceed + + runAt := now.Add(120 * time.Second) + _, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ + Queue: testQueue, + Payload: map[string]any{ + messageKey: "should insert, since the prior key has been processed.", + foundKey: true, + proceedKey: true, + }, + RunAfter: runAt, + Fingerprint: fingerprint2, + }) + s.NoError(err) + _, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ + Queue: testQueue, + Payload: map[string]any{ + messageKey: "should not be queued - conflicting key", + foundKey: false, + proceedKey: true, + }, + RunAfter: runAt, + Fingerprint: fingerprint2, + }) + s.Error(err, "should not insert") + s.True(errors.Is(err, jobs.ErrJobFingerprintConflict)) + _, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ + Queue: testQueue, + Payload: map[string]any{ + messageKey: "(2) the item that overwrites may be found", + proceedKey: true, + foundKey: true}, + RunAfter: now, + Fingerprint: fingerprint2, + }, neoq.WithOverrideMatchingFingerprint()) + + s.NoErrorf(err, "Should have returned nil but returned %v", err) + <-proceed + _, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ + Queue: testQueue, + Payload: map[string]interface{}{ + messageKey: "(3) the new item", + proceedKey: true, + foundKey: true}, + RunAfter: now, + Fingerprint: fingerprint2, + }) + s.NoErrorf(err, "job was not enqueued.%w", err) + <-proceed + }() + + timeoutTimer := time.After(time.Minute) +results_loop: + for { + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + break results_loop + case <-done: + break results_loop + } + } + s.NoError(err) + s.Equalf(jobsToDo, jobsProcessed, + "handler should have handled %d jobs, but handled %d. %v", + jobsToDo, jobsProcessed, fingerPrints) +} + +func makeHandler(s *NeoQTestSuite, done chan bool) handler.Handler { + return handler.New(testQueue, func(ctx context.Context) (err error) { + var j *jobs.Job + j, err = jobs.FromContext(ctx) + if !s.NoError(err) && !s.NotNil(j) { + return fmt.Errorf("failed to get job from context %w", err) + } + message := j.Payload[messageKey].(string) + + found := j.Payload[foundKey].(bool) + s.Truef(found, "Found job that should not be found: %s", message) + + isDone, ok := j.Payload[doneKey] + if ok && isDone.(bool) { + done <- true + } + return nil + }) + +} + +// TestConflictingFingerprints verifies that that conflicting fingerprints can't override one another +func (s *NeoQTestSuite) TestConflictingFingerprints() { + ctx := context.Background() + fingerPrint := fmt.Sprintf("ConflictingFingerprints-%d", time.Now().Nanosecond()) + + done := make(chan bool) + h := makeHandler(s, done) + s.NoError(s.NeoQ.Start(ctx, h)) + runAt := time.Now().UTC().Add(time.Second * 2) + go func() { + _, err := s.NeoQ.Enqueue(ctx, &jobs.Job{ + Queue: testQueue, + Payload: map[string]any{ + messageKey: "(1) first queued item we'll wait until this is processed", + doneKey: true, + foundKey: true}, + RunAfter: runAt, + Fingerprint: fingerPrint, + }) + s.NoErrorf(err, "job was not enqueued.%w", err) + + _, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ + Queue: testQueue, + Payload: map[string]any{ + messageKey: "conflicting item should be should not save", + foundKey: false}, + RunAfter: runAt, + Fingerprint: fingerPrint, + }) + s.Errorf(err, "Job with fingerprint %s should not be queued", fingerPrint) + }() + + var err error + timeoutTimer := time.After(time.Minute) +results_loop: + for { + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + break results_loop + case <-done: + break results_loop + } + } + s.NoError(err) +} + +// TestOverridingFingerprints verifies that when WithOverrideMatchingFingerprint is set, an overridding +// job can be written to the queue +func (s *NeoQTestSuite) TestOverridingFingerprints() { + ctx := context.Background() + conflictingFingerprint := fmt.Sprintf("OverridingFingerprints-%d", time.Now().UnixMicro()) + done := make(chan bool) + + s.NoError(s.NeoQ.Start(ctx, makeHandler(s, done))) + + now := time.Now().UTC() + + go func() { + job, err := s.NeoQ.Enqueue(ctx, &jobs.Job{ + Queue: testQueue, + Payload: map[string]any{ + messageKey: "TestOverridingFingerprints. No conflict.", + doneKey: false, + foundKey: true}, + RunAfter: now.Add(time.Hour), + }) + s.NoErrorf(err, "job (%s) was not enqueued.%w", job, err) + + job, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ + Queue: testQueue, + Payload: map[string]any{ + messageKey: "TestOverridingFingerprints- job should not be found, it should be overridden", + doneKey: true, + foundKey: false}, + RunAfter: now.Add(time.Hour), + Fingerprint: conflictingFingerprint, + }) + s.NoErrorf(err, "job was not enqueued.%w", err) + + job, err = s.NeoQ.Enqueue(ctx, &jobs.Job{ + Queue: testQueue, + Payload: map[string]any{ + messageKey: fmt.Sprintf("job should be found - overwrites JOB: %s", job), + foundKey: true, + doneKey: true}, + RunAfter: time.Now().UTC().Add(time.Second), + Fingerprint: conflictingFingerprint, + }, neoq.WithOverrideMatchingFingerprint()) + s.NoErrorf(err, "job was not enqueued job: %s. %w ", job, err) + }() + + var err error + timeoutTimer := time.After(2 * time.Minute) + for finished := false; !finished; { + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + finished = true + case <-done: + finished = true + } + } + s.NoError(err) +} + +// TestOverridingFingerprints verifies that when WithOverrideMatchingFingerprint is set, an overridding +// job can be written to the queue +func (s *NeoQTestSuite) TestMultipleOverridingFingerprints() { + ctx := context.Background() + fingerPrint := "fingerprint1" + time.Now().String() + + done := make(chan bool) + + s.NoError(s.NeoQ.Start(ctx, makeHandler(s, done))) + + go func() { + last := 5 + for i := 1; i <= last; i++ { + finalEntry := i == last + runAfter := time.Now().UTC() + if !finalEntry { + runAfter = runAfter.Add(time.Hour) + } + _, err := s.NeoQ.Enqueue(ctx, &jobs.Job{ + Queue: testQueue, + Payload: map[string]any{ + messageKey: fmt.Sprintf("queue'd job %d. Is final? %v - run after %s", i, finalEntry, runAfter), + foundKey: finalEntry, + doneKey: finalEntry, + runAfterKey: runAfter, + }, + RunAfter: runAfter, + Fingerprint: fingerPrint, + }, neoq.WithOverrideMatchingFingerprint()) + + s.NoErrorf(err, "job was not enqueued.%w", err) + } + }() + var err error + timeoutTimer := time.After(time.Minute * 20) + for finished := false; !finished; { + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + finished = true + case <-done: + finished = true + } + } + s.NoError(err) +} + +// TestBasicJobProcessing tests that the memory backend is able to process the most basic jobs with the +// most basic configuration. +func (s *NeoQTestSuite) TestBasicJobProcessing() { + queue := fmt.Sprintf("basic-queue-%d", rand.Int63()) + numJobs := 1000 + doneCnt := 0 + done := make(chan bool) + ctx := context.Background() + timeoutTimer := time.After(5 * time.Second) + + h := handler.New(queue, func(_ context.Context) (err error) { + done <- true + return + }) + + s.NoError(s.NeoQ.Start(ctx, h)) + + go func() { + for i := 0; i < numJobs; i++ { + jid, err := s.NeoQ.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": fmt.Sprintf("hello world: %d", i), + }, + }) + s.NoError(err, "job was not enqueued.") + s.NotEqual(jobs.DuplicateJobID, jid, "duplicate or this error caused it: %v , %s", jid, err) + } + }() + var err error + for { + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + case <-done: + doneCnt++ + } + + if doneCnt >= numJobs { + break + } + + if err != nil { + break + } + } } From 0ed090cdc555765e59226dc73a7d941eeb449ed2 Mon Sep 17 00:00:00 2001 From: Phil Constantinou Date: Mon, 26 Feb 2024 00:18:46 -0800 Subject: [PATCH 3/6] Switch job overwrite option to try update then insert since ON CONFLICT doesn't work with WHERE on NULL's --- backends/postgres/postgres_backend.go | 55 +++++++++++++++++++++------ 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 8d9ceaa..99dc57d 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -573,21 +573,49 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job, opti fmt.Errorf("%w: %s", jobs.ErrCantGenerateFingerprint, err.Error()) } p.logger.Debug("adding job to the queue", slog.String("queue", j.Queue)) - err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries) - VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`, - j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID) - if isUniqueConflict(err) && options.Override { - err = tx.QueryRow(ctx, `UPDATE neoq_jobs set payload=$3, run_after=$4, deadline=$5, max_retries=$6, retries=0 - WHERE queue = $1 and fingerprint = $2 and status != "processed" - RETURNING id`, - j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID) - if err != nil { - p.logger.Error("error enqueueing override job", slog.Any("error", err)) + var query string + didUpdate := false + if options.Override { + var rows pgx.Rows + query = ` + UPDATE neoq_jobs + SET + payload = $3, + run_after = $4, + deadline = $5, + max_retries = $6, + status = $7 + WHERE + queue = $1 AND + fingerprint = $2 AND + status != $8 + RETURNING id` + rows, err = tx.Query(ctx, query, + j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries, internal.JobStatusNew, internal.JobStatusProcessed) + if rows != nil { + defer rows.Close() + if err == nil && rows.Next() { + didUpdate = true + err = rows.Scan(&jobID) + } } } + if !didUpdate { // This will always run unless the Override is set and an update was performed + query = ` + INSERT INTO neoq_jobs (queue, fingerprint, payload, run_after, deadline, max_retries) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING id` + err = tx.QueryRow(ctx, query, j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID) + } + if err != nil { + p.logger.Error("error enqueueing override job", slog.Any("error", err)) + } + if isUniqueConflict(err) && !options.Override { + err = fmt.Errorf("conflicting job: %w", err) + } if err != nil { - err = fmt.Errorf("unable add job to queue: %w", err) + err = fmt.Errorf("unable add job to queue [%s] with fingerprint [%s]: %w", j.Queue, j.Fingerprint, err) return } @@ -895,6 +923,11 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) { return } + if job.RunAfter.After(time.Now()) { + p.logger.Error("Running a job too %s early.", -time.Since(job.RunAfter)) + // return // + } + if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) { err = jobs.ErrJobExceededDeadline p.logger.Debug("job deadline is in the past, skipping", slog.String("queue", job.Queue), slog.Int64("job_id", job.ID)) From 840809b8d4e84a9ef248243fc3b024e7ca31e1b0 Mon Sep 17 00:00:00 2001 From: Phil Constantinou Date: Mon, 26 Feb 2024 00:37:40 -0800 Subject: [PATCH 4/6] Refactored tests --- backends/postgres/postgres_backend_test.go | 20 ++++++++++++-------- backends/suite.go | 1 + 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index f4fcc3b..f5c3286 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -21,6 +21,7 @@ import ( "github.com/acaloiaro/neoq/testutils" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/suite" ) const ( @@ -776,17 +777,20 @@ func Test_ConnectionTimeout(t *testing.T) { } } -func SetupSuite(t *testing.T) (neoq.Neoq, context.Context) { +func initQueue(t *testing.T) (neoq.Neoq, error) { + t.Helper() connString, _ := prepareAndCleanupDB(t) - - ctx := context.TODO() - nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString)) + nq, err := neoq.New(context.Background(), neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString)) if err != nil { - t.Fatal(err) + err = fmt.Errorf("Failed to create queue %w", err) } - return nq, ctx + return nq, err } func TestSuite(t *testing.T) { - n, _ := SetupSuite(t) - backends.NewNeoQTestSuite(n).Run(t) + n, err := initQueue(t) + if err != nil { + t.Fatal(err) + } + s := backends.NewNeoQTestSuite(n) + suite.Run(t, s) } diff --git a/backends/suite.go b/backends/suite.go index d282afb..6b411e5 100644 --- a/backends/suite.go +++ b/backends/suite.go @@ -183,6 +183,7 @@ results_loop: func makeHandler(s *NeoQTestSuite, done chan bool) handler.Handler { return handler.New(testQueue, func(ctx context.Context) (err error) { + s.T().Helper() var j *jobs.Job j, err = jobs.FromContext(ctx) if !s.NoError(err) && !s.NotNil(j) { From 2984d56fe6a3ae62a36777dc2d724fcdbe4630b9 Mon Sep 17 00:00:00 2001 From: Phil Constantinou Date: Mon, 26 Feb 2024 00:38:51 -0800 Subject: [PATCH 5/6] Add error log when run too early --- backends/postgres/postgres_backend.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 99dc57d..3fd09e1 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -924,8 +924,11 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) { } if job.RunAfter.After(time.Now()) { - p.logger.Error("Running a job too %s early.", -time.Since(job.RunAfter)) - // return // + p.logger.Error("Running a job too early.", + slog.String("queue", job.Queue), + slog.Int64("job_id", job.ID), + slog.String("duration", (-time.Since(job.RunAfter)).String())) + return // } if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) { From ad4410e87d1dac1543b1dc75a5c7206f45d9aa8d Mon Sep 17 00:00:00 2001 From: Phil Constantinou Date: Mon, 26 Feb 2024 00:56:47 -0800 Subject: [PATCH 6/6] Try to work around runafter error --- backends/memory/memory_backend.go | 3 +-- backends/postgres/postgres_backend.go | 5 +++-- internal/internal.go | 8 ++++++-- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index ca973b0..2be294f 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -120,7 +120,6 @@ func (m *MemBackend) Enqueue(_ context.Context, job *jobs.Job, jobOptions ...neo m.logger.Info("Expected to get job but none was returned for fingerprint %s", job.Fingerprint) } jobID = fmt.Sprint(job.ID) - } else { m.fingerprints.Store(job.Fingerprint, job) m.mu.Lock() @@ -260,7 +259,7 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) { m.logger.Error("job failed", slog.Int64("job_id", job.ID), slog.Any("error", err)) - runAfter := internal.CalculateBackoff(job.Retries) + runAfter := internal.CalculateBackoff(job.Retries, job.RunAfter) job.RunAfter = runAfter job.Status = internal.JobStatusFailed m.queueFutureJob(job) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 3fd09e1..13716a6 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -679,7 +679,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { var runAfter time.Time if status == internal.JobStatusFailed { - runAfter = internal.CalculateBackoff(job.Retries) + runAfter = internal.CalculateBackoff(job.Retries, job.RunAfter) qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3, retries = $4, run_after = $5 WHERE id = $6" _, err = tx.Exec(ctx, qstr, time.Now().UTC(), errMsg, status, job.Retries, runAfter, job.ID) } else { @@ -928,7 +928,8 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) { slog.String("queue", job.Queue), slog.Int64("job_id", job.ID), slog.String("duration", (-time.Since(job.RunAfter)).String())) - return // + err = p.updateJob(ctx, fmt.Errorf("run to soon")) + return } if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) { diff --git a/internal/internal.go b/internal/internal.go index 62c8bc7..297d3de 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -19,11 +19,15 @@ var JobCtxVarKey contextKey // CalculateBackoff calculates the number of seconds to back off before the next retry // this formula is unabashedly taken from Sidekiq because it is good. -func CalculateBackoff(retryCount int) time.Time { +func CalculateBackoff(retryCount int, runAfter time.Time) time.Time { + now := time.Now().UTC() + if runAfter.Before(now) { + runAfter = now + } const backoffExponent = 4 const maxInt = 30 p := int(math.Round(math.Pow(float64(retryCount), backoffExponent))) - return time.Now().UTC().Add(time.Duration(p+15+RandInt(maxInt)*retryCount+1) * time.Second) + return runAfter.Add(time.Duration(p+15+RandInt(maxInt)*retryCount+1) * time.Second) } // RandInt returns a random integer up to max