Skip to content

Commit

Permalink
Merge pull request #2 from pconstantinou/feature/bump-version
Browse files Browse the repository at this point in the history
Feature/bump version
  • Loading branch information
pconstantinou authored Feb 26, 2024
2 parents e7ef0a4 + ad4410e commit 4a13c41
Show file tree
Hide file tree
Showing 8 changed files with 459 additions and 404 deletions.
5 changes: 2 additions & 3 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func Backend(_ context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, e
}

// Enqueue queues jobs to be executed asynchronously
func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job, jobOptions ...neoq.JobOption) (jobID string, err error) {
func (m *MemBackend) Enqueue(_ context.Context, job *jobs.Job, jobOptions ...neoq.JobOption) (jobID string, err error) {
options := neoq.JobOptions{}
for _, opt := range jobOptions {
opt(&options)
Expand Down Expand Up @@ -120,7 +120,6 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job, jobOptions ...n
m.logger.Info("Expected to get job but none was returned for fingerprint %s", job.Fingerprint)
}
jobID = fmt.Sprint(job.ID)

} else {
m.fingerprints.Store(job.Fingerprint, job)
m.mu.Lock()
Expand Down Expand Up @@ -260,7 +259,7 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) {

m.logger.Error("job failed", slog.Int64("job_id", job.ID), slog.Any("error", err))

runAfter := internal.CalculateBackoff(job.Retries)
runAfter := internal.CalculateBackoff(job.Retries, job.RunAfter)
job.RunAfter = runAfter
job.Status = internal.JobStatusFailed
m.queueFutureJob(job)
Expand Down
12 changes: 9 additions & 3 deletions backends/memory/memory_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/acaloiaro/neoq/testutils"
"github.com/pkg/errors"
"github.com/robfig/cron"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/slog"
)

Expand Down Expand Up @@ -426,11 +427,16 @@ result_loop:
}
}

func TestSuite(t *testing.T) {
func initQueue() (neoq.Neoq, error) {
ctx := context.Background()
n, err := neoq.New(ctx, neoq.WithBackend(memory.Backend), neoq.WithLogLevel(logging.LogLevelDebug))
return neoq.New(ctx, neoq.WithBackend(memory.Backend), neoq.WithLogLevel(logging.LogLevelDebug)) //nolint: error
}

func TestSuite(t *testing.T) {
n, err := initQueue()
if err != nil {
t.Fatal(err)
}
backends.NewNeoQTestSuite(n).Run(t)
s := backends.NewNeoQTestSuite(n)
suite.Run(t, s)
}
61 changes: 49 additions & 12 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,21 +574,49 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job, opti
fmt.Errorf("%w: %s", jobs.ErrCantGenerateFingerprint, err.Error())
}
p.logger.Debug("adding job to the queue", slog.String("queue", j.Queue))
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`,
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
if isUniqueConflict(err) && options.Override {
err = tx.QueryRow(ctx, `UPDATE neoq_jobs set payload=$3, run_after=$4, deadline=$5, max_retries=$6, retries=0
WHERE queue = $1 and fingerprint = $2 and status != "processed"
RETURNING id`,
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
if err != nil {
p.logger.Error("error enqueueing override job", slog.Any("error", err))
var query string
didUpdate := false
if options.Override {
var rows pgx.Rows
query = `
UPDATE neoq_jobs
SET
payload = $3,
run_after = $4,
deadline = $5,
max_retries = $6,
status = $7
WHERE
queue = $1 AND
fingerprint = $2 AND
status != $8
RETURNING id`
rows, err = tx.Query(ctx, query,
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries, internal.JobStatusNew, internal.JobStatusProcessed)
if rows != nil {
defer rows.Close()
if err == nil && rows.Next() {
didUpdate = true
err = rows.Scan(&jobID)
}
}
}
if !didUpdate { // This will always run unless the Override is set and an update was performed
query = `
INSERT INTO neoq_jobs (queue, fingerprint, payload, run_after, deadline, max_retries)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id`
err = tx.QueryRow(ctx, query, j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
}
if err != nil {
p.logger.Error("error enqueueing override job", slog.Any("error", err))
}
if isUniqueConflict(err) && !options.Override {
err = fmt.Errorf("conflicting job: %w", err)
}

if err != nil {
err = fmt.Errorf("unable add job to queue: %w", err)
err = fmt.Errorf("unable add job to queue [%s] with fingerprint [%s]: %w", j.Queue, j.Fingerprint, err)
return
}

Expand Down Expand Up @@ -652,7 +680,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {

var runAfter time.Time
if status == internal.JobStatusFailed {
runAfter = internal.CalculateBackoff(job.Retries)
runAfter = internal.CalculateBackoff(job.Retries, job.RunAfter)
qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3, retries = $4, run_after = $5 WHERE id = $6"
_, err = tx.Exec(ctx, qstr, time.Now().UTC(), errMsg, status, job.Retries, runAfter, job.ID)
} else {
Expand Down Expand Up @@ -896,6 +924,15 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) {
return
}

if job.RunAfter.After(time.Now()) {
p.logger.Error("Running a job too early.",
slog.String("queue", job.Queue),
slog.Int64("job_id", job.ID),
slog.String("duration", (-time.Since(job.RunAfter)).String()))
err = p.updateJob(ctx, fmt.Errorf("run to soon"))
return
}

if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) {
err = jobs.ErrJobExceededDeadline
p.logger.Debug("job deadline is in the past, skipping", slog.String("queue", job.Queue), slog.Int64("job_id", job.ID))
Expand Down
20 changes: 12 additions & 8 deletions backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/acaloiaro/neoq/testutils"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/suite"
)

const (
Expand Down Expand Up @@ -776,17 +777,20 @@ func Test_ConnectionTimeout(t *testing.T) {
}
}

func SetupSuite(t *testing.T) (neoq.Neoq, context.Context) {
func initQueue(t *testing.T) (neoq.Neoq, error) {
t.Helper()
connString, _ := prepareAndCleanupDB(t)

ctx := context.TODO()
nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
nq, err := neoq.New(context.Background(), neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
if err != nil {
t.Fatal(err)
err = fmt.Errorf("Failed to create queue %w", err)
}
return nq, ctx
return nq, err
}
func TestSuite(t *testing.T) {
n, _ := SetupSuite(t)
backends.NewNeoQTestSuite(n).Run(t)
n, err := initQueue(t)
if err != nil {
t.Fatal(err)
}
s := backends.NewNeoQTestSuite(n)
suite.Run(t, s)
}
4 changes: 3 additions & 1 deletion backends/redis/redis_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/acaloiaro/neoq/logging"
"github.com/acaloiaro/neoq/testutils"
"github.com/hibiken/asynq"
"github.com/stretchr/testify/suite"
)

const (
Expand Down Expand Up @@ -462,5 +463,6 @@ func TestSuite(t *testing.T) {
t.Fatal(err)
}

backends.NewNeoQTestSuite(nq).Run(t)
s := backends.NewNeoQTestSuite(nq)
suite.Run(t, s)
}
Loading

0 comments on commit 4a13c41

Please sign in to comment.