Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support optional override argument when enqueuing job #117

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
af450bf
Merge from main of the test suite
pconstantinou Feb 9, 2024
8b68a09
Bump version numbers of build actions
pconstantinou Feb 11, 2024
a971e9c
Drop duplicate index
pconstantinou Feb 16, 2024
016b2d2
Fix overwrite
pconstantinou Feb 23, 2024
675327e
Handle edge cases better
pconstantinou Feb 23, 2024
9255606
Merge branch 'acaloiaro:main' into main
pconstantinou Feb 23, 2024
e7ef0a4
Merge pull request #1 from pconstantinou/feature/bump-version
pconstantinou Feb 23, 2024
631892b
Refactor unit tests
pconstantinou Feb 26, 2024
bb62dfa
Refactor unit tests
pconstantinou Feb 26, 2024
0ed090c
Switch job overwrite option to try update then insert since ON CONFLI…
pconstantinou Feb 26, 2024
840809b
Refactored tests
pconstantinou Feb 26, 2024
2984d56
Add error log when run too early
pconstantinou Feb 26, 2024
ad4410e
Try to work around runafter error
pconstantinou Feb 26, 2024
4a13c41
Merge pull request #2 from pconstantinou/feature/bump-version
pconstantinou Feb 26, 2024
951f8ca
Set job in context before calling updates
pconstantinou Mar 25, 2024
f84f7cb
Merge pull request #3 from pconstantinou/bug/set-context
pconstantinou Mar 25, 2024
3afe2fa
chore: Add test coverage over postgres future jobs
acaloiaro Mar 2, 2024
1fb07c0
feat: Make fingerprint unique to each queue
acaloiaro Mar 2, 2024
bc26e01
feat: Update migrations postgres URI parsing and add tests
Mar 5, 2024
310bebf
fix: An incorrect error was thrown when jobs exceeding their deadline…
acaloiaro Mar 25, 2024
076a3f6
feat: fix infinite scheduling loop when job gets scheduled after dead…
acaloiaro Mar 27, 2024
93ad210
chore(deps): bump github.com/jackc/pgx/v5 from 5.3.1 to 5.5.4
dependabot[bot] Apr 4, 2024
584a182
feat: add support for recovery callbacks
acaloiaro Apr 16, 2024
e3d6237
feat: Automatically reconnect when the PG listener connection fails
acaloiaro Apr 23, 2024
f5e7ee8
feat: Add public jobs.WithJobContext()
acaloiaro Jun 7, 2024
90a3f89
feat: log job_id regardless of error
acaloiaro Jun 13, 2024
d05bf4e
Merge with main branch
pconstantinou Jun 25, 2024
f7da9f3
Flip logic
pconstantinou Jun 26, 2024
bcc39d1
Remove safety check for running too soon
pconstantinou Jun 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ jobs:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v3
- uses: actions/setup-go@v5
with:
go-version: '1.21'
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: go mod
run: make mod
- name: golangci-lint
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
steps:
-
name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
fetch-depth: 0
-
Expand All @@ -26,7 +26,7 @@ jobs:
DEFAULT_BUMP: minor
-
name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
-
name: Run GoReleaser
uses: goreleaser/goreleaser-action@v4
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
container: golang:1.21

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: test
run: make mod test coverage
env:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/todo_to_issue.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
build:
runs-on: "ubuntu-latest"
steps:
- uses: "actions/checkout@v3"
- uses: "actions/checkout@v4"
- name: "TODO to Issue"
uses: "alstr/todo-to-issue-action@v4"
env:
Expand Down
52 changes: 30 additions & 22 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ func Backend(_ context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, e
}

