Skip to content

Commit

Permalink
chore: refactor message handling and update dependencies
Browse files Browse the repository at this point in the history
- Change `QueuedMessage` to `TaskMessage` in various function signatures
- Update dependency `github.com/golang-queue/queue` to version `v0.3.0`
- Add error handling for JSON unmarshal in `redis.go`

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
  • Loading branch information
appleboy committed Jan 20, 2025
1 parent 0fce981 commit dc443d8
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 16 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func main() {
w := redisdb.NewWorker(
redisdb.WithAddr("127.0.0.1:6379"),
redisdb.WithChannel("foobar"),
redisdb.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
redisdb.WithRunFunc(func(ctx context.Context, m queue.TaskMessage) error {
v, ok := m.(*job)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/golang-queue/redisdb
go 1.22

require (
github.com/golang-queue/queue v0.2.2-0.20250119152159-18502599f7a6
github.com/golang-queue/queue v0.3.0
github.com/redis/go-redis/v9 v9.7.0
github.com/stretchr/testify v1.10.0
github.com/testcontainers/testcontainers-go v0.35.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-queue/queue v0.2.2-0.20250119152159-18502599f7a6 h1:ifL43cgrzjheIJqb6dU4poR+4U1jhTwzkXwlh29nC8w=
github.com/golang-queue/queue v0.2.2-0.20250119152159-18502599f7a6/go.mod h1:SkjMwz1TjxZOrF7kABvbar1CagcMxwRtXt5Tx00wb4g=
github.com/golang-queue/queue v0.3.0 h1:gyBLNT9EDOsChazYScp8iLiwLfG0SdnCDmNUybcHig4=
github.com/golang-queue/queue v0.3.0/go.mod h1:SkjMwz1TjxZOrF7kABvbar1CagcMxwRtXt5Tx00wb4g=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
Expand Down
6 changes: 3 additions & 3 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
type Option func(*options)

type options struct {
runFunc func(context.Context, core.QueuedMessage) error
runFunc func(context.Context, core.TaskMessage) error
logger queue.Logger
addr string
db int
Expand Down Expand Up @@ -127,7 +127,7 @@ func WithChannel(channel string) Option {
}

// WithRunFunc setup the run func of queue
func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option {
func WithRunFunc(fn func(context.Context, core.TaskMessage) error) Option {
return func(w *options) {
w.runFunc = fn
}
Expand All @@ -153,7 +153,7 @@ func newOptions(opts ...Option) options {
// default channel size in go-redis package
channelSize: 100,
logger: queue.NewLogger(),
runFunc: func(context.Context, core.QueuedMessage) error {
runFunc: func(context.Context, core.TaskMessage) error {
return nil
},
}
Expand Down
11 changes: 7 additions & 4 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewWorker(opts ...Option) *Worker {
}

// Run to execute new task
func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error {
func (w *Worker) Run(ctx context.Context, task core.TaskMessage) error {
return w.opts.runFunc(ctx, task)
}

Expand All @@ -136,7 +136,7 @@ func (w *Worker) Shutdown() error {
}

// Queue send notification to queue
func (w *Worker) Queue(job core.QueuedMessage) error {
func (w *Worker) Queue(job core.TaskMessage) error {
if atomic.LoadInt32(&w.stopFlag) == 1 {
return queue.ErrQueueShutdown
}
Expand All @@ -153,7 +153,7 @@ func (w *Worker) Queue(job core.QueuedMessage) error {
}

// Request a new task
func (w *Worker) Request() (core.QueuedMessage, error) {
func (w *Worker) Request() (core.TaskMessage, error) {
clock := 0
loop:
for {
Expand All @@ -163,7 +163,10 @@ loop:
return nil, queue.ErrQueueHasBeenClosed
}
var data job.Message
_ = json.Unmarshal([]byte(task.Payload), &data)
err := json.Unmarshal([]byte(task.Payload), &data)
if err != nil {
return nil, err
}
return &data, nil
case <-time.After(1 * time.Second):
if clock == 5 {
Expand Down
14 changes: 7 additions & 7 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func TestCustomFuncAndWait(t *testing.T) {
w := NewWorker(
WithAddr(endpoint),
WithChannel("test3"),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
time.Sleep(500 * time.Millisecond)
return nil
}),
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestRedisCluster(t *testing.T) {
WithAddr(strings.Join(hosts, ",")),
WithChannel("testCluster"),
WithCluster(),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
time.Sleep(500 * time.Millisecond)
return nil
}),
Expand Down Expand Up @@ -281,7 +281,7 @@ func TestRedisSentinel(t *testing.T) {
WithMasterName("mymaster"),
WithChannel("testSentinel"),
WithSentinel(),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
time.Sleep(500 * time.Millisecond)
return nil
}),
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestJobReachTimeout(t *testing.T) {
w := NewWorker(
WithAddr(endpoint),
WithChannel("timeout"),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
WithAddr(endpoint),
WithChannel("cancel"),
WithLogger(queue.NewLogger()),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestGoroutineLeak(t *testing.T) {
WithAddr(endpoint),
WithChannel("GoroutineLeak"),
WithLogger(queue.NewEmptyLogger()),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestGoroutinePanic(t *testing.T) {
w := NewWorker(
WithAddr(endpoint),
WithChannel("GoroutinePanic"),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
panic("missing something")
}),
)
Expand Down

0 comments on commit dc443d8

Please sign in to comment.