PublishAsync Retry Mechanism Not Working as Expected #1678

Closed
somratdutta opened this issue Jul 18, 2024 · 5 comments
Labels
defect Suspected defect such as a bug or regression

somratdutta (Contributor) commented Jul 18, 2024

Observed behavior

js.PublishAsync("test", []byte("hi"), nats.RetryWait(5*time.Second), nats.RetryAttempts(10))

It gives up after the first attempt, having waited 5 seconds.

Expected behavior

The method should retry publishing the message 10 times, with a 5-second wait between attempts, giving up after 50 seconds if there is an error on each attempt.

Server and client version

  • NATS Server Version: 2.10.3
  • NATS Client Version: 1.35.0

Host environment

  • Operating System: macOS

Steps to reproduce

  1. Start a NATS server cluster in JetStream mode with trace and debug logging enabled.
  2. Use the following sample Go code to publish a message to a subject that does not exist, with retry options set.
package main

import (
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatalf("Error connecting to NATS: %v", err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatalf("Error getting JetStream context: %v", err)
    }

    subject := "test"
    message := []byte("hi")
    retryWait := 5 * time.Second
    retryAttempts := 10

    log.Printf("Async Publishing to subject %s with %d retry attempts and %v retry wait time", subject, retryAttempts, retryWait)

    // PublishAsync with retry options
    ack, err := js.PublishAsync(subject, message,
        nats.RetryWait(retryWait),
        nats.RetryAttempts(retryAttempts),
    )
    if err != nil {
        log.Fatalf("Async Publish error: %v", err)
    }

    // Simply wait for a long duration to ensure processing
    time.Sleep(time.Duration(retryAttempts)*retryWait + 10*time.Second)
    log.Println("Wait completed")

    if ack.Err() != nil {
        log.Printf("PublishAsync final error: %v", ack.Err())
    } else {
        log.Println("PublishAsync acknowledged successfully")
    }
}

Console Output:

2024/07/18 18:40:33 Async Publishing to subject test with 10 retry attempts and 5s retry wait time
2024/07/18 18:41:33 Wait completed
2024/07/18 18:41:33 PublishAsync final error: 0x14000220000

Process finished with the exit code 0

Server Logs:

.....
[23368] 2024/07/18 18:40:33.048660 [TRC] 127.0.0.1:6223 - rid:21 - ->> [RS+ $G _INBOX.aFMeI1.*]
[23368] 2024/07/18 18:40:33.049439 [TRC] 127.0.0.1:64606 - cid:26 - <<- [PUB test _INBOX.aFMeI1.fNc9Yt 2]
[23368] 2024/07/18 18:40:33.049579 [TRC] 127.0.0.1:64606 - cid:26 - <<- MSG_PAYLOAD: ["hi"]
.....

The server logs show a single PUB to test and nothing after it, so no retry attempts were made.

somratdutta added the defect label (Suspected defect such as a bug or regression) on Jul 18, 2024

somratdutta (Contributor, Author) commented:

Here's my server log: nats-server-1.log

piotrpio (Collaborator) commented Aug 2, 2024

Hey @somratdutta, thanks for creating the issue and sorry it took so long. Indeed, the old JetStream API does not support retries in PublishAsync, while the new one (in the jetstream package) does. I'll add this handling to the old API as well.

In the meantime, I suggest you try the new API. It's meant to make message consumption easier and more straightforward, and the rest of the API works similarly to the old one. Here's your example using the new API:

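package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {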
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatalf("Error connecting to NATS: %v", err)
	}
	defer nc.Close()

	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatalf("Error getting JetStream context: %v", err)
	}

	subject := "test"
	message := []byte("hi")
	retryWait := 5 * time.Second
	retryAttempts := 10

	log.Printf("Async Publishing to subject %s with %d retry attempts and %v retry wait time", subject, retryAttempts, retryWait)

	// PublishAsync with retry options
	ack, err := js.PublishAsync(subject, message,
		jetstream.WithRetryWait(retryWait),
		jetstream.WithRetryAttempts(retryAttempts),
	)
	if err != nil {
		log.Fatalf("Async Publish error: %v", err)
	}

	select {
	case <-ack.Ok():
		log.Println("PublishAsync acknowledged successfully")
	case err := <-ack.Err():
		log.Printf("PublishAsync final error: %s", err)
	}
}

Also, one thing I noticed regarding your example:

    if ack.Err() != nil {
        log.Printf("PublishAsync final error: %v", ack.Err())
    } else {
        log.Println("PublishAsync acknowledged successfully")
    }

ack.Err() returns a channel, not an error, so what you want is something like this:

	select {
	case <-ack.Ok():
		log.Println("PublishAsync acknowledged successfully")
	case err := <-ack.Err():
		log.Printf("PublishAsync final error: %s", err)
	}

pranavmehta94 (Contributor) commented Aug 9, 2024

@piotrpio Can you please review the PR that was opened to add retries to PublishAsync in the old JetStream API?

piotrpio (Collaborator) commented Aug 9, 2024

@pranavmehta94 thanks, I reviewed the PR and have a few comments :)

somratdutta (Contributor, Author) commented:

Closing this issue as it has been resolved.


3 participants