// Enqueue queues jobs to be executed asynchronously
func (m *MemBackend) Enqueue(_ context.Context, job *jobs.Job) (jobID string, err error) {
func (m *MemBackend) Enqueue(_ context.Context, job *jobs.Job, jobOptions ...neoq.JobOption) (jobID string, err error) {
options := neoq.JobOptions{}
for _, opt := range jobOptions {
opt(&options)
}

var queueChan chan *jobs.Job
var qc any
var ok bool
Expand Down Expand Up @@ -96,29 +101,34 @@ func (m *MemBackend) Enqueue(_ context.Context, job *jobs.Job) (jobID string, er
job.RunAfter = now
}

if job.Queue == "" {
err = jobs.ErrNoQueueSpecified
return
}

err = jobs.FingerprintJob(job)
if err != nil {
err = errors.Join(jobs.ErrCantGenerateFingerprint, err)
return
}

// if the job fingerprint is already known, don't queue the job
if _, found := m.fingerprints.Load(job.Fingerprint); found {
return jobs.DuplicateJobID, nil
if !options.Override {
return jobs.DuplicateJobID, jobs.ErrJobFingerprintConflict
}
oldJob, found := m.fingerprints.Swap(job.Fingerprint, job)
if found {
// Return the same JobID to make it the same as posgres
job.ID = oldJob.(*jobs.Job).ID
} else {
m.logger.Info("Expected to get job but none was returned for fingerprint %s", job.Fingerprint)
}
jobID = fmt.Sprint(job.ID)
} else {
m.fingerprints.Store(job.Fingerprint, job)
m.mu.Lock()
m.jobCount++
m.mu.Unlock()
job.ID = m.jobCount
jobID = fmt.Sprint(m.jobCount)
}

m.fingerprints.Store(job.Fingerprint, job)
m.mu.Lock()
m.jobCount++
m.mu.Unlock()

job.ID = m.jobCount
jobID = fmt.Sprint(m.jobCount)

if job.RunAfter.Equal(now) {
queueChan <- job
} else {
Expand All @@ -135,6 +145,8 @@ func (m *MemBackend) Start(ctx context.Context, h handler.Handler) (err error) {
queueCapacity = defaultMemQueueCapacity
}

h.RecoverCallback = m.config.RecoveryCallback

m.handlers.Store(h.Queue, h)
m.queues.Store(h.Queue, make(chan *jobs.Job, queueCapacity))

Expand Down Expand Up @@ -173,6 +185,7 @@ func (m *MemBackend) StartCron(ctx context.Context, cronSpec string, h handler.H
m.cancelFuncs = append(m.cancelFuncs, cancel)
m.mu.Unlock()
h.Queue = queue
h.RecoverCallback = m.config.RecoveryCallback

err = m.Start(ctx, h)
if err != nil {
Expand Down Expand Up @@ -249,7 +262,7 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) {

m.logger.Error("job failed", slog.Int64("job_id", job.ID), slog.Any("error", err))

runAfter := internal.CalculateBackoff(job.Retries)
runAfter := internal.CalculateBackoff(job.Retries, job.RunAfter)
job.RunAfter = runAfter
job.Status = internal.JobStatusFailed
m.queueFutureJob(job)
Expand Down Expand Up @@ -318,7 +331,7 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context) {
}

func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Handler) (err error) {
ctx = withJobContext(ctx, job)
ctx = jobs.WithJobContext(ctx, job)

m.logger.Debug(
"handling job",
Expand Down Expand Up @@ -378,8 +391,3 @@ func (m *MemBackend) removeFutureJob(jobID int64) {
m.futureJobs.Delete(job.ID)
}
}

// withJobContext creates a new context with the Job set
func withJobContext(ctx context.Context, j *jobs.Job) context.Context {
return context.WithValue(ctx, internal.JobCtxVarKey, j)
}
25 changes: 19 additions & 6 deletions backends/memory/memory_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@ import (
"time"

"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/backends"
"github.com/acaloiaro/neoq/backends/memory"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/logging"
"github.com/acaloiaro/neoq/testutils"
"github.com/pkg/errors"
"github.com/robfig/cron"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/slog"
)

const (
queue = "testing"
)
var queue = "testing"
var q1 = "queue1"
var q2 = "queue2"

var (
errPeriodicTimeout = errors.New("timed out waiting for periodic job")
Expand Down Expand Up @@ -231,9 +233,6 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) {
jobsProcessed1 := 0
jobsProcessed2 := 0

q1 := "queue1"
q2 := "queue2"

done1 := make(chan bool)
done2 := make(chan bool)

Expand Down Expand Up @@ -427,3 +426,17 @@ result_loop:
t.Error(err)
}
}

func initQueue() (neoq.Neoq, error) {
ctx := context.Background()
return neoq.New(ctx, neoq.WithBackend(memory.Backend), neoq.WithLogLevel(logging.LogLevelDebug)) //nolint: error
}

func TestSuite(t *testing.T) {
n, err := initQueue()
if err != nil {
t.Fatal(err)
}
s := backends.NewNeoQTestSuite(n)
suite.Run(t, s)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP INDEX IF EXISTS neoq_jobs_fingerprint_unique_idx;
CREATE UNIQUE INDEX IF NOT EXISTS neoq_jobs_fingerprint_unique_idx ON neoq_jobs (queue, fingerprint, status) WHERE NOT (status = 'processed');
Loading