Getting Started
In this tutorial, we are going to write two programs, client and workers.
- client.go will create and schedule tasks to be processed asynchronously by the background workers.
- workers.go will start multiple concurrent workers which process the tasks created by the client.
This guide assumes that you are running a Redis server at localhost:6379.
Before we start, make sure you have Redis installed and running.
Let's start by creating our two main files.
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
Then install the asynq package.
go get -u github.com/hibiken/asynq
Before we start writing code, let's review a few core types that we'll use in both of our programs.
Asynq uses Redis as a message broker.
Both client.go and workers.go need to connect to Redis to write to and read from it.
We are going to use RedisClientOpt to specify how to connect to the local Redis instance.
var redis = &asynq.RedisClientOpt{
	Addr: "localhost:6379",
	// Omit if no password is required
	Password: "mypassword",
	// Use a dedicated db number for asynq.
	// By default, Redis offers 16 databases (0..15)
	DB: 0,
}
In asynq, a unit of work is encapsulated in a type called Task, which has two fields: Type and Payload.
// Task represents a task to be performed.
type Task struct {
	// Type indicates the type of a task to be performed.
	Type string
	// Payload holds data needed to perform the task.
	Payload Payload
}
Type is a simple string value that indicates the type of the given task.
Payload holds data needed for task execution, and you can think of it as map[string]interface{}. One important thing to note is that the payload values have to be serializable.
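To see why serializability matters, here is a minimal sketch using only the standard library. It assumes the payload is encoded as JSON on its way through the broker, which is an assumption made for illustration; the text above only says that values must be serializable. The helper name roundTrip is hypothetical and not part of asynq.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// roundTrip is a hypothetical helper (not part of asynq). It encodes a
// payload to JSON and decodes it back, mimicking a trip through a broker.
func roundTrip(payload map[string]interface{}) (map[string]interface{}, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	var out map[string]interface{}
	if err := json.Unmarshal(data, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(map[string]interface{}{"user_id": 42})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// encoding/json decodes JSON numbers into float64 when the target is
	// interface{}, so the int we stored does not come back as an int.
	fmt.Printf("%T\n", out["user_id"])

	// A channel has no JSON representation, so this payload cannot be encoded.
	_, err = roundTrip(map[string]interface{}{"ch": make(chan int)})
	fmt.Println("channel payload rejected:", err != nil)
}
```

Under that assumption, an int stored in the payload comes back as a different dynamic type, which is one reason typed accessors such as the GetInt call used later in this tutorial are convenient.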
Now that we've taken a look at the core types, let's start writing our programs.
In client.go, we are going to create a few tasks and enqueue them using asynq.Client.
To create a task, use the NewTask function and pass the type and payload for the task.
asynq.Client supports three methods for scheduling tasks: Enqueue, EnqueueIn, and EnqueueAt.
Use client.Enqueue to enqueue tasks to be processed immediately.
Use client.EnqueueIn or client.EnqueueAt to schedule tasks to be processed in the future.
// client.go
package main

import (
	"log"
	"time"

	"github.com/hibiken/asynq"
)

func main() {
	r := &asynq.RedisClientOpt{
		Addr: "localhost:6379",
	}
	client := asynq.NewClient(r)

	// Create a task with typename and payload.
	t1 := asynq.NewTask("email:welcome", map[string]interface{}{"user_id": 42})
	t2 := asynq.NewTask("email:reminder", map[string]interface{}{"user_id": 42})

	// Process the task immediately.
	err := client.Enqueue(t1)
	if err != nil {
		log.Fatal(err)
	}

	// Process the task 24 hours later.
	err = client.EnqueueIn(24*time.Hour, t2)
	if err != nil {
		log.Fatal(err)
	}
}
That's all we need for the client program :)
In workers.go, we'll create an asynq.Background instance to start the workers.
The NewBackground function takes RedisConnOpt and Config.
Config is used to configure how the background task processing should behave.
You can take a look at the documentation on Config to see all the available config options.
To keep it simple, we are only going to specify the concurrency in this example.
// workers.go
func main() {
	r := &asynq.RedisClientOpt{
		Addr: "localhost:6379",
	}
	bg := asynq.NewBackground(r, &asynq.Config{
		Concurrency: 10,
	})
	bg.Run(handler)
}
The argument to (*asynq.Background).Run is the interface asynq.Handler, which has one method, ProcessTask.
// ProcessTask should return nil if the task was processed successfully.
//
// If ProcessTask returns a non-nil error or panics, the task will be retried later.
type Handler interface {
	ProcessTask(context.Context, *Task) error
}
The simplest way to implement a handler is to define a function with the same signature and use the asynq.HandlerFunc adapter type when passing it to Run.
func handler(ctx context.Context, t *asynq.Task) error {
	switch t.Type {
	case "email:welcome":
		id, err := t.Payload.GetInt("user_id")
		if err != nil {
			return err
		}
		fmt.Printf("Send Welcome Email to User %d\n", id)
	case "email:reminder":
		id, err := t.Payload.GetInt("user_id")
		if err != nil {
			return err
		}
		fmt.Printf("Send Reminder Email to User %d\n", id)
	default:
		return fmt.Errorf("unexpected task type: %s", t.Type)
	}
	return nil
}

func main() {
	r := &asynq.RedisClientOpt{
		Addr: "localhost:6379",
	}
	bg := asynq.NewBackground(r, &asynq.Config{
		Concurrency: 10,
	})
	// Use asynq.HandlerFunc adapter for a handler function
	bg.Run(asynq.HandlerFunc(handler))
}
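If the adapter idea is unfamiliar, the following standalone sketch shows how a function type can satisfy a one-method interface, in the same spirit as http.HandlerFunc from the standard library. The Task, Handler, and HandlerFunc types here are local stand-ins written for illustration, not the asynq definitions, and greet is a hypothetical handler.

```go
package main

import (
	"context"
	"fmt"
)

// Task is a local stand-in for illustration only.
type Task struct {
	Type string
}

// Handler is a local stand-in with the same shape as the interface shown above.
type Handler interface {
	ProcessTask(ctx context.Context, t *Task) error
}

// HandlerFunc is a function type; giving it a ProcessTask method lets any
// function with this signature be used wherever a Handler is expected.
type HandlerFunc func(ctx context.Context, t *Task) error

// ProcessTask simply calls the underlying function.
func (f HandlerFunc) ProcessTask(ctx context.Context, t *Task) error {
	return f(ctx, t)
}

// greet is a hypothetical handler written as a plain function.
func greet(ctx context.Context, t *Task) error {
	fmt.Println("processing", t.Type)
	return nil
}

func main() {
	// The conversion HandlerFunc(greet) is what turns the function into a Handler.
	var h Handler = HandlerFunc(greet)
	if err := h.ProcessTask(context.Background(), &Task{Type: "email:welcome"}); err != nil {
		fmt.Println("error:", err)
	}
}
```

The conversion costs nothing at runtime; it only attaches a method to the function value so that it fits the interface.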
We could keep adding cases to this handler function, but in a realistic application, it's convenient to define the logic for each case in a separate function.
To refactor our code, let's use ServeMux to create our handler.
Just like the ServeMux from the "net/http" package, you register a handler by calling Handle or HandleFunc. ServeMux satisfies the Handler interface, so you can pass it to (*Background).Run.
// workers.go
func main() {
	r := &asynq.RedisClientOpt{
		Addr: "localhost:6379",
	}
	bg := asynq.NewBackground(r, &asynq.Config{
		Concurrency: 10,
	})

	mux := asynq.NewServeMux()
	mux.HandleFunc("email:welcome", sendWelcomeEmail)
	mux.HandleFunc("email:reminder", sendReminderEmail)

	bg.Run(mux)
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
	id, err := t.Payload.GetInt("user_id")
	if err != nil {
		return err
	}
	fmt.Printf("Send Welcome Email to User %d\n", id)
	return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
	id, err := t.Payload.GetInt("user_id")
	if err != nil {
		return err
	}
	fmt.Printf("Send Reminder Email to User %d\n", id)
	return nil
}
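To make the dispatch idea concrete, here is a minimal, standalone sketch of a multiplexer that routes a task to a handler by its type. It is not the asynq implementation: miniMux, newMiniMux, and welcome are hypothetical names, Task and HandlerFunc are local stand-ins, and the exact-match lookup is an assumption, since this tutorial does not describe the real matching rules.

```go
package main

import (
	"context"
	"fmt"
)

// Task and HandlerFunc are local stand-ins for illustration only.
type Task struct {
	Type string
}

type HandlerFunc func(ctx context.Context, t *Task) error

// miniMux is a hypothetical, simplified multiplexer that looks up a
// handler by exact task type.
type miniMux struct {
	handlers map[string]HandlerFunc
}

func newMiniMux() *miniMux {
	return &miniMux{handlers: make(map[string]HandlerFunc)}
}

// HandleFunc registers h for the given task type.
func (m *miniMux) HandleFunc(taskType string, h HandlerFunc) {
	m.handlers[taskType] = h
}

// ProcessTask dispatches to the registered handler, or returns an error
// when no handler is registered for the task's type.
func (m *miniMux) ProcessTask(ctx context.Context, t *Task) error {
	h, ok := m.handlers[t.Type]
	if !ok {
		return fmt.Errorf("no handler registered for task type %q", t.Type)
	}
	return h(ctx, t)
}

// welcome is a hypothetical handler.
func welcome(ctx context.Context, t *Task) error {
	fmt.Println("welcome handler called")
	return nil
}

func main() {
	mux := newMiniMux()
	mux.HandleFunc("email:welcome", welcome)

	ctx := context.Background()
	fmt.Println("registered type:", mux.ProcessTask(ctx, &Task{Type: "email:welcome"}))
	fmt.Println("unknown type:", mux.ProcessTask(ctx, &Task{Type: "email:unknown"}))
}
```

Because the multiplexer itself has a ProcessTask method, it can be passed anywhere a handler is expected, which is the property the text above relies on.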
Now that we've extracted functions to handle each task type, the code looks a bit more organized.
However, the code is still a bit too implicit: the string values for task types and payload keys should be encapsulated in a cohesive package. Let's refactor our code by writing a package that encapsulates task creation and handling. We'll simply create a package called tasks.
mkdir tasks && touch tasks/tasks.go
package tasks

import (
	"context"
	"fmt"

	"github.com/hibiken/asynq"
)

// A list of task types.
const (
	WelcomeEmail  = "email:welcome"
	ReminderEmail = "email:reminder"
)

// NewWelcomeEmailTask creates a welcome email task for the given user ID.
func NewWelcomeEmailTask(id int) *asynq.Task {
	payload := map[string]interface{}{"user_id": id}
	return asynq.NewTask(WelcomeEmail, payload)
}

// NewReminderEmailTask creates a reminder email task for the given user ID.
func NewReminderEmailTask(id int) *asynq.Task {
	payload := map[string]interface{}{"user_id": id}
	return asynq.NewTask(ReminderEmail, payload)
}

// HandleWelcomeEmailTask processes tasks of type WelcomeEmail.
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
	id, err := t.Payload.GetInt("user_id")
	if err != nil {
		return err
	}
	fmt.Printf("Send Welcome Email to User %d\n", id)
	return nil
}

// HandleReminderEmailTask processes tasks of type ReminderEmail.
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
	id, err := t.Payload.GetInt("user_id")
	if err != nil {
		return err
	}
	fmt.Printf("Send Reminder Email to User %d\n", id)
	return nil
}
And now we can import this package in both client.go and workers.go.
// client.go
package main

import (
	"log"
	"time"

	// The import path follows the module name from `go mod init asynq-quickstart`.
	"asynq-quickstart/tasks"

	"github.com/hibiken/asynq"
)

func main() {
	r := &asynq.RedisClientOpt{
		Addr: "localhost:6379",
	}
	client := asynq.NewClient(r)

	t1 := tasks.NewWelcomeEmailTask(42)
	t2 := tasks.NewReminderEmailTask(42)

	// Process the task immediately.
	err := client.Enqueue(t1)
	if err != nil {
		log.Fatal(err)
	}

	// Process the task 24 hours later.
	err = client.EnqueueIn(24*time.Hour, t2)
	if err != nil {
		log.Fatal(err)
	}
}
// workers.go
package main

import (
	"asynq-quickstart/tasks" // module name from `go mod init asynq-quickstart`

	"github.com/hibiken/asynq"
)

func main() {
	r := &asynq.RedisClientOpt{
		Addr: "localhost:6379",
	}
	bg := asynq.NewBackground(r, &asynq.Config{
		Concurrency: 10,
	})

	mux := asynq.NewServeMux()
	mux.HandleFunc(tasks.WelcomeEmail, tasks.HandleWelcomeEmailTask)
	mux.HandleFunc(tasks.ReminderEmail, tasks.HandleReminderEmailTask)

	bg.Run(mux)
}
And now the code looks much nicer!
Now that we have both client and workers, we can run both programs.
Let's run the client program to create and schedule tasks.
go run client/client.go
This will create two tasks: one that should be processed immediately and another to be processed 24 hours later.
Let's use the asynqmon tool to inspect the tasks.
asynqmon stats
You should be able to see that there's one task in Enqueued state and another in Scheduled state.
Note: To learn more about the meaning of each state, check out Life of a Task.
Let's run asynqmon with the watch command so that the stats refresh continuously and we can see the changes.
watch -n 3 asynqmon stats # Runs `asynqmon stats` every 3 seconds
And finally, let's start the workers program to process tasks.
go run workers/workers.go
Note: This will not exit until you send a signal to terminate the program. See the Signal Wiki page for best practices on how to safely terminate background workers.
You should be able to see some text printed in your terminal indicating that the task was processed successfully.
You can run the client again to see how the workers pick up the tasks and process them.
It's not uncommon for a task to fail on its first attempt. By default, a failed task will be retried with exponential backoff up to 25 times. Let's update our handler to return an error to simulate an unsuccessful case.
// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
	id, err := t.Payload.GetInt("user_id")
	if err != nil {
		return err
	}
	fmt.Printf("Attempting to Send Welcome Email to User %d...\n", id)
	return fmt.Errorf("could not send email to the user") // <-- Return error
}
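To get a feel for what exponential backoff means for a handler that keeps failing like this one, here is a small standalone sketch. The doubling rule, the one-second base, and the 60-second cap are assumptions chosen for illustration; this tutorial does not state the delay formula asynq actually uses. backoffSeconds is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// backoffSeconds is a hypothetical helper (not part of asynq). It doubles
// the delay for every retry already performed, starting from base seconds
// and never exceeding max seconds.
func backoffSeconds(retried, base, max int) int {
	delay := base
	for i := 0; i < retried && delay < max; i++ {
		delay *= 2
	}
	if delay > max {
		return max
	}
	return delay
}

func main() {
	// Print the wait before each of the first few retries.
	for retried := 0; retried < 8; retried++ {
		d := time.Duration(backoffSeconds(retried, 1, 60)) * time.Second
		fmt.Printf("retry %d: wait %v\n", retried+1, d)
	}
}
```

The point of growing the delay is that a task failing because of a temporary outage is not hammered in a tight loop, while a cap keeps the wait bounded.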
Let's restart our workers program and enqueue a task.
go run workers/workers.go
go run client/client.go
If you are running asynqmon stats, you should be able to see that there's a task in the Retry state.
To inspect which tasks are in the Retry state, you can run:
asynqmon ls retry
This will list all the tasks that will be retried in the future. The output includes the ETA of each task's next execution.
Once a task exhausts its retry count, it will transition to the Dead state and won't be retried (you can still manually enqueue dead tasks by running the asynqmon enq command).
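The retry and dead transitions described above can be summarized with a tiny standalone model. This is a simplified sketch based only on the states named in this tutorial, not asynq's internal logic: nextState is a hypothetical function, and the "done" label for a successful task is an assumption made for illustration.

```go
package main

import "fmt"

// nextState is a hypothetical, simplified model of what happens to a task
// after one processing attempt. retried is the number of retries already
// performed and maxRetry is the retry limit (25 by default, per the text).
func nextState(failed bool, retried, maxRetry int) string {
	switch {
	case !failed:
		return "done" // assumed label for a successfully processed task
	case retried < maxRetry:
		return "retry" // will be attempted again later
	default:
		return "dead" // retries exhausted; no automatic retry
	}
}

func main() {
	fmt.Println(nextState(false, 0, 25))
	fmt.Println(nextState(true, 3, 25))
	fmt.Println(nextState(true, 25, 25))
}
```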
Let's fix our handler before we wrap up this tutorial.
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
	id, err := t.Payload.GetInt("user_id")
	if err != nil {
		return err
	}
	fmt.Printf("Send Welcome Email to User %d\n", id)
	return nil
}
Now that we've fixed the handler, the task will be processed successfully on the next attempt!
This was a whirlwind tour of asynq basics. To learn more about all of its features, such as priority queues and custom retry, see our Wiki page.
Thanks for reading!