Skip to content

Commit

Permalink
change ack fn to done fn
Browse files Browse the repository at this point in the history
(now we allow to nack the task)
  • Loading branch information
muhsinshodiq committed Jul 22, 2021
1 parent 95c1b1f commit edf7a8c
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 12 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ This library also handling auto reconnect for connection and channel (wrap insid

### Installation

go get github.com/sepulsa/rabbitmq-vn-delay@v1.2.0
go get github.com/sepulsa/rabbitmq-vn-delay@v1.3.0

## Example

Expand Down
14 changes: 9 additions & 5 deletions rabbitmq-vn-delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ const (
var lock = &sync.Mutex{}
var delayLock = &sync.Mutex{}

type AckFn func() error
type HandlerFn func(data string, ack AckFn)
type DoneFn func(ack bool) error
type HandlerFn func(data string, done DoneFn)

type queueSubscription struct {
queueName string
Expand Down Expand Up @@ -258,12 +258,16 @@ func (r *RabbitMQVNDelay) subscribe(index int) {
// message := string(data.Body)
// log.Printf("%s [%s] process message: %s\n", logTag, queueName, message)

ackFn := func() error {
doneFn := func(ack bool) error {
// log.Printf("%s [%s] ack message: %s\n", logTag, queueName, message)
return data.Ack(false)
if ack {
return data.Ack(false)
} else {
return data.Nack(false, true)
}
}

r.queueSubs[index].handler(string(data.Body), ackFn)
r.queueSubs[index].handler(string(data.Body), doneFn)
}
}
}()
Expand Down
12 changes: 6 additions & 6 deletions rabbitmq-vn-delay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

const (
queueName = "vn.unit-test"
acceptableDelay int64 = 50
acceptableDelay int64 = 150
)

var rabbitMQ *vndelay.RabbitMQVNDelay
Expand All @@ -34,7 +34,7 @@ func TestMain(m *testing.M) {
func TestPublish(t *testing.T) {
var waitgroup sync.WaitGroup

handler := func(data string, ack vndelay.AckFn) {
handler := func(data string, ack vndelay.DoneFn) {
receivedTime := time.Now().Unix()
messageInNumber, err := strconv.ParseInt(data, 10, 64)
if err != nil {
Expand All @@ -47,7 +47,7 @@ func TestPublish(t *testing.T) {
t.Error("Message received not on proper delay. Expect at: ", messageInNumber, " but received at: ", receivedTime)
}

ack()
ack(true)
waitgroup.Done()
}

Expand Down Expand Up @@ -90,7 +90,7 @@ func TestSubscribe(t *testing.T) {
log.Println("Start publish queue")

for i := 1; i <= count; i++ {
data := fmt.Sprintf("iseng %02d", i)
data := fmt.Sprintf("%02d", i)

log.Println("publish data:", data)
err := rabbitMQ.Publish(queueName, data)
Expand All @@ -103,10 +103,10 @@ func TestSubscribe(t *testing.T) {
}
}()

handler := func(data string, ack vndelay.AckFn) {
handler := func(data string, done vndelay.DoneFn) {
log.Printf("execute task with data %s\n", data)
time.Sleep(time.Millisecond * 100)
ack()
done(true)
waitgroup.Done()
}

Expand Down

0 comments on commit edf7a8c

Please sign in to comment.