diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index a16e546..06b0183 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -191,11 +191,7 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err go p.listenerManager(ctx) // monitor queues for pending jobs, so neoq is resilient to LISTEN disconnects and reconnections - pendingJobsConn, err := p.pool.Acquire(ctx) - if err != nil { - return nil, fmt.Errorf("unable to get a database connection: %w", err) - } - p.processPendingJobs(ctx, pendingJobsConn) + p.processPendingJobs(ctx) p.listenConnDown <- true @@ -547,7 +543,17 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha // SetLogger sets this backend's logger func (p *PgBackend) SetLogger(logger logging.Logger) { + p.mu.Lock() p.logger = logger + p.mu.Unlock() +} + +// Logger gets this backend's logger +func (p *PgBackend) Logger() (l logging.Logger) { + p.mu.Lock() + l = p.logger + p.mu.Unlock() + return } // Shutdown shuts this backend down @@ -831,20 +837,29 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) { // // Past due jobs are fetched on the interval [neoq.DefaultPendingJobFetchInterval] // nolint: cyclop -func (p *PgBackend) processPendingJobs(ctx context.Context, conn *pgxpool.Conn) { +func (p *PgBackend) processPendingJobs(ctx context.Context) { go func(ctx context.Context) { - defer conn.Release() + var err error + var conn *pgxpool.Conn + var pendingJobs []*jobs.Job ticker := time.NewTicker(neoq.DefaultPendingJobFetchInterval) // check for pending jobs on an interval until the context is canceled for { - pendingJobs, err := p.getPendingJobs(ctx, conn) + conn, err = p.acquire(ctx) + if err != nil { + p.Logger().Error("[pending_jobs] unable to get database connection", slog.Any("error", err)) + <-ticker.C + continue + } + + pendingJobs, err = p.getPendingJobs(ctx, conn) + conn.Release() if errors.Is(err, context.Canceled) { return } - if err != nil && !errors.Is(err, pgx.ErrNoRows) { - p.logger.Error( + p.Logger().Error( "failed to fetch pending jobs", slog.Any("error", err), ) @@ -1049,7 +1064,7 @@ func (p *PgBackend) acquire(ctx context.Context) (conn *pgxpool.Conn, err error) ctx, cancelFunc := context.WithDeadline(ctx, time.Now().Add(p.config.PGConnectionTimeout)) defer cancelFunc() - p.logger.Debug("acquiring connection with timeout", slog.Any("timeout", p.config.PGConnectionTimeout)) + p.Logger().Debug("acquiring connection with timeout", slog.Any("timeout", p.config.PGConnectionTimeout)) connCh := make(chan *pgxpool.Conn) errCh := make(chan error) @@ -1069,7 +1084,7 @@ func (p *PgBackend) acquire(ctx context.Context) (conn *pgxpool.Conn, err error) case err := <-errCh: return nil, err case <-ctx.Done(): - p.logger.Error("exceeded timeout acquiring a connection from the pool", slog.Any("timeout", p.config.PGConnectionTimeout)) + p.Logger().Error("exceeded timeout acquiring a connection from the pool", slog.Any("timeout", p.config.PGConnectionTimeout)) cancelFunc() err = ErrExceededConnectionPoolTimeout return diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index d2a2ff3..ba811bd 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -1144,7 +1144,7 @@ func TestProcessPendingJobs(t *testing.T) { ctx := context.Background() - // INSERTing jobs into the the job queue before noeq is listening on any queues ensures that the new job is not announced, and when + // INSERTing jobs into the job queue before noeq is listening on any queues ensures that the new job is not announced, and when // neoq _is_ started, that there is a pending jobs waiting to be processed payload := map[string]interface{}{ "message": "hello world", @@ -1154,7 +1154,7 @@ func TestProcessPendingJobs(t *testing.T) { VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`, queue, "dummy", payload, time.Now().UTC(), nil, 1).Scan(&pendingJobID) if err != nil { - err = fmt.Errorf("unable to add job to queue: %w", err) + t.Error(fmt.Errorf("unable to add job to queue: %w", err)) return } @@ -1174,8 +1174,9 @@ func TestProcessPendingJobs(t *testing.T) { t.Error(err) } - var status string go func() { + var err error + var status string // ensure job has failed/has the correct status for { err = conn. @@ -1200,6 +1201,6 @@ func TestProcessPendingJobs(t *testing.T) { case <-done: } if err != nil { - t.Errorf("job should have resulted in a status of 'processed', but its status is %s", status) + t.Errorf("job should have resulted in a status of 'processed', but did not") } }