
refactor: queue pr part 1 #808

Open · wants to merge 37 commits into master

Conversation

devhaozi
Member

@devhaozi devhaozi commented Jan 5, 2025

📑 Description

Closes goravel/goravel#153

This is the first part of the queue refactoring.

Summary by CodeRabbit

Release notes summarizing the changes across the queue package:

Queue System Enhancements

  • New Features

    • Introduced a flexible queue management system with support for multiple drivers (Sync, Async, Machinery).
    • Added robust job registration and execution capabilities.
    • Implemented advanced error handling for job processing.
    • Enhanced configuration management for queue operations.
    • Added new methods for managing job queues, including bulk and delayed processing.
  • Improvements

    • Simplified job argument handling with more generic type support.
    • Improved job repository management.
    • Added support for delayed and chained job execution.
    • Enhanced logging and error tracking for job failures.
    • Streamlined the initialization process for the application and worker components.
  • Breaking Changes

    • Refactored queue configuration and job management interfaces.
    • Removed direct Redis and Docker dependencies in testing.
    • Updated method signatures across queue-related components, including changes to argument types.
    • The Machinery driver is deprecated and slated for removal, consolidating queue management under the remaining drivers.
  • Deprecation Notice

    • Machinery driver will be removed in version 1.17.

✅ Checks

  • Added test cases for my code

@devhaozi devhaozi requested a review from a team as a code owner January 5, 2025 07:54
@devhaozi devhaozi self-assigned this Jan 5, 2025
Contributor

coderabbitai bot commented Jan 5, 2025

Walkthrough

The pull request introduces a comprehensive refactoring of the queue system in the Goravel framework. It establishes a new queue management architecture with support for multiple drivers (async, sync, and machinery), enhanced job handling, and improved configuration management. The changes include new interfaces, implementation of different queue drivers, mock implementations for testing, and restructured job and worker management.

Changes

contracts/queue/config.go: New Config interface defining queue configuration methods
contracts/queue/driver.go: New Driver interface and constants for queue drivers
contracts/queue/job.go: Added JobRepository interface for job management
contracts/queue/queue.go: Modified Queue interface with new methods and signatures
queue/driver_async.go: Implemented asynchronous queue driver with channel-based job processing
queue/driver_sync.go: Implemented synchronous queue driver for immediate job execution
queue/job.go: Implemented job management with JobImpl and FailedJob structs
queue/worker.go: Refactored worker implementation with improved job processing and error handling
errors/list.go: Added new queue-related error messages
mocks/queue/Config.go: Introduced mock implementation for Config interface
mocks/queue/Driver.go: Introduced mock implementation for Driver interface
mocks/queue/JobRepository.go: Introduced mock implementation for JobRepository interface
mocks/queue/Queue.go: Enhanced mock for Queue interface with new methods
mocks/queue/Worker.go: Added mock implementation for Worker interface
queue/application.go: Updated Application struct to remove logging dependency and streamline job management
queue/service_provider.go: Simplified service provider registration logic

Assessment against linked issues

  • Async and sync job dispatch: implemented async and sync drivers in driver_async.go and driver_sync.go
  • Support for multiple drivers: added support for async, sync, and machinery drivers
  • Interfaces for queue status: introduced Config interface with methods like Size() and Driver()


@devhaozi devhaozi closed this Jan 5, 2025
@devhaozi devhaozi reopened this Jan 5, 2025

@github-actions github-actions bot left a comment


⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.50.

Benchmark suite: Benchmark_DecryptString
  Current (91280d5):  4813 ns/op, 1592 B/op, 17 allocs/op
  Previous (458efb9): 1973 ns/op, 1592 B/op, 17 allocs/op
  Ratio: 2.44

This comment was automatically generated by workflow using github-action-benchmark.

@devhaozi devhaozi changed the title refactor: queue pr 1 refactor: queue pr part 1 Jan 5, 2025

codecov bot commented Jan 5, 2025

Codecov Report

Attention: Patch coverage is 65.11628% with 105 lines in your changes missing coverage. Please review.

Project coverage is 69.41%. Comparing base (634f3b3) to head (9c3155b).

Files with missing lines Patch % Lines
queue/driver_sync.go 28.00% 16 Missing and 2 partials ⚠️
queue/driver_machinery.go 62.22% 17 Missing ⚠️
queue/config.go 52.17% 9 Missing and 2 partials ⚠️
queue/driver.go 42.10% 11 Missing ⚠️
queue/job.go 54.16% 10 Missing and 1 partial ⚠️
mail/application.go 0.00% 9 Missing ⚠️
queue/driver_async.go 82.97% 7 Missing and 1 partial ⚠️
queue/service_provider.go 0.00% 8 Missing ⚠️
queue/worker.go 89.36% 3 Missing and 2 partials ⚠️
queue/application.go 82.60% 4 Missing ⚠️
... and 1 more
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #808      +/-   ##
==========================================
- Coverage   69.57%   69.41%   -0.16%     
==========================================
  Files         215      219       +4     
  Lines       18499    18606     +107     
==========================================
+ Hits        12870    12915      +45     
- Misses       4962     5027      +65     
+ Partials      667      664       -3     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Contributor

@hwbrzzl hwbrzzl left a comment


Great PR 👍

contracts/queue/driver.go (outdated, resolved)
queue/utils.go (outdated, resolved)
queue/utils_test.go (resolved)
queue/driver.go Outdated
const DriverSync string = "sync"
const DriverASync string = "async"
const DriverMachinery string = "machinery" // TODO: Will be removed in v1.17
const DriverCustom string = "custom"
Contributor


Will add the DB driver in the next PR? And can we move these constants to contracts? The same with DB: https://github.com/goravel/framework/blob/master/contracts/database/config.go#L3-L10

Contributor


?

queue/driver.go (outdated, resolved)
Contributor


Please add some test cases for this file.

queue/driver_machinery.go (outdated, resolved)
queue/config.go (resolved)
queue/driver_machinery.go (outdated, resolved)
Contributor


Will a new redis driver be implemented?

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (40)
queue/application.go (2)

23-25: Use of NewChainTask clarifies job chaining.
This approach uses the new chain-based logic. Make sure to confirm that chaining does not break any existing concurrency or error-handling patterns.


36-36: Bulk job registration.
Invoking app.job.Register(jobs) centralizes job registration. Consider logging or error handling if registration fails.

queue/task.go (1)

66-77: DispatchSync elegantly handles sequential job execution.
If there's a large number of chained jobs, consider the possibility of partial failures interrupting subsequent tasks. If desired, you could add a rollback mechanism or partial success logging.

queue/worker.go (4)

14-21: New fields in Worker struct.

  • driver is introduced but overshadowed by local usage; consider eliminating or setting it once to avoid redundancy.
  • failedJobChan is a neat concurrency channel for handling job failures asynchronously.
- driver queue.Driver
+ // Potentially remove or populate 'driver' if you want to store it globally
🧰 Tools
🪛 golangci-lint (1.62.2)

17-17: field driver is unused

(unused)


35-59: Run method main loop.

  1. Marking isShutdown = false is straightforward.
  2. Checking driver.Driver() for DriverSync is consistent with the existing error approach.
  3. Spawning multiple goroutines (up to r.concurrent) for queue consumption is typical but consider adding more robust error logging or metrics.

61-70: Failure channel handling.
Using r.failedJobChan to push failed jobs is a solid design. Consider adding a retry mechanism if certain failures are transient.


86-87: Shutdown method toggles isShutdown.
Graceful shutdown is valuable. Potentially wait for in-progress jobs to complete if that’s desired.

contracts/queue/job.go (1)

12-17: New JobRepository interface.
Centralizing job management methods is a significant architectural improvement. Great step toward maintainability.

contracts/queue/config.go (2)

11-11: Plan for the deprecation of Redis method.

The comment suggests this method will be removed in v1.17. Make sure to track usage of this method throughout the codebase and provide a clear migration path before removal.


12-13: Consider error handling for Size and Via.

Returning an int or any without an error leaves callers no direct way to address misconfiguration. You might want to offer a more resilient interface that also reports errors if needed.

contracts/queue/queue.go (3)

4-5: Naming clarity for All.

While All is concise, consider something more descriptive like AllJobs to avoid confusion with future expansions in the interface.


10-11: Method name Job could be more expressive.

“Job(job Job, args []any) Task” might be misread. Consider renaming it to something like Enqueue or PushJob to clarify its intent.


14-15: Double-check usage of Worker(payloads ...Args) Worker.

If Args were to expand in functionality, a more direct approach (e.g., a single configuration object) might be clearer. Evaluate if there’s any confusion from multiple potential Args.

queue/driver.go (1)

15-15: Remove deprecated driver code on schedule

There's a TODO comment indicating that DriverMachinery will be removed in v1.17. Make sure to track this in the project backlog, so the deprecated code is removed at the correct version.

queue/driver_sync.go (1)

31-42: Interrupting bulk jobs on first error

If any job fails in the Bulk method, the entire batch halts. If that's intended, it’s fine; otherwise, consider accumulating errors so that other jobs can continue.

queue/job_test.go (4)

11-14: Consider zero-value initialization.
Instead of storing a pointer to JobImpl, you could store the struct value directly. This reduces pointer-chasing overhead and can simplify some test logic if you do not need to mutate the pointer itself.

- jobManager *JobImpl
+ jobManager JobImpl

24-32: Expand coverage for partial registration scenarios.
Currently, the test only registers two jobs and inspects len(registeredJobs). Consider adding a scenario where the array includes nil or duplicate jobs to confirm how the system behaves in edge cases.


43-46: Add negative test for invalid argument types.
This test verifies an unregistered job fails, but you may also want to test a registered job that receives arguments of unexpected types.


48-55: Minimal duplication.
The logic here is quite similar to CallRegisteredJobSuccessfully, so the tests are consistent. If these tests grow in complexity, consider extracting shared logic (e.g., registering a job) into helper methods.

queue/driver_machinery_test.go (2)

1-1: Reminder to remove the TODO.
The file has a TODO comment stating "Will be removed in v1.17". Please ensure that this is tracked and removed before or in v1.17 to avoid stale comments in the codebase.


26-27: Use inlined initialization for test setup if it’s short.
Consider inlining the creation of mocksconfig.Config and mockslog.Log if you want to reduce the number of lines in setups. That’s just a style preference though.

queue/driver_async.go (4)

11-11: Potential for concurrency conflicts.
asyncQueues is a global sync.Map. In high-throughput scenarios, the concurrency overhead could be considerable. Consider a more localized or instance-based approach if you foresee scaling issues.


13-16: Avoid storing non-essential fields.
If connection or size are derivable from a config object, consider removing them from the struct to reduce duplication of state.


38-52: Consider reusing worker pools or goroutines.
Spawning a goroutine inside a loop for each delayed job can be expensive under heavy load. A worker pool approach or shared scheduler might reduce overhead, but that depends on concurrency needs.


54-61: Delayed scheduling approach.
Scheduling using time.Sleep inside a goroutine is straightforward but can cause a flood of idle goroutines if many delayed jobs are queued. Consider a more centralized scheduling approach if large volumes of delayed tasks are expected.

queue/job.go (3)

31-40: Check potential performance overhead of repeatedly appending in All().

Each time r.jobs.Range(...) iterates, appending to jobs can force array resizing if it grows too large. For typical usage, this is probably fine, but keep an eye on performance if the job list grows significantly. You may also consider a pre-allocation strategy if the collection size is known.


42-50: Handling of job invocation errors.

The current logic returns err if the job signature is not found, but if job.Handle(args...) fails, the panic or other error-handling strategy must be clarified. Consider adding structured logging or additional checks to differentiate between “job not found” and “job execution failed” errors.


52-59: Graceful handling of missing job signatures.

When the signature is missing from the sync.Map, errors.QueueJobNotFound.Args(...) is thrown. This approach is valid for a minimal viable product. For clarity, you might add a user-friendly error message that references the missing signature to help in debugging or logs the error details more comprehensively.

queue/driver_machinery.go (2)

1-2: Pending removal notice.

A TODO indicates this file will be removed in v1.17. Consider clarifying in the PR description if the deprecation path is clearly communicated and scheduled.


37-40: Provide an explicit implementation or a graceful fallback for Driver().

Currently, the method panics, which is acceptable for a placeholder. However, if Driver() is invoked in production code, consider returning a default driver or a structured error to avoid abrupt process termination.

queue/config.go (4)

21-23: Debug flag is essential for logging improvements.

Retrieving app.debug is standard; consider logging or returning more structured debug info if needed for diagnosing advanced queue errors.


Line range hint 58-75: Upcoming removal notice for Redis(...).

The docstring states “Will be removed in v1.17.” Confirm if the new recommended approach is documented (e.g., usage of alternative queue config). Communicating the deprecation plan helps ensure a smooth transition.


77-83: Provide context on the rationale behind default size of 100 in Size().

If queue.connections.%s.size is omitted, we fallback to 100. This is fine, but clarify whether 100 is an arbitrary guess or an informed default and consider exposing it as a top-level config param for easy tuning.


85-91: Via() usage can be more self-descriptive.

The method name “Via” is a bit ambiguous. If it stands for “which channel or driver is used,” consider a more descriptive naming (e.g., Transport(), Method()). Otherwise, add explanatory documentation around how “via” is used.

mocks/queue/Worker.go (1)

65-81: Ensure consistent return value provisioning for “Shutdown”.
If your tests never specify a return value, the code will panic. Always specify a return value in the test setup or consider defaulting to nil for better resiliency.

func (_m *Worker) Shutdown() error {
    ret := _m.Called()

-   if len(ret) == 0 {
-       panic("no return value specified for Shutdown")
-   }
+   if len(ret) == 0 {
+       return nil
+   }

    var r0 error
    ...
}
queue/driver_sync_test.go (2)

44-53: Sync dispatch test is well-structured.
Verifies that the job increments testSyncJob. Ensure edge cases (e.g., invalid job args) are also handled.


55-76: Chain dispatch logic.
The test chain verifies multiple jobs in sequence. The usage of a short sleep to allow processing is acceptable but can introduce timing flakiness if run on slower systems. Consider a more robust approach (e.g., a wait group or callback) to confirm job completion.

// Example approach using channels or wait groups (pseudo-code):
- time.Sleep(2 * time.Second)
+ waitGroup.Wait() // or channel signal
mocks/queue/JobRepository.go (1)

70-86: Consider factoring out repeated logic for a more readable test.

Multiple mock methods use the same pattern of retrieving and validating the returned error. You could introduce utility methods to reduce code duplication and improve clarity.

queue/driver_async_test.go (2)

56-78: Validate concurrency readiness.

Running jobs and listening in a goroutine is good for async simulation. However, to avoid timing-based flakiness, consider an approach that waits for job completion events rather than sleeping.


185-199: Chaining jobs is well-implemented.

Chained job tests verify sequential execution. Consider adding a test scenario where a chained job fails to ensure that the chain halts appropriately.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 273efc8 and f097ecc.

📒 Files selected for processing (34)
  • contracts/queue/config.go (1 hunks)
  • contracts/queue/driver.go (1 hunks)
  • contracts/queue/job.go (1 hunks)
  • contracts/queue/queue.go (1 hunks)
  • errors/list.go (1 hunks)
  • event/task.go (1 hunks)
  • event/task_test.go (2 hunks)
  • mail/application.go (1 hunks)
  • mail/application_test.go (7 hunks)
  • mocks/queue/Config.go (1 hunks)
  • mocks/queue/Driver.go (1 hunks)
  • mocks/queue/JobRepository.go (1 hunks)
  • mocks/queue/Queue.go (7 hunks)
  • mocks/queue/Worker.go (1 hunks)
  • queue/application.go (1 hunks)
  • queue/application_test.go (0 hunks)
  • queue/config.go (3 hunks)
  • queue/config_test.go (1 hunks)
  • queue/driver.go (1 hunks)
  • queue/driver_async.go (1 hunks)
  • queue/driver_async_test.go (1 hunks)
  • queue/driver_machinery.go (1 hunks)
  • queue/driver_machinery_log.go (1 hunks)
  • queue/driver_machinery_test.go (3 hunks)
  • queue/driver_sync.go (1 hunks)
  • queue/driver_sync_test.go (1 hunks)
  • queue/job.go (1 hunks)
  • queue/job_test.go (1 hunks)
  • queue/machinery.go (0 hunks)
  • queue/service_provider.go (2 hunks)
  • queue/task.go (1 hunks)
  • queue/task_test.go (1 hunks)
  • queue/utils_test.go (2 hunks)
  • queue/worker.go (1 hunks)
💤 Files with no reviewable changes (2)
  • queue/machinery.go
  • queue/application_test.go
✅ Files skipped from review due to trivial changes (1)
  • queue/driver_machinery_log.go
🧰 Additional context used
🪛 golangci-lint (1.62.2)
queue/worker.go

17-17: field driver is unused

(unused)

🔇 Additional comments (116)
queue/application.go (6)

9-10: Use of interfaces to decouple job management is commendable.
The switch from a slice of jobs to a JobRepository interface neatly centralizes job management and enhances scalability.


20-22: Method All() fosters consistent job retrieval.
Returning app.job.All() ensures the retrieval of all registered jobs from the repository. Nice streamlined approach.


31-32: Updated Job method aligns with new argument signature.
Switching to args []any helps pass more flexible argument types to queue tasks.


39-52: Refined Worker method logic.
The dynamic fallback to defaultConnection and concurrency defaults is clean. Be sure to handle cases where payloads[0] might be nil (though the code checks for len(payloads) == 0 first, so it seems safe).


13-16: Constructor uses a new JobRepository implementation.
The constructor now cleanly initializes job via NewJobImpl(). Verify that no other parts of the code (in or outside this file) require the old approach.

✅ Verification successful

The switch to NewJobImpl() is safe and consistent with the codebase
The codebase shows consistent usage of NewJobImpl() for job management. Array/slice operations found in the code serve different purposes (task chaining, bulk operations) and don't conflict with this implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify if any references exist to the old approach of handling jobs directly
rg -A 5 "old approach or any direct usage of job slices"

Length of output: 58


Script:

#!/bin/bash
# Search for job-related type definitions and implementations
ast-grep --pattern 'type $_ struct {
  $$$
  jobs $$$
  $$$
}'

# Look for direct slice operations and old patterns
rg -g '*.go' '(\[\].*job|jobs\s*\[|append.*jobs|len\(jobs|range.*jobs)' -i

# Find all references to job-related methods
rg -g '*.go' '(NewJob|GetJob|AddJob|RemoveJob|ProcessJob)' -i

Length of output: 9522


27-29: Convenient GetJob accessor.
Exposing direct access to app.job.Get(signature) is straightforward. Validate correctness of usage in other calls.

✅ Verification successful

Accessor implementation and usage are correct.
The GetJob method properly encapsulates access to the underlying job storage, while internal implementation details correctly use direct access within the same package.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Searching usage of GetJob for correctness
rg -A 5 "GetJob"

Length of output: 2764


Script:

#!/bin/bash
# Search for usages of the underlying Get method and test files using GetJob
echo "=== Searching for Get method usage ==="
rg "\.Get\(signature\)" -A 3

echo -e "\n=== Searching for GetJob in test files ==="
fd -e test.go -x rg "GetJob" -A 3 {}

Length of output: 561


Script:

#!/bin/bash
# Get more context around the Get usage in job.go
echo "=== Context from job.go ==="
rg -B 5 -A 10 "\.Get\(signature\)" queue/job.go

Length of output: 500

queue/task.go (6)

10-13: Refactored fields in Task struct enhance clarity.
Storing the config interface and using time.Time for delay is more direct than pointer-based types.


18-28: NewTask constructor with flexible args.
Initializing jobs inside this constructor is a neat approach. No immediate issues found.


32-38: Chain constructor parallels single-task construction.
Both constructors keep consistent patterns for config, connection, and queue. Well-maintained.


42-46: Intuitive Delay method.
Directly storing a time.Time object for future scheduling is straightforward. Remember to confirm that the queue driver supports delayed execution.


81-90: Connection & queue overrides via OnConnection and OnQueue.
Both are logical and easy to read. The pattern aligns with the rest of the refactoring.


48-63: Dispatch logic gracefully handles chain vs. single job.
Backing off if r.chain is true is correct. However, ensure that referencing r.jobs[0] is safe if jobs might be empty under any condition.

✅ Verification successful

Access to r.jobs[0] is safe by construction
The code is safe because r.jobs[0] is only accessed when chain=false, which means the Task was created via NewTask() constructor that always initializes the jobs array with exactly one job.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if there's any scenario where r.jobs might be empty
rg "NewTask|NewChainTask" -A 5

Length of output: 3523

queue/worker.go (5)

4-5: Imports for time handling.
Importing "time" is consistent with the new scheduling logic. No concerns.


6-6: Addition of github.com/google/uuid.
Usage of UUIDs for fail-tracking is a good choice. Continue ensuring minimal overhead.


8-10: Relevant queue, errors, and carbon imports.
All new import references appear consistent with current usage.


24-31: NewWorker constructor.
Creating a fail channel for each worker is suitable but watch for resource management if a large number of workers is spawned.


75-84: Failed jobs consumer goroutine.
This robust approach ensures that failures don’t block main job consumption. Just ensure LogFacade is defined and safe for concurrency usage.

mocks/queue/Queue.go (5)

23-41: New All() mock method.
Panicking when no return value is specified is typical for mock usage, though consider a default or fallback if that better suits your tests.


43-69: Queue_All_Call struct and helpers.
These additions demonstrate a consistent pattern for mocking. Fine for standard mock usage.


118-145: GetJob mock method.
Allows returning both a queue.Job and an error. Ensure test coverage includes both success and failure scenarios.


177-185: Job mock function with flexible []interface{} parameter.
Switching to a broader type is consistent with the rest of the refactor. No immediate issues.


Line range hint 258-293: Enhanced Worker mock.
Adapting to variadic payloads ...queue.Args matches the real code changes in the queue interface. Looks good.

contracts/queue/job.go (2)

3-4: Time import for delay usage.
Importing "time" is aligned with the new Delay field in Jobs.


20-22: Extended Jobs struct with flexible Args and Delay.
Allowing []any for Args broadens usage scenarios. The Delay time.Duration is straightforward for scheduling.

contracts/queue/config.go (1)

1-4: Nice introduction of the Config interface!

Declaring a separate interface for managing queue configurations is a solid choice, as it makes the design cleaner and easier to extend.

contracts/queue/queue.go (2)

8-9: Ensure valid job retrieval.

GetJob(signature string) (Job, error) is a clean, minimal interface. Just confirm you handle an empty or invalid signature in the implementation to avoid nil returns or panics.


20-20: Great addition of Shutdown.

Providing a graceful shutdown is important for controlled job processing. Be sure to handle outstanding tasks to avoid data inconsistency or partial re-queues.

contracts/queue/driver.go (2)

5-8: Use descriptive constants or preserve a migration plan.

DriverMachinery is marked for removal in v1.17—like with Redis, ensure there’s a clear transition strategy for users before dropping support.


10-23: Interface ordering is fine; keep it consistent within the driver suite.

(Some previously suggested sorting is optional.)

queue/task_test.go (1)

30-31: Excellent demonstration of simplified argument usage.

Switching from custom arg structs to []any provides flexibility, though it relaxes type safety. Ensure you have robust internal checks if these arguments can vary in type.

queue/driver.go (3)

1-2: File Header Uniformity

All looks good here. The package declaration and imports are consistent with the project's style.


8-28: Validate initialization order & concurrency

The NewDriver function relies on external resources like LogFacade (in the case of the Machinery driver). Make sure that LogFacade is already initialized within the Boot phase of the service provider before any code path uses DriverMachinery.


21-24: Well-structured error handling

The fallback for custom drivers is well-implemented. Good job returning a clear error for invalid driver types.

queue/driver_sync.go (4)

1-2: Overall Implementation Consistency

The synchronous driver structure and constructor are clear and consistent with the rest of the codebase.


27-29: Immediate job execution is appropriate for sync driver

The Push method immediately calls job.Handle(args...). This is correct for a synchronous driver.


44-47: Potential blocking on large Delay intervals

The time.Sleep(time.Until(delay)) call in Later can block the caller’s goroutine for a potentially long duration. This is acceptable for a sync driver but confirm that it meets your production requirements.


49-52: Pop operation not supported

Good approach returning nil, nil, nil for a feature the sync driver doesn’t support. Just ensure any calling code handles the nil returns without error.

queue/service_provider.go (3)

14-17: Potentially unused global facades

Check if OrmFacade is needed. If it remains unused, consider removing it to keep the code clean.


34-38: Ensure initialization order matches usage

LogFacade = app.MakeLog() and OrmFacade = app.MakeOrm() are in Boot. If any queue driver references them at Register time, you might experience a nil reference. Verify that none of the queue drivers attempt to use these facades before Boot.


40-42: Encapsulated command registration

Creating a dedicated registerCommands method helps keep the boot logic clean. No issues spotted.

event/task.go (1)

64-67: Loss of structured argument typing

Switching from a custom struct to []any in eventArgsToQueueArgs provides flexibility but removes compile-time type checks. Confirm this trade-off is intentional across the codebase.
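A sketch of what such a conversion can look like, with `eventArg` as an illustrative stand-in for the old typed struct (not the framework's actual type). The typed wrapper is dropped at the boundary, which is exactly where compile-time checks disappear.

```go
package main

// eventArg is a hypothetical typed argument, mimicking the old struct.
type eventArg struct {
	Type  string
	Value any
}

// eventArgsToQueueArgs flattens typed args into []any for the queue.
// After this point, consumers must type-assert at runtime.
func eventArgsToQueueArgs(args []eventArg) []any {
	out := make([]any, 0, len(args))
	for _, a := range args {
		out = append(out, a.Value)
	}
	return out
}
```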

queue/job_test.go (4)

1-1: Good test package and suite naming.
The package queue and the JobTestSuite struct accurately reflect the tested functionality.


20-22: Test isolation confirmation.
Creating a new JobImpl in SetupTest helps ensure each test has a fresh state, which is good practice.


34-41: Confirm argument usage in job execution.
While you verify that Call can handle arguments, there's no test to confirm that arguments passed into Call are properly used by the job (Handle). Consider adding an assertion on the job’s internal state to ensure arguments are correct.


62-74: Well-structured mock job prioritizes clarity.
Using a MockJob that sets a called boolean is straightforward. This is a suitable pattern for these unit tests.

queue/driver_machinery_test.go (2)

10-11: Mock package rename is consistent.
Renaming from configmock/logmock to mocksconfig/mockslog clarifies the usage of these mocks. Make sure this naming convention is applied consistently across all mocks in the project.

Also applies to: 16-17


58-60: Confirm the presence of concurrency or errors.
You replaced direct server initialization with a function call server := s.machinery.server(...). Confirm whether concurrency or other drivers could surface unexpected errors here, and ensure those paths are covered by integration or upstream tests.

event/task_test.go (1)

34-34: Argument handling improvements.
Replacing []queuecontract.Arg with []any is consistent with the new flexible approach to argument handling. However, consider verifying that the interface usage doesn’t break type-checking logic later in the call chain.

Also applies to: 51-51

queue/driver_async.go (4)

1-1: Package name matches usage.
Naming the package queue is consistent with the async driver’s functionality.


33-36: Push is non-blocking.
Push sends to a buffered channel. If the channel becomes full, the send will block. Confirm that the buffer size is sufficient or that you have a safe strategy for handling bursts.
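One hedged way to handle the burst scenario above is a non-blocking send that rejects instead of stalling the producer; this is a sketch, not the driver's actual code, and `errQueueFull` is an illustrative error value.

```go
package main

import "errors"

// errQueueFull is an illustrative sentinel for a saturated buffer.
var errQueueFull = errors.New("queue buffer is full")

// push attempts a non-blocking send; when the buffered channel is
// full it returns an error rather than blocking the caller.
func push(ch chan []any, payload []any) error {
	select {
	case ch <- payload:
		return nil
	default:
		return errQueueFull
	}
}
```

Whether to block, drop, or error on a full buffer is a policy decision; the point is to make it explicit rather than inherit the channel's default blocking behavior.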


63-75: Potential race conditions on channel read.
If the queue is closed or replaced concurrently in the future, reading from the channel might panic. Currently, your approach is safe for the tested usage, but be mindful of potential changes in usage patterns.


78-86: Ensure correct channel capacity usage.
LoadOrStore might retrieve an existing channel with a different capacity. If you want dynamic sizing to persist, confirm that a second call to getQueue with a larger size does not overwrite the previously stored channel.
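The capacity pitfall is easy to demonstrate: with `sync.Map.LoadOrStore`, the first stored channel wins, so a later call requesting a larger size silently keeps the original capacity. A minimal sketch (names illustrative):

```go
package main

import "sync"

// queues maps queue name -> buffered channel.
var queues sync.Map

// getQueue returns the channel for name, creating it with the given
// size only if it does not already exist. Note: LoadOrStore ignores
// the new size when an entry is already present.
func getQueue(name string, size int) chan []any {
	ch, _ := queues.LoadOrStore(name, make(chan []any, size))
	return ch.(chan []any)
}
```

If dynamic resizing is ever needed, the channel would have to be replaced explicitly (with care around in-flight readers), not re-requested with a bigger size.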

queue/job.go (3)

13-21: Consider adding a corresponding database migration for FailedJob.

This struct is annotated with GORM tags but there's no mention of a corresponding migration file to create the table for failed jobs in the database. If a migration is missing, please add one.


23-29: Validate potential concurrency implications of sync.Map.

The jobs sync.Map is a straightforward approach to concurrency-safe storage. Ensure that:

  1. The job registry won't encounter race conditions in high-concurrency scenarios (frequent reads and writes).
  2. The type-assertions from any to contractsqueue.Job in subsequent methods are safe.

61-66: Confirm no collision on multiple concurrent job registrations.

As Register() loops over the provided jobs and calls r.jobs.Store, it should be OK under concurrent usage. However, if there can be repeated signatures in the jobs slice, carefully consider how overwriting might affect job definitions.
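Both concerns above (safe type assertions out of the `sync.Map`, and overwrites on duplicate signatures) can be sketched as follows. The `Job` interface and error texts here are illustrative stand-ins, not the framework's actual contracts.

```go
package main

import (
	"fmt"
	"sync"
)

// Job is an illustrative stand-in for contractsqueue.Job.
type Job interface {
	Signature() string
	Handle(args ...any) error
}

type repo struct{ jobs sync.Map }

// Register stores each job by signature; later duplicates in the
// slice silently overwrite earlier ones.
func (r *repo) Register(jobs []Job) {
	for _, job := range jobs {
		r.jobs.Store(job.Signature(), job)
	}
}

// Get uses comma-ok assertions so a missing or corrupted entry
// surfaces as an error instead of a panic.
func (r *repo) Get(signature string) (Job, error) {
	v, ok := r.jobs.Load(signature)
	if !ok {
		return nil, fmt.Errorf("job not registered: %s", signature)
	}
	job, ok := v.(Job)
	if !ok {
		return nil, fmt.Errorf("unexpected entry type for %s", signature)
	}
	return job, nil
}

// noopJob is a trivial Job used for demonstration.
type noopJob struct{ sig string }

func (j *noopJob) Signature() string        { return j.sig }
func (j *noopJob) Handle(args ...any) error { return nil }
```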

queue/config_test.go (2)

8-9: Nice usage of the new mock naming convention.

Switching to mocksconfig and injecting via mocksconfig.NewConfig(s.T()) for a more flexible test setup is a clean approach. This ensures consistent naming and usage patterns compared to the old configmock.

Also applies to: 14-15


23-23: Validate adequate coverage of queue config test cases.

The test covers the default connection with “redis.” If additional queue types or connections exist (e.g., “sync,” “sqs”), ensure that they are also tested here or in subsequent test methods to confirm broad coverage.

queue/driver_machinery.go (2)

25-31: Configuration injection approach is consistent.

The constructor NewMachinery aligns well with dependency injection for log and config. This is good for maintainability and testability.


62-85: Validate Redis server settings in server(...).

The code constructs a Machinery server with Redis broker/backend configurations. Ensure:

  1. Proper credential management when no password is used or only partial credentials are present.
  2. A plan to handle ephemeral test environments where Redis might not be available.
queue/config.go (1)

37-41: Check error handling for FailedJobsQuery.

The method always returns a valid orm.Query, but if the connection or table is missing, queries may fail at runtime. Consider verifying the presence of these config values or returning an error if they're invalid.

queue/utils_test.go (4)

9-9: No issues found with the import alias update.
Changing the import alias to contractsqueue improves clarity.


46-46: Good use of the new import alias.
No functional or logical issues are found.


52-52: Proper alias usage.
Continues the same refactoring pattern. No concerns.


59-59: Refactored type references look good.
No regressions introduced.

mocks/queue/Worker.go (1)

83-108: Mock call struct for “Shutdown” adequately follows the pattern.
The approach is consistent with the existing Run call struct. Make sure all tests set expectations and return values correctly.

queue/driver_sync_test.go (5)

19-24: Applicative naming.
Using a clearly named struct (DriverSyncTestSuite) clarifies intent in these tests.


26-37: Registering jobs in Setup looks clean.
All relevant job types are registered before running tests, promoting clarity in the test flow.


39-42: Reset the counters for each test.
Good practice to ensure test isolation.


78-91: TestSyncJob’s signature and handler.
Implementation looks correct. The job counters are incremented as expected.


93-106: TestChainSyncJob’s signature and handler.
No specific issues identified. Code is straightforward and meets its intended functionality.

mail/application.go (1)

73-81: 🛠️ Refactor suggestion

Transition from typed arguments to []any.
The reduced structure eliminates explicit type annotation but may reduce type safety. Ensure downstream usage of these arguments handles type casting gracefully.

-job := r.queue.Job(NewSendMailJob(r.config), []any{
+// Optionally, reintroduce typed checks if clarity or safety are important:
 job := r.queue.Job(NewSendMailJob(r.config), []any{
     ...
 })

Likely invalid or redundant comment.

mocks/queue/JobRepository.go (2)

1-1: Auto-generated code by mockery.

As this is generated code, manual modifications may be overwritten. Any necessary changes should be made in the interface or generated anew.


27-29: Panic on missing return value.

If a test forgets to specify a return value for All(), the code will panic. While this is standard for testify mocks, consider using a default or fail-fast approach for a clearer error message.

mail/application_test.go (3)

36-36: Confirm that 465 is a valid test scenario.

Using port 465 typically implies TLS for sending mail. Ensure that the environment or test credentials support this secured channel.


79-81: Recommend verifying queue setup.

After switching to the newly refactored async queue, ensure that queue.NewApplication(mockConfig) references the correct driver. The test appears to rely on Redis in some scenarios. Confirm that all environment variables for Redis are set if needed.


165-172: Good approach to return a unified mockConfig.

Centralizing the environment and config gathering in one function ensures consistency across test cases. Keep in mind that any advanced re-configuration during test runtime might require re-invoking this function or clearing existing mocks.

errors/list.go (1)

107-115: New queue-related errors align well with the refactoring.

These errors cover missing jobs, invalid drivers, etc., providing more granular handling for queue operations.

queue/driver_async_test.go (1)

31-42: Sensible test suite setup.

Initializing the application once and running the suite helps keep the tests cohesive. Ensure that each test re-initializes state as needed in SetupTest.

mocks/queue/Driver.go (17)

1-2: Auto-generated code notice.
This file was generated by mockery; typically, changes to auto-generated code aren't manually edited. Ensure that this file is either kept in sync with its source or excluded from version control if it causes noise in diffs.


12-15: Mock struct for Driver.
The struct correctly embeds mock.Mock to simulate the Driver interface in tests.


17-19: Expecter struct.
This small helper struct for setting up expectations is consistent with typical mock usage.


21-23: EXPECT() usage.
Returns the Driver_Expecter for handling chained mock calls. Straightforward approach.


25-41: Bulk method.
Properly checks the call arguments and uses default panic if no return value is set. This aligns with standard testify/mock patterns.


43-70: Driver_Bulk_Call.
Defines chained methods to set up behaviors for Bulk. Looks consistent with testify/mock chaining.


72-88: Connection method.
Retrieves a string from the mock call. Follows the same pattern as the Bulk method.


90-116: Driver_Connection_Call.
Helper struct for mocking out the Connection() behavior, consistent with the established pattern.


117-133: Driver method.
Another simple mock method returning a string. No issues identified.


135-160: Driver_Driver_Call.
Helper struct for setting test expectations on Driver() calls.


162-178: Later method.
Uses the standard approach to handle the function signature with multiple parameters. Looks correct.


180-210: Driver_Later_Call.
Maintains the same pattern for chained calls with Run, Return, and RunAndReturn.


211-248: Pop method.
Mimics the multi-valued return functionality used in typical queue pop operations. Implementation is consistent.


250-276: Driver_Pop_Call.
Helper struct for mocking Pop. Nothing unusual here.


278-294: Push method.
Handles the push operation with standard testify/mock usage.


296-324: Driver_Push_Call.
A typical call struct for the Push method. No issues detected.


326-338: NewDriver factory method.
Registers the mock with the testing.T and sets up a cleanup function. This ensures mock expectations are properly asserted.

mocks/queue/Config.go (21)

1-2: Auto-generated code notice.
Similar to Driver.go, this file is mockery-generated. Consider excluding this from manual edits.


10-13: Mock struct for Config.
Correct structure for embedding mock.Mock and simulating Config in tests.


15-17: Config_Expecter.
Helper struct, standard pattern for chaining mock calls.


19-21: EXPECT() usage.
Returns the expecter for chaining. No concerns here.


23-39: Debug method.
Correctly panics if no return is defined. This is standard for testify mocks.


41-67: Config_Debug_Call.
Helper for Debug() method. Implementation is consistent with other mocks.


68-84: DefaultConnection method.
Retrieves or panics if no return set. Typical mock approach.


86-112: Config_DefaultConnection_Call.
Helper for the DefaultConnection() call. Implementation is consistent.


113-129: Driver method.
Mocks retrieving a driver string by connection. No issues apparent.


131-158: Config_Driver_Call.
Handle for the Driver() call. Follows the same pattern.


159-177: FailedJobsQuery method.
Returns an orm.Query or panics otherwise. Mock usage is correct.


179-205: Config_FailedJobsQuery_Call.
Helper struct for setting expectations on FailedJobsQuery.


206-222: Queue method.
Mocks returning a queue name for a given connection. No issue identified.


224-251: Config_Queue_Call.
Provides chainable setup for Queue(). Implementation is fine.


253-286: Redis method.
Supports returning triple values: (dsn, database, alias). The multi-return approach is correct.


288-314: Config_Redis_Call.
Handles the chainable mock calls for Redis.


316-332: Size method.
Returns an integer for the specified connection or panics if unset.


334-360: Config_Size_Call.
Defines the chainable API for configuring the Size() mock.


362-380: Via method.
Handles a generic interface return. No logic issues apparent.


382-408: Config_Via_Call.
Same pattern for the Via() method. Straightforward chainable approach.


410-422: NewConfig factory method.
Registers the mock with the testing environment. Good practice to ensure verification.

queue/task_test.go Show resolved Hide resolved
Comment on lines 42 to 45
func (m *Machinery) Push(job queue.Job, args []any, queue string) error {
//TODO implement me
panic("implement me")
}
Contributor

⚠️ Potential issue

Replace panic("implement me") with partial or stub implementations.

Using panic() is acceptable for a short-term placeholder, but it can cause unpredictable application behavior if inadvertently triggered. Consider:

  1. Returning a structured error indicating unimplemented functionality.
  2. Providing a minimal stub to handle the logic gracefully.

Also applies to: 47-50, 52-55, 57-60

queue/config.go Show resolved Hide resolved
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (7)
queue/driver_async_test.go (1)

210-214: Add input validation and logging to job handlers.

The job handlers should validate their input arguments and include logging for better debugging:

 func (receiver *TestAsyncJob) Handle(args ...any) error {
+	if len(args) < 2 {
+		return fmt.Errorf("expected at least 2 arguments, got %d", len(args))
+	}
 	testAsyncJob++
+	log.Printf("TestAsyncJob executed with args: %v", args)
 	return nil
 }

Apply similar changes to all job handlers.

Also applies to: 225-229, 240-244, 255-259, 270-274

🧰 Tools
🪛 GitHub Actions: Codecov

[error] 175: Mock expectation failure: Unexpected GetString method call with parameter 'queue.default'

queue/worker_test.go (3)

46-49: Avoid relying on fixed sleep durations in tests.
Using time.Sleep to wait for asynchronous operations can lead to flaky tests if the system is slower or faster than expected. Instead, consider using synchronization mechanisms to confirm that the worker has picked up and processed the job, such as channels or wait groups.


52-69: Validate database changes for the failed job scenario.
The test verifies that testJob.called is set, but it doesn't assert whether the failed job record is actually created in the database. Consider verifying that mockQuery.Create(mock.Anything) is invoked with the correct payload or a partial match using your mocking framework, ensuring the code for storing failed jobs works as expected.
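To show what that assertion could look like without mockery syntax, here is a hand-rolled fake that records `Create` calls; `failedJob` and `fakeQuery` are illustrative stand-ins for the real ORM types, and the mockery-based tests would express the same check as an expectation on `mockQuery.Create`.

```go
package main

// failedJob is a hypothetical stand-in for queue.FailedJob.
type failedJob struct {
	Signature string
	Error     string
}

// fakeQuery records every Create call so a test can assert the
// persisted payload, not just that the job handler ran.
type fakeQuery struct{ created []failedJob }

func (q *fakeQuery) Create(job *failedJob) error {
	q.created = append(q.created, *job)
	return nil
}
```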


88-101: Enhance error coverage in test jobs.
The MockFailedJob.Handle method always returns the same error, which is fine for a basic test. However, to more thoroughly test error handling, consider setting up multiple failing conditions or variations of the error to ensure the worker handles different error contexts consistently.

queue/worker.go (3)

52-60: Handle driver timeout or no-jobs scenario explicitly.
The code sleeps only when driver.Pop returns an error. If driver.Pop times out or indicates “no jobs available,” consider clarifying how the worker should respond. For example, a separate error type for “no jobs,” or using a blocked pop mechanism, could make the logic more explicit.

🧰 Tools
🪛 GitHub Actions: Codecov

[error] 78: Panic in goroutine after TestWorkerTestSuite/TestRun_FailedJob completion


79-79: Add more context to error logs.
The error message QueueFailedToSaveFailedJob is helpful, but consider adding details like the job signature or arguments to make debugging easier in production logs.

🧰 Tools
🪛 GitHub Actions: Codecov

[error] 78: Panic in goroutine after TestWorkerTestSuite/TestRun_FailedJob completion


Line range hint 87-96: Reporting the outcome of Shutdown.
Shutdown always returns nil. If there’s any cleanup logic (e.g., driver teardown, flushing buffers) that can fail, returning an error would inform callers of potential issues. If nothing can fail, you can safely keep returning nil.

🧰 Tools
🪛 GitHub Actions: Codecov

[error] 78: Panic in goroutine after TestWorkerTestSuite/TestRun_FailedJob completion

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f097ecc and a5fdf4c.

📒 Files selected for processing (5)
  • queue/driver_async_test.go (1 hunks)
  • queue/driver_sync_test.go (1 hunks)
  • queue/job.go (1 hunks)
  • queue/worker.go (1 hunks)
  • queue/worker_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • queue/job.go
  • queue/driver_sync_test.go
🧰 Additional context used
🪛 GitHub Actions: Codecov
queue/worker.go

[error] 78: Panic in goroutine after TestWorkerTestSuite/TestRun_FailedJob completion

queue/driver_async_test.go

[error] 175: Mock expectation failure: Unexpected GetString method call with parameter 'queue.default'


[error] 164-170: Mock expectations not met: 7 expected method calls were not made in TestChainAsyncQueue

⏰ Context from checks skipped due to timeout of 300000ms (2)
  • GitHub Check: test / windows (1.23)
  • GitHub Check: test / windows (1.22)
🔇 Additional comments (5)
queue/driver_async_test.go (2)

11-13: Organize imports according to past review feedback.

Reorder the mock imports to follow the suggested pattern:

-mocksconfig "github.com/goravel/framework/mocks/config"
-mocksorm "github.com/goravel/framework/mocks/database/orm"
-mocksqueue "github.com/goravel/framework/mocks/queue"
+	mocksconfig "github.com/goravel/framework/mocks/config"
+	mocksorm "github.com/goravel/framework/mocks/database/orm"
+	mocksqueue "github.com/goravel/framework/mocks/queue"
🧰 Tools
🪛 GitHub Actions: Codecov

[error] 175: Mock expectation failure: Unexpected GetString method call with parameter 'queue.default'


31-42: Use New* methods for mock initialization, per past review feedback.

-	mockConfig := mocksconfig.NewConfig(t)
-	mockQueue := mocksqueue.NewQueue(t)
+	mockConfig := configmock.NewConfig(t)
+	mockQueue := queuemock.NewQueue(t)
🧰 Tools
🪛 GitHub Actions: Codecov

[error] 175: Mock expectation failure: Unexpected GetString method call with parameter 'queue.default'

queue/worker_test.go (2)

40-44: Consider ensuring job dispatch completion before shutting down the worker.
Currently, the test spawns the worker in a goroutine and dispatches a job after a short sleep, but there's a slight risk that the test may proceed to cancel the context or end before the job truly completes, especially in environments with varying performance. You could employ ready channels, condition variables, or custom hooks to reliably detect job completion.


71-71: Potential data race risk with OrmFacade assignment.
Each test sets OrmFacade = mockOrm, which could cause concurrency issues if other tests or code paths also manipulate OrmFacade. Wrap assignments and usage in protective logic if multiple tests run concurrently or if OrmFacade is used outside the test’s scope.

queue/worker.go (1)

43-44: Double-check for concurrency safety on the isShutdown field.
Reading and writing r.isShutdown from multiple goroutines may present a data race. In practice, it's often safe if there's a single goroutine that performs writes, but strictly speaking, concurrent reads/writes require atomic operations or locking in Go.

🧰 Tools
🪛 GitHub Actions: Codecov

[error] 78: Panic in goroutine after TestWorkerTestSuite/TestRun_FailedJob completion
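One race-free option for the flag above is atomic.Bool (available since Go 1.19), which avoids both the data race and the overhead of a mutex for a single boolean; `flaggedWorker` is an illustrative name.

```go
package main

import "sync/atomic"

// flaggedWorker sketches a shutdown flag that is safe to read and
// write from multiple goroutines.
type flaggedWorker struct {
	isShutdown atomic.Bool
}

func (w *flaggedWorker) Shutdown() {
	w.isShutdown.Store(true)
}

func (w *flaggedWorker) running() bool {
	return !w.isShutdown.Load()
}
```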

queue/driver_async_test.go Show resolved Hide resolved
queue/driver_async_test.go Outdated Show resolved Hide resolved
queue/driver_async_test.go Show resolved Hide resolved
queue/driver_async_test.go Outdated Show resolved Hide resolved
queue/worker.go Outdated Show resolved Hide resolved
queue/worker.go Show resolved Hide resolved
contracts/queue/queue.go Outdated Show resolved Hide resolved
queue/config.go Outdated Show resolved Hide resolved
queue/driver_async_test.go Outdated Show resolved Hide resolved
queue/driver_async_test.go Show resolved Hide resolved
queue/driver_async_test.go Outdated Show resolved Hide resolved
queue/worker.go Outdated Show resolved Hide resolved
queue/worker_test.go Outdated Show resolved Hide resolved
queue/worker_test.go Outdated Show resolved Hide resolved
queue/driver.go Outdated
const DriverSync string = "sync"
const DriverASync string = "async"
const DriverMachinery string = "machinery" // TODO: Will be removed in v1.17
const DriverCustom string = "custom"
Contributor

?

"github.com/goravel/framework/support/carbon"
)

type FailedJob struct {
Contributor

?

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (3)
queue/config.go (1)

29-35: ⚠️ Potential issue

Add validation for empty driver configuration.

The Driver method should handle the case when the driver configuration is empty or invalid.

Apply this diff to add validation:

func (r *Config) Driver(connection string) string {
	if connection == "" {
		connection = r.DefaultConnection()
	}

+	driver := r.config.GetString(fmt.Sprintf("queue.connections.%s.driver", connection))
+	if driver == "" {
+		return "sync" // Default to sync driver if not configured
+	}
-	return r.config.GetString(fmt.Sprintf("queue.connections.%s.driver", connection))
+	return driver
}
queue/driver_async_test.go (2)

15-21: 🛠️ Refactor suggestion

Move test counters into the test suite struct.

Using global variables for test state can lead to test pollution if tests run in parallel.

Apply this diff:

-var (
-	testAsyncJob       = 0
-	testDelayAsyncJob  = 0
-	testCustomAsyncJob = 0
-	testErrorAsyncJob  = 0
-	testChainAsyncJob  = 0
-)

type DriverAsyncTestSuite struct {
	suite.Suite
	app        *Application
	mockConfig *mocksconfig.Config
	mockQueue  *mocksqueue.Queue
+	counters   struct {
+		asyncJob       int
+		delayAsyncJob  int
+		customAsyncJob int
+		errorAsyncJob  int
+		chainAsyncJob  int
+	}
}

34-40: 🛠️ Refactor suggestion

Reset all counters in SetupTest.

SetupTest should reset all test counters to ensure a clean state for each test.

Apply this diff:

func (s *DriverAsyncTestSuite) SetupTest() {
-	testAsyncJob = 0
+	// Reset all counters
+	s.counters.asyncJob = 0
+	s.counters.delayAsyncJob = 0
+	s.counters.customAsyncJob = 0
+	s.counters.errorAsyncJob = 0
+	s.counters.chainAsyncJob = 0
	s.mockQueue = mocksqueue.NewQueue(s.T())
	s.mockConfig = mocksconfig.NewConfig(s.T())
	s.app = NewApplication(s.mockConfig)
	s.app.Register([]queue.Job{&TestAsyncJob{}, &TestDelayAsyncJob{}, &TestCustomAsyncJob{}, &TestErrorAsyncJob{}, &TestChainAsyncJob{}})
}
🧹 Nitpick comments (5)
contracts/queue/queue.go (2)

4-5: Complete the documentation for Chain method.

The comment for Chain method is incomplete. Please provide a clear description of what happens to the results between chained jobs.

-	// Chain creates a chain of jobs to be processed one by one, passing
+	// Chain creates a chain of jobs to be processed sequentially. The result of each job
+	// is passed as the first argument to the next job in the chain.

15-15: Add documentation for Worker interface methods.

The Worker interface methods lack documentation explaining their purpose and behavior, particularly the new Shutdown method.

 type Worker interface {
+	// Run starts the worker and begins processing jobs from the queue.
+	// Returns an error if the worker fails to start.
 	Run() error
+	// Shutdown gracefully stops the worker after completing any in-progress jobs.
+	// Returns an error if the shutdown process fails.
 	Shutdown() error
 }

Also applies to: 20-20

queue/application.go (1)

40-53: Simplify Worker method implementation.

The method contains duplicate calls to config.Queue and could be simplified by consolidating the default value logic.

 func (app *Application) Worker(payloads ...queue.Args) queue.Worker {
 	defaultConnection := app.config.DefaultConnection()
+	connection := defaultConnection
+	concurrent := 1
+	queueName := ""
 
-	if len(payloads) == 0 {
-		return NewWorker(app.config, 1, defaultConnection, app.config.Queue(defaultConnection, ""), app.job)
-	}
-	if payloads[0].Connection == "" {
-		payloads[0].Connection = defaultConnection
-	}
-	if payloads[0].Concurrent == 0 {
-		payloads[0].Concurrent = 1
+	if len(payloads) > 0 {
+		if payloads[0].Connection != "" {
+			connection = payloads[0].Connection
+		}
+		if payloads[0].Concurrent > 0 {
+			concurrent = payloads[0].Concurrent
+		}
+		queueName = payloads[0].Queue
 	}
 
-	return NewWorker(app.config, payloads[0].Concurrent, payloads[0].Connection, app.config.Queue(payloads[0].Connection, payloads[0].Queue), app.job)
+	return NewWorker(app.config, concurrent, connection, app.config.Queue(connection, queueName), app.job)
 }
queue/worker.go (1)

33-33: Consider making the failed job channel buffer size configurable.

The channel buffer size is currently tied to the number of concurrent workers, which might not be optimal for all scenarios.

-		failedJobChan: make(chan FailedJob, concurrent),
+		failedJobChan: make(chan FailedJob, config.GetInt("queue.failed_jobs_buffer_size", concurrent)),
queue/config.go (1)

Line range hint 58-76: Consider removing deprecated Redis method.

The TODO comment indicates this method will be removed in v1.17. Consider adding a deprecation notice in the method documentation.

Add a deprecation notice:

// Redis returns the Redis configuration for a given connection.
-// TODO: Will be removed in v1.17
+// Deprecated: This method will be removed in v1.17. Use queue.Config interface methods instead.
func (r *Config) Redis(queueConnection string) (dsn string, database int, queue string) {
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a5fdf4c and 28a9e1d.

📒 Files selected for processing (8)
  • contracts/queue/queue.go (1 hunks)
  • mocks/queue/Queue.go (7 hunks)
  • queue/application.go (1 hunks)
  • queue/config.go (3 hunks)
  • queue/driver_async_test.go (1 hunks)
  • queue/driver_sync_test.go (1 hunks)
  • queue/worker.go (1 hunks)
  • queue/worker_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • queue/driver_sync_test.go
🧰 Additional context used
🪛 GitHub Actions: Codecov
queue/worker.go

[error] 79-79: Panic in goroutine after TestWorkerTestSuite/TestRun_FailedJob: Mock expectations not met - 2 more calls expected

🔇 Additional comments (5)
queue/worker_test.go (1)

71-76: Add test cases for edge cases and error scenarios.

The test suite should cover additional scenarios:

  • Worker shutdown with pending jobs
  • Multiple concurrent jobs
  • Invalid job signatures
  • Database errors when saving failed jobs
✅ Verification successful

The review comment is accurate - critical test scenarios are missing.

The current test suite covers basic worker operation and successful failed job handling, but lacks coverage for important edge cases revealed in the worker implementation:

  • Concurrent job handling (worker has a concurrent field)
  • Graceful shutdown with pending jobs (using isShutdown and wg)
  • Invalid job signatures (no validation tests)
  • Database errors during failed job storage (only happy path tested)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check test coverage for worker.go
go test -coverprofile=coverage.out ./queue
go tool cover -func=coverage.out | grep worker.go

Length of output: 282


Script:

#!/bin/bash
# Get the complete test file content
echo "=== Worker Test File ==="
cat queue/worker_test.go

# Find and show worker implementation
echo -e "\n=== Worker Implementation ==="
ast-grep --pattern 'type Worker struct {
  $$$
}'

# Search for similar test patterns
echo -e "\n=== Similar Test Patterns ==="
rg -A 5 "func.*Test.*Worker" 

Length of output: 3945

queue/worker.go (1)

77-83: ⚠️ Potential issue

Fix potential deadlock in failed job processing.

The current implementation might miss failed jobs during shutdown. The failed job processor should be part of the wait group and properly handle shutdown.

+	r.wg.Add(1)
 	go func() {
+		defer r.wg.Done()
 		for job := range r.failedJobChan {
-			if err = r.config.FailedJobsQuery().Create(&job); err != nil {
+			if createErr := r.config.FailedJobsQuery().Create(&job); createErr != nil {
-				LogFacade.Error(errors.QueueFailedToSaveFailedJob.Args(err))
+				LogFacade.Error(errors.QueueFailedToSaveFailedJob.Args(createErr))
 			}
 		}
 	}()

Likely invalid or redundant comment.

🧰 Tools
🪛 GitHub Actions: Codecov

[error] 79-79: Panic in goroutine after TestWorkerTestSuite/TestRun_FailedJob: Mock expectations not met - 2 more calls expected

queue/config.go (1)

6-8: LGTM! Clean import organization.

The imports are well-organized and properly aliased.

mocks/queue/Queue.go (1)

71-99: LGTM! Proper error handling in GetJob mock.

The mock implementation correctly handles both return types and includes proper panic behavior for unspecified return values.

queue/driver_async_test.go (1)

141-175: Add test assertions for job execution order in chains.

The chain test should verify that jobs execute in the correct order.

Add timestamp tracking to verify execution order:

type TestChainAsyncJob struct {
+	executedAt time.Time
}

func (receiver *TestChainAsyncJob) Handle(args ...any) error {
+	receiver.executedAt = time.Now()
	testChainAsyncJob++
	return nil
}

type TestAsyncJob struct {
+	executedAt time.Time
}

func (receiver *TestAsyncJob) Handle(args ...any) error {
+	receiver.executedAt = time.Now()
	testAsyncJob++
	return nil
}

func (s *DriverAsyncTestSuite) TestChainAsyncQueue() {
	// ... existing test code ...
	
	time.Sleep(3 * time.Second)
	s.Equal(1, testChainAsyncJob, "First job in chain should execute once")
	s.Equal(1, testAsyncJob, "Second job in chain should execute once")
+	// Verify execution order
+	s.True(chainJob.executedAt.Before(asyncJob.executedAt), "Chain job should execute before async job")
}

queue/worker_test.go (outdated, resolved)
queue/worker.go (resolved)
queue/worker.go (outdated, resolved)
@hwbrzzl hwbrzzl left a comment

FYI, CI failed.

queue/driver_sync_test.go (outdated, resolved)
queue/worker.go (outdated, resolved)
queue/worker_test.go (outdated, resolved)
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
queue/worker.go (2)

57-61: 🛠️ Refactor suggestion

Add error logging for failed job Pop operations.

Currently, Pop errors are silently ignored. Add logging to help with debugging queue issues.

 				job, args, err := driver.Pop(r.queue)
 				if err != nil {
+					LogFacade.Error(errors.Wrap(err, "failed to pop job from queue"))
 					time.Sleep(1 * time.Second)
 					continue
 				}

90-94: ⚠️ Potential issue

Improve shutdown sequence to prevent job loss.

The current shutdown sequence might lead to job loss if there are pending failed jobs. Consider implementing a graceful shutdown:

 func (r *Worker) Shutdown() error {
+	// Signal workers to stop processing new jobs
 	r.isShutdown.Store(true)
+
+	// Wait for all workers to finish
 	r.wg.Wait()
+
+	// Ensure all failed jobs are processed, with a timeout to avoid blocking forever
+	if remaining := len(r.failedJobChan); remaining > 0 {
+		LogFacade.Warningf("Waiting for %d failed jobs to be processed", remaining)
+
+		timeout := time.After(30 * time.Second)
+	drain:
+		for len(r.failedJobChan) > 0 {
+			select {
+			case <-timeout:
+				LogFacade.Errorf("Shutdown timed out with %d unprocessed failed jobs", len(r.failedJobChan))
+				break drain
+			default:
+				time.Sleep(100 * time.Millisecond)
+			}
+		}
+	}
+
 	close(r.failedJobChan)
 	return nil
 }
🧹 Nitpick comments (3)
queue/worker.go (3)

16-23: Consider reordering struct fields for better readability.

Consider grouping related fields together and ordering them by importance:

 type Worker struct {
-	concurrent    int
-	config        queue.Config
-	connection    string
-	failedJobChan chan FailedJob
-	isShutdown    atomic.Bool
-	job           queue.JobRepository
-	queue         string
-	wg            sync.WaitGroup
+	// Core configuration
+	config     queue.Config
+	job        queue.JobRepository
+	connection string
+	queue      string
+
+	// Runtime settings
+	concurrent int
+	
+	// Synchronization
+	isShutdown    atomic.Bool
+	wg            sync.WaitGroup
+	failedJobChan chan FailedJob
 }

77-85: Improve failed job processing robustness.

The failed job processing goroutine should be more resilient:

  1. Consider adding batch processing for better performance
  2. Add retry mechanism for failed database operations
  3. Consider adding metrics for monitoring
 	go func() {
 		defer r.wg.Done()
+		const batchSize = 100
+		batch := make([]FailedJob, 0, batchSize)
+		ticker := time.NewTicker(5 * time.Second)
+		defer ticker.Stop()
+
+		flush := func() {
+			if len(batch) == 0 {
+				return
+			}
+			if err := r.processBatch(batch); err != nil {
+				LogFacade.Error(errors.QueueFailedToSaveFailedJob.Args(err))
+			}
+			batch = batch[:0]
+		}
+
-		for job := range r.failedJobChan {
-			if err = r.config.FailedJobsQuery().Create(&job); err != nil {
-				LogFacade.Error(errors.QueueFailedToSaveFailedJob.Args(err))
-			}
-		}
+		for {
+			select {
+			case job, ok := <-r.failedJobChan:
+				if !ok {
+					flush() // process remaining jobs before exiting
+					return
+				}
+				batch = append(batch, job)
+				if len(batch) >= batchSize {
+					flush()
+				}
+			case <-ticker.C:
+				flush()
+			}
+		}
 	}()

37-88: Consider implementing health checks and metrics.

To improve operational visibility and monitoring capabilities, consider:

  1. Add health check endpoint to report worker status
  2. Implement metrics collection for:
    • Job processing rates
    • Error rates
    • Queue depths
    • Processing latencies
  3. Add structured logging with correlation IDs

This will help with:

  • Monitoring system health
  • Debugging issues
  • Capacity planning
  • Performance optimization
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 28a9e1d and 9c3155b.

📒 Files selected for processing (3)
  • errors/list.go (1 hunks)
  • queue/driver_sync_test.go (1 hunks)
  • queue/worker.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • errors/list.go
  • queue/driver_sync_test.go
🔇 Additional comments (1)
queue/worker.go (1)

26-35: Constructor implementation looks good!

The constructor properly initializes all fields and the channel buffer size appropriately matches the number of concurrent workers.

@devhaozi
Copy link
Member Author

devhaozi commented Jan 11, 2025

FYI, CI failed.

I don't know why.

image

If run this test case alone, it can be successful.

image

image

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
queue/worker.go (1)

57-61: 🛠️ Refactor suggestion

Add error logging for job Pop failures.

Currently, Pop errors are silently ignored. Add logging to help with debugging queue issues.

 				job, args, err := driver.Pop(r.queue)
 				if err != nil {
+					LogFacade.Error(errors.Wrap(err, "failed to pop job from queue"))
 					time.Sleep(1 * time.Second)
 					continue
 				}
🧹 Nitpick comments (2)
queue/worker_test.go (1)

29-33: Consider making mock expectations more maintainable.

The hardcoded expectation counts (Times(3), Times(2), Twice()) make the test brittle and harder to maintain. Consider:

  1. Using Maybe() when the exact call count isn't critical for the test
  2. Extracting configuration values to constants for better maintainability
+const (
+    defaultQueueName = "async"
+    appName = "goravel"
+    defaultConnectionQueue = "default"
+    asyncDriver = "async"
+    queueSize = 10
+)

 func (s *WorkerTestSuite) SetupTest() {
     s.mockConfig = mocksconfig.NewConfig(s.T())
-    s.mockConfig.EXPECT().GetString("queue.default").Return("async").Times(3)
-    s.mockConfig.EXPECT().GetString("app.name").Return("goravel").Times(2)
-    s.mockConfig.EXPECT().GetString("queue.connections.async.queue", "default").Return("default").Times(2)
-    s.mockConfig.EXPECT().GetString("queue.connections.async.driver").Return("async").Twice()
-    s.mockConfig.EXPECT().GetInt("queue.connections.async.size", 100).Return(10).Twice()
+    s.mockConfig.EXPECT().GetString("queue.default").Return(defaultQueueName).Maybe()
+    s.mockConfig.EXPECT().GetString("app.name").Return(appName).Maybe()
+    s.mockConfig.EXPECT().GetString("queue.connections.async.queue", "default").Return(defaultConnectionQueue).Maybe()
+    s.mockConfig.EXPECT().GetString("queue.connections.async.driver").Return(asyncDriver).Maybe()
+    s.mockConfig.EXPECT().GetInt("queue.connections.async.size", 100).Return(queueSize).Maybe()
queue/worker.go (1)

15-23: Add documentation for the Worker struct and its fields.

The Worker struct would benefit from documentation explaining its purpose and the role of each field. This helps with maintainability and makes it easier for other developers to understand the code.

Add documentation like this:

+// Worker handles the processing of queue jobs with support for concurrent processing
+// and graceful shutdown.
 type Worker struct {
+	// concurrent is the number of worker goroutines to spawn
 	concurrent    int
+	// config provides queue-specific configuration
 	config        queue.Config
+	// connection specifies the queue connection to use
 	connection    string
+	// failedJobChan buffers failed jobs for persistence
 	failedJobChan chan FailedJob
+	// isShutdown signals worker goroutines to stop processing
 	isShutdown    atomic.Bool
+	// job handles the execution of queue jobs
 	job           queue.JobRepository
+	// queue specifies which queue to process
 	queue         string
+	// wg tracks active worker goroutines for graceful shutdown
 	wg            sync.WaitGroup
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9c3155b and e019c6d.

📒 Files selected for processing (3)
  • queue/driver_sync_test.go (1 hunks)
  • queue/worker.go (1 hunks)
  • queue/worker_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • queue/driver_sync_test.go
🔇 Additional comments (6)
queue/worker_test.go (2)

48-51: Replace sleep-based synchronization with proper synchronization primitives.

Using time.Sleep for test synchronization is unreliable and can lead to flaky tests. Consider using channels or sync.WaitGroup for deterministic synchronization.


86-98: LGTM! Clean mock implementation.

The MockFailedJob implementation is clean, focused, and properly implements the required interface.

queue/worker.go (4)

26-35: LGTM! Well-structured constructor with appropriate initialization.

The constructor properly initializes all fields and uses an appropriate buffer size for the failed jobs channel matching the number of concurrent workers.


77-85: ⚠️ Potential issue

Fix potential data race with shared error variable.

The err variable from line 40 is shared between goroutines, which could lead to race conditions.

 	r.wg.Add(1)
 	go func() {
 		defer r.wg.Done()
 		for job := range r.failedJobChan {
-			if err = r.config.FailedJobsQuery().Create(&job); err != nil {
+			if createErr := r.config.FailedJobsQuery().Create(&job); createErr != nil {
-				LogFacade.Error(errors.QueueFailedToSaveFailedJob.Args(err))
+				LogFacade.Error(errors.QueueFailedToSaveFailedJob.Args(createErr))
 			}
 		}
 	}()

Likely invalid or redundant comment.


87-89: ⚠️ Potential issue

Remove blocking Wait call from Run method.

The wg.Wait() in Run blocks indefinitely, preventing proper shutdown handling. This should be moved to the Shutdown method.

-	r.wg.Wait()
-
 	return nil

Likely invalid or redundant comment.


92-95: ⚠️ Potential issue

Implement proper shutdown sequence to prevent job loss.

The current shutdown sequence may lead to job loss and panic:

  1. Channel is closed before workers finish
  2. No waiting for goroutines to complete
  3. No draining of pending failed jobs
 func (r *Worker) Shutdown() error {
 	r.isShutdown.Store(true)
+	
+	// Wait for all worker goroutines to finish
+	r.wg.Wait()
+	
+	// Ensure all failed jobs are processed before closing the channel
+	if len(r.failedJobChan) > 0 {
+		LogFacade.Warningf("waiting for %d failed jobs to be processed", len(r.failedJobChan))
+		// Optional: Add timeout here to prevent indefinite blocking
+		for len(r.failedJobChan) > 0 {
+			time.Sleep(100 * time.Millisecond)
+		}
+	}
+	
 	close(r.failedJobChan)
 	return nil
 }

Likely invalid or redundant comment.

Comment on lines +60 to +67
mockOrm := mocksorm.NewOrm(s.T())
mockQuery := mocksorm.NewQuery(s.T())
mockOrm.EXPECT().Connection("database").Return(mockOrm)
mockOrm.EXPECT().Query().Return(mockQuery)
mockQuery.EXPECT().Table("failed_jobs").Return(mockQuery)
mockQuery.EXPECT().Create(mock.Anything).Return(nil)
OrmFacade = mockOrm


🛠️ Refactor suggestion

Avoid global variable mutation in tests.

Using and mutating the global OrmFacade variable makes tests harder to maintain and can lead to test pollution. Consider:

  1. Injecting the ORM dependency through the constructor
  2. Using a test-specific facade setup method

Also, consider moving the mock ORM setup to SetupTest to keep the test focused on its specific behavior.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (5)
queue/driver_machinery.go (2)

88-91: Remove unnecessary nil check for server

The server returned by m.server(queue) is always initialized and should not be nil. The check if server == nil is unnecessary and can be removed to simplify the code.

Apply this diff to remove the unnecessary nil check:

 func (m *Machinery) Run(jobs []queue.Job, queue string, concurrent int) error {
     server := m.server(queue)
-    if server == nil {
-        return nil
-    }

     jobTasks, err := jobs2Tasks(jobs)
     if err != nil {
         return err
     }

125-131: Avoid setting global logging variables to prevent side effects

Setting global logging variables like log.DEBUG, log.INFO, etc., can lead to unintended side effects in concurrent environments or when multiple instances are created. This practice can affect other parts of the application that use the machinery package.

Consider using a custom logger that implements the machinery.LoggerInterface and passing it to the machinery.Server instance. This approach encapsulates the logging configurations within the instance and prevents global state modification.

Example modification:

 func (m *Machinery) server(queue string) *machinery.Server {
     // ... existing code ...

     // Initialize custom logger
-    log.DEBUG = NewDebug(debug, m.log)
-    log.INFO = NewInfo(debug, m.log)
-    log.WARNING = NewWarning(debug, m.log)
-    log.ERROR = NewError(debug, m.log)
-    log.FATAL = NewFatal(debug, m.log)
+    customLogger := NewCustomLogger(debug, m.log)

-    return machinery.NewServer(cnf, broker, backend, lock)
+    server := machinery.NewServer(cnf, broker, backend, lock)
+    server.SetLogger(customLogger)
+    return server
 }

You would then implement NewCustomLogger to create a logger that adheres to machinery.LoggerInterface.

queue/worker.go (3)

16-23: Consider maintaining consistent field ordering.

The wg field should be grouped with other fields alphabetically for better maintainability.

 type Worker struct {
 	concurrent    int
 	config        queue.Config
 	connection    string
 	failedJobChan chan FailedJob
 	isShutdown    atomic.Bool
 	job           queue.JobRepository
 	queue         string
-	wg            sync.WaitGroup
+	wg            sync.WaitGroup
 }

26-35: Consider grouping related parameters.

The parameter ordering could be improved by grouping related parameters together (config & job, connection & queue, concurrent).

-func NewWorker(config queue.Config, concurrent int, connection string, queue string, job queue.JobRepository) *Worker {
+func NewWorker(config queue.Config, job queue.JobRepository, connection string, queue string, concurrent int) *Worker {
 	return &Worker{
 		concurrent:    concurrent,
 		config:        config,
 		connection:    connection,
 		job:           job,
 		queue:         queue,
 		failedJobChan: make(chan FailedJob, concurrent),
 	}
 }

48-51: Add version tracking to TODO comment.

The TODO comment about removing Machinery should include a link to a tracking issue.

-	// TODO: will remove in v1.17
+	// TODO(#issue): Remove Machinery support in v1.17
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e019c6d and 433de71.

📒 Files selected for processing (5)
  • contracts/queue/job.go (1 hunks)
  • queue/driver_async.go (1 hunks)
  • queue/driver_machinery.go (1 hunks)
  • queue/driver_sync.go (1 hunks)
  • queue/worker.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • queue/driver_sync.go
⏰ Context from checks skipped due to timeout of 300000ms (2)
  • GitHub Check: test / windows (1.23)
  • GitHub Check: test / windows (1.22)
🔇 Additional comments (3)
contracts/queue/job.go (1)

20-22: Verify the impact of changing Args field type from []Arg to []any

The Args field in the Jobs struct has been changed from []Arg to []any. This alteration may affect serialization, deserialization, and any code that relies on the original type. Ensure that all references to Jobs.Args are updated accordingly and that serialization mechanisms can handle the new type without issues.

queue/worker.go (2)

63-67: Add error logging for failed job retrieval.

Silent error handling in Pop operation makes debugging difficult.

 				job, args, err := driver.Pop(r.queue)
 				if err != nil {
+					LogFacade.Error(errors.Wrap(err, "failed to pop job from queue"))
 					time.Sleep(1 * time.Second)
 					continue
 				}

98-102: ⚠️ Potential issue

Improve shutdown sequence to prevent panics.

The current shutdown sequence could lead to panics if workers try to send to the closed channel. Wait for goroutines to finish before closing the channel.

 func (r *Worker) Shutdown() error {
 	r.isShutdown.Store(true)
+	r.wg.Wait()
 	close(r.failedJobChan)
 	return nil
 }

Likely invalid or redundant comment.

Comment on lines +39 to +44
for _, job := range jobs {
go func() {
time.Sleep(time.Until(job.Delay))
r.getQueue(queue) <- job
}()
}

⚠️ Potential issue

Fix closure to correctly capture the loop variable job

In the Bulk method, the goroutine's closure captures the loop variable job by reference, which can lead to unexpected behavior where all goroutines reference the last job in the slice. To ensure each goroutine captures the correct job, pass it as a parameter to the closure.

Apply this diff to fix the issue:

 func (r *Async) Bulk(jobs []contractsqueue.Jobs, queue string) error {
     for _, job := range jobs {
-        go func() {
+        go func(job contractsqueue.Jobs) {
             time.Sleep(time.Until(job.Delay))
             r.getQueue(queue) <- job
-        }()
+        }(job)
     }

     return nil
 }

Comment on lines +104 to +109
// runMachinery is a special case for Machinery
// TODO: will remove in v1.17
func (r *Worker) runMachinery(driver queue.Driver) error {
m := driver.(*Machinery)
return m.Run(r.job.All(), r.queue, r.concurrent)
}

⚠️ Potential issue

Add type assertion check and improve TODO comment.

The type assertion could panic if the driver is not *Machinery, and the TODO comment should be consistent with version tracking.

-// TODO: will remove in v1.17
+// TODO(#issue): Remove Machinery support in v1.17
 func (r *Worker) runMachinery(driver queue.Driver) error {
-	m := driver.(*Machinery)
+	m, ok := driver.(*Machinery)
+	if !ok {
+		return errors.New("invalid driver type: expected *Machinery")
+	}
 	return m.Run(r.job.All(), r.queue, r.concurrent)
 }

Comment on lines +54 to +91
for i := 0; i < r.concurrent; i++ {
r.wg.Add(1)
go func() {
defer r.wg.Done()
for {
if r.isShutdown.Load() {
return
}

if receiver.queue == "" {
receiver.queue = server.GetConfig().DefaultQueue
}
if receiver.concurrent == 0 {
receiver.concurrent = 1
}
worker := server.NewWorker(receiver.queue, receiver.concurrent)
if err := worker.Launch(); err != nil {
return err
job, args, err := driver.Pop(r.queue)
if err != nil {
time.Sleep(1 * time.Second)
continue
}

if err = r.job.Call(job.Signature(), args); err != nil {
r.failedJobChan <- FailedJob{
UUID: uuid.New(),
Connection: r.connection,
Queue: r.queue,
Payload: args,
Exception: err.Error(),
FailedAt: carbon.DateTime{Carbon: carbon.Now()},
}
}
}
}()
}

r.wg.Add(1)
go func() {
defer r.wg.Done()
for job := range r.failedJobChan {
if err = r.config.FailedJobsQuery().Create(&job); err != nil {
LogFacade.Error(errors.QueueFailedToSaveFailedJob.Args(err))
}
}
}()

⚠️ Potential issue

Fix potential race conditions and improve error handling.

  1. The err variable is shared across goroutines, which could lead to race conditions
  2. The failedJobChan handling could be improved with proper error handling and graceful shutdown
 	for i := 0; i < r.concurrent; i++ {
 		r.wg.Add(1)
 		go func() {
 			defer r.wg.Done()
 			for {
 				if r.isShutdown.Load() {
 					return
 				}
 
-				job, args, err := driver.Pop(r.queue)
-				if err != nil {
+				job, args, processErr := driver.Pop(r.queue)
+				if processErr != nil {
+					LogFacade.Error(errors.Wrap(processErr, "failed to pop job from queue"))
 					time.Sleep(1 * time.Second)
 					continue
 				}
 
-				if err = r.job.Call(job.Signature(), args); err != nil {
+				if processErr = r.job.Call(job.Signature(), args); processErr != nil {
 					r.failedJobChan <- FailedJob{
 						UUID:       uuid.New(),
 						Connection: r.connection,
 						Queue:      r.queue,
 						Payload:    args,
-						Exception:  err.Error(),
+						Exception:  processErr.Error(),
 						FailedAt:   carbon.DateTime{Carbon: carbon.Now()},
 					}
 				}
 			}
 		}()
 	}
 
 	r.wg.Add(1)
 	go func() {
 		defer r.wg.Done()
 		for job := range r.failedJobChan {
-			if err = r.config.FailedJobsQuery().Create(&job); err != nil {
-				LogFacade.Error(errors.QueueFailedToSaveFailedJob.Args(err))
+			if saveErr := r.config.FailedJobsQuery().Create(&job); saveErr != nil {
+				LogFacade.Error(errors.QueueFailedToSaveFailedJob.Args(saveErr))
 			}
 		}
 	}()
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
queue/driver_machinery_test.go (1)

Line range hint 118-137: Improve test reliability and maintainability.

The test cases have several areas for improvement:

  1. Using time.Sleep for synchronization is fragile and can lead to flaky tests.
  2. Worker goroutines might leak if the test fails before context cancellation.
  3. Multiple test cases share global variables for job counters.

Consider these improvements:

  1. Use channels or sync.WaitGroup for synchronization instead of time.Sleep
  2. Implement proper cleanup in teardown
  3. Reset job counters between tests
  4. Add helper functions to reduce code duplication

Example refactor for better synchronization:

 func (s *MachineryTestSuite) TestDefaultAsyncQueue_EnableDebug() {
+    done := make(chan struct{})
     ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
     defer cancel()
     go func(ctx context.Context) {
         s.Nil(s.app.Worker(queue.Args{
             Queue: "debug",
         }).Run())
+        close(done)
         for range ctx.Done() {
             return
         }
     }(ctx)
-    time.Sleep(2 * time.Second)
+    select {
+    case <-done:
+    case <-time.After(5 * time.Second):
+        s.Fail("Worker setup timeout")
+    }

Also applies to: 154-171, 188-209, 225-246, 262-281, 297-330, 348-376

🧹 Nitpick comments (1)
queue/driver_machinery_test.go (1)

382-475: Enhance job implementations for better testing.

The test job implementations could be improved:

  1. Job signatures could conflict as multiple jobs use "test_async_job"
  2. Error handling in TestChainMachineryJob uses a magic boolean argument
  3. Global variables for counting make tests interdependent

Consider these improvements:

-func (receiver *TestCustomMachineryJob) Signature() string {
-    return "test_async_job"
+func (receiver *TestCustomMachineryJob) Signature() string {
+    return "test_custom_machinery_job"
 }

 type TestChainMachineryJob struct {
+    shouldError bool
 }

 func (receiver *TestChainMachineryJob) Handle(args ...any) error {
-    if len(args) > 0 && cast.ToBool(args[0]) {
+    if receiver.shouldError {
         testChainMachineryJobError++
         return errors.New("error")
     }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 433de71 and 8e8ab94.

📒 Files selected for processing (1)
  • queue/driver_machinery_test.go (16 hunks)
🧰 Additional context used
🪛 GitHub Actions: Codecov
queue/driver_machinery_test.go

[error] 302-302: Test failure in TestMachineryTestSuite/TestChainAsyncQueue: Expected nil, but got error 'unknown queue driver: %s'


[error] 353-353: Test failure in TestMachineryTestSuite/TestChainAsyncQueue_Error: Expected nil, but got error 'unknown queue driver: %s'


[error] 230-230: Test failure in TestMachineryTestSuite/TestCustomAsyncQueue: Expected nil, but got error 'unknown queue driver: %s'


[error] 159-159: Test failure in TestMachineryTestSuite/TestDefaultAsyncQueue_DisableDebug: Expected nil, but got error 'unknown queue driver: %s'


[error] 123-123: Test failure in TestMachineryTestSuite/TestDefaultAsyncQueue_EnableDebug: Expected nil, but got error 'unknown queue driver: %s'


[error] 193-193: Test failure in TestMachineryTestSuite/TestDelayAsyncQueue: Expected nil, but got error 'unknown queue driver: %s'


[error] 267-267: Test failure in TestMachineryTestSuite/TestErrorAsyncQueue: Expected nil, but got error 'unknown queue driver: %s'

⏰ Context from checks skipped due to timeout of 300000ms (2)
  • GitHub Check: test / windows (1.23)
  • GitHub Check: test / windows (1.22)
🔇 Additional comments (2)
queue/driver_machinery_test.go (2)

1-1: Clarify the TODO comment's intent.

The TODO comment indicates this file will be removed in v1.17, but it's unclear why. Consider adding more context about the reason for removal and any replacement functionality.

✅ Verification successful

TODO comment is valid - part of Machinery driver deprecation

The TODO comment is part of a coordinated effort to remove the Machinery queue driver and all its related components in v1.17. The codebase shows consistent TODO markers across implementation, tests, configuration, and contracts, with Custom driver (DriverCustom) being the alternative solution.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for related TODOs or deprecation notices
rg -l "TODO.*v1.17|DEPRECATED.*v1.17" 

Length of output: 226


Script:

#!/bin/bash
# Get the content of TODO and DEPRECATED comments with context
rg -A 2 "TODO.*v1.17|DEPRECATED.*v1.17"

Length of output: 1700


58-61: ⚠️ Potential issue

Fix test failures related to queue driver configuration.

All test cases are failing with "unknown queue driver" error. This suggests that either:

  1. The queue driver is not properly registered, or
  2. The mock configuration is not returning the correct driver type.

Verify that:

  • The machinery driver is registered in the queue factory
  • The mock configuration returns "machinery" instead of "redis" for the driver type

@hwbrzzl
Copy link
Contributor

hwbrzzl commented Jan 12, 2025

@devhaozi There must be a conflict when multiple goroutines run.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (4)
queue/driver_machinery_test.go (4)

1-1: Consider adding more context to the TODO comment.

The TODO comment indicates removal in v1.17 but doesn't explain why. Add a brief explanation of the removal reason to help future maintainers understand the deprecation plan.

-// TODO: Will be removed in v1.17
+// TODO: Will be removed in v1.17 as part of the queue system refactoring (see issue #153)

Line range hint 251-281: Enhance error test coverage.

The error test case only verifies that the job wasn't executed. Consider adding tests for:

  • Invalid job registration
  • Job timeout scenarios
  • Redis connection failures

391-394: Add argument validation and documentation.

The job handler accepts variadic arguments without validation or documentation of expected types.

+// Handle Execute the job.
+// Args:
+//   args[0]: string - test name
+//   args[1]: int - test counter
 func (receiver *TestMachineryJob) Handle(args ...any) error {
+    if len(args) < 2 {
+        return errors.New("insufficient arguments: expected test name and counter")
+    }
     testMachineryJob++
     return nil
 }

466-475: Improve error handling in chain job.

The error case in TestChainMachineryJob could be more descriptive and include validation.

 func (receiver *TestChainMachineryJob) Handle(args ...any) error {
+    if len(args) == 0 {
+        return errors.New("missing arguments")
+    }
     if len(args) > 0 && cast.ToBool(args[0]) {
         testChainMachineryJobError++
-        return errors.New("error")
+        return errors.New("chain job failed: error flag was set")
     }
     testChainMachineryJob++
     return nil
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8e8ab94 and 25b57fd.

📒 Files selected for processing (1)
  • queue/driver_machinery_test.go (11 hunks)
⏰ Context from checks skipped due to timeout of 300000ms (2)
  • GitHub Check: test / windows (1.23)
  • GitHub Check: test / windows (1.22)

Comment on lines 131 to +137

 time.Sleep(2 * time.Second)
-s.Nil(s.app.Job(&TestAsyncJob{}, []queue.Arg{
-	{Type: "string", Value: "TestDefaultAsyncQueue_EnableDebug"},
-	{Type: "int", Value: 1},
+s.Nil(s.app.Job(&TestMachineryJob{}, []any{
+	"TestDefaultAsyncQueue_EnableDebug",
+	1,
 }).OnQueue("debug").Dispatch())
 time.Sleep(2 * time.Second)
-s.Equal(1, testAsyncJob)
+s.Equal(1, testMachineryJob)

🛠️ Refactor suggestion

Replace sleep with proper synchronization.

Using fixed sleep durations for async operations can make tests flaky. Consider using channels or sync.WaitGroup for proper synchronization.

Example implementation:

+var wg sync.WaitGroup
+wg.Add(1)
 s.Nil(s.app.Job(&TestMachineryJob{}, []any{
   "TestDefaultAsyncQueue_EnableDebug",
   1,
-}).OnQueue("debug").Dispatch())
-time.Sleep(2 * time.Second)
+}).OnQueue("debug").Dispatch())
+wg.Wait()

Then modify the job handler:

 func (receiver *TestMachineryJob) Handle(args ...any) error {
   testMachineryJob++
+  wg.Done()
   return nil
 }

Committable suggestion skipped: line range outside the PR's diff.
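The pattern behind that suggestion can be shown self-contained. This sketch uses placeholder names rather than the real suite types, with the handler signalling completion so the caller never needs a fixed sleep:

```go
package main

import (
	"fmt"
	"sync"
)

// dispatchAndWait simulates dispatching one async job: the handler
// increments a counter and signals Done, and the caller blocks on Wait
// instead of sleeping for a fixed duration.
func dispatchAndWait() int {
	var wg sync.WaitGroup
	counter := 0 // stands in for the testMachineryJob global

	wg.Add(1)
	go func() { // stands in for the queue worker running Handle
		counter++
		wg.Done()
	}()

	wg.Wait() // returns as soon as the handler finishes -- no time.Sleep
	return counter
}

func main() {
	fmt.Println(dispatchAndWait()) // 1
}
```

Note that `wg.Wait` also establishes a happens-before edge with `wg.Done`, so reading the counter afterwards is race-free.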

Comment on lines +25 to +31
testMachineryJob = 0
testMachineryJobOfDisableDebug = 0
testDelayMachineryJob = 0
testCustomMachineryJob = 0
testErrorMachineryJob = 0
testChainMachineryJob = 0
testChainMachineryJobError = 0

🛠️ Refactor suggestion

Add test cleanup for global variables.

The test uses global variables to track job execution counts. These should be reset between tests to prevent test interference.

Add a TearDownTest method to reset the counters:

+func (s *MachineryTestSuite) TearDownTest() {
+    testMachineryJob = 0
+    testMachineryJobOfDisableDebug = 0
+    testDelayMachineryJob = 0
+    testCustomMachineryJob = 0
+    testErrorMachineryJob = 0
+    testChainMachineryJob = 0
+    testChainMachineryJobError = 0
+}

Committable suggestion skipped: line range outside the PR's diff.

Successfully merging this pull request may close these issues.

✨ [Feature] Refactor the queue facades