Added support for Idempotent Producer #1152

Merged on Oct 3, 2018 (3 commits)

Conversation

mimaison
Contributor

This PR adds support for the Idempotent Producer.

If the Idempotent configuration is enabled, upon startup the producer will retrieve a ProducerID and add it as well as sequence numbers to all produce requests.

This is still not complete and has not been properly tested yet. Quick tests showed that when retrying with idempotency, messages that have already been written to disk by brokers are correctly marked as duplicates.

This should be enough to start a discussion. Have a look and see if it's something you'd like merged. If so, I'll complete testing.
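
To make the mechanism described above concrete, here is a minimal, self-contained sketch of sequence numbering. It is not the code in this PR: `sequencer`, `toyBroker`, and their methods are hypothetical names, and the broker-side check is deliberately simplified. It only illustrates why a retried batch that carries the same sequence number can be recognised as a duplicate.

```go
package main

import "fmt"

// partitionKey identifies one topic-partition (hypothetical helper type).
type partitionKey struct {
	topic     string
	partition int32
}

// sequencer hands out per-partition sequence numbers for one producer ID.
type sequencer struct {
	producerID int64
	next       map[partitionKey]int32
}

func newSequencer(producerID int64) *sequencer {
	return &sequencer{producerID: producerID, next: make(map[partitionKey]int32)}
}

// nextSequence returns the sequence number for the next batch sent to the
// given partition and advances that partition's counter.
func (s *sequencer) nextSequence(topic string, partition int32) int32 {
	k := partitionKey{topic, partition}
	seq := s.next[k]
	s.next[k] = seq + 1
	return seq
}

// toyBroker remembers the last sequence number accepted per partition.
// Assumption: a single producer ID, and no out-of-order detection.
type toyBroker struct {
	lastSeq map[partitionKey]int32
}

func newToyBroker() *toyBroker {
	return &toyBroker{lastSeq: make(map[partitionKey]int32)}
}

// appendBatch reports true when the batch is a duplicate, i.e. its sequence
// number is not newer than the last one accepted for the partition.
func (b *toyBroker) appendBatch(topic string, partition, seq int32) bool {
	k := partitionKey{topic, partition}
	if last, seen := b.lastSeq[k]; seen && seq <= last {
		return true
	}
	b.lastSeq[k] = seq
	return false
}

func main() {
	s := newSequencer(42)
	b := newToyBroker()
	first := s.nextSequence("events", 0)
	second := s.nextSequence("events", 0)
	fmt.Println("first write duplicate?", b.appendBatch("events", 0, first))
	fmt.Println("retry duplicate?", b.appendBatch("events", 0, first))
	fmt.Println("next write duplicate?", b.appendBatch("events", 0, second))
}
```

The key point is that a retry reuses the sequence number assigned on the first attempt instead of drawing a new one.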

@mimaison
Contributor Author

mimaison commented Sep 3, 2018

@eapache Is this a feature you would be interested in? If I'm not mistaken, this would be the first 3rd party client to support it.

It still needs more work, and I plan to complete it this week, so I'd like to know if, for any reason, it's not something you'd want.

@buyology

buyology commented Sep 4, 2018

@mimaison — great to see someone take this on! 🎉

I have done some work on this myself (and the txn producer (which I more or less got up and running)) — and I was blocked by #1070. Curious to hear your thoughts about that issue.

@birdayz
Contributor

birdayz commented Sep 4, 2018

I've also just started working on this; I was not aware that you already got the TX producer running.
I wonder if we can come up with a plan to get the EOS producer and consumer in, with everything that's required. I'm interested in helping.
@buyology would you mind sharing your TX producer? I couldn't find a branch for this in your repo, just some other stuff like the Coordinator request refactor :)

@mimaison
Contributor Author

mimaison commented Sep 4, 2018

@buyology We've not solved that issue yet.

Our plan was to try requeuing failed PartitionSets directly into the output channel of BrokerProducer. Then assuming MaxOpenRequests is 1 (like when Idempotent was first released in Java in 0.11), the producer should recover. We've not tested that yet though.

Did you try something similar or some other methods?
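
A rough sketch of the requeue idea described above, under stated assumptions: `batch`, `sendInOrder`, and `errTransient` are hypothetical names, and a slice stands in for the BrokerProducer output channel. With only one request in flight, a failed batch stays at the head of the queue and is retried before any later batch is sent, so ordering is preserved.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient stands in for any retriable produce error (hypothetical).
var errTransient = errors.New("transient failure")

// batch is a hypothetical stand-in for a set of messages sent together.
type batch struct {
	id      int
	retries int
}

// sendInOrder sends batches strictly one at a time, mimicking
// MaxOpenRequests = 1. A failed batch is retried before later batches.
// It returns the IDs of the delivered batches in delivery order.
func sendInOrder(batches []*batch, maxRetries int, send func(*batch) error) ([]int, error) {
	var delivered []int
	queue := append([]*batch(nil), batches...) // copy so the input is untouched
	for len(queue) > 0 {
		b := queue[0]
		if err := send(b); err != nil {
			if b.retries >= maxRetries {
				return delivered, fmt.Errorf("batch %d: giving up after %d retries: %w", b.id, b.retries, err)
			}
			b.retries++
			continue // the batch stays at the head of the queue
		}
		delivered = append(delivered, b.id)
		queue = queue[1:]
	}
	return delivered, nil
}

func main() {
	batches := []*batch{{id: 1}, {id: 2}, {id: 3}}
	failedOnce := false
	send := func(b *batch) error {
		if b.id == 2 && !failedOnce {
			failedOnce = true
			return errTransient
		}
		return nil
	}
	delivered, err := sendInOrder(batches, 3, send)
	fmt.Println("delivered:", delivered, "err:", err)
}
```

With more than one request in flight, a later batch could be written before the retried one, which is why this sketch only holds under the single-request assumption.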

@edoardocomar
Member

@buyology we've reimplemented the PR with retry logic applied to whole batches.
Could you please help review?
cc @eapache

@buyology buyology left a comment

Made a first pass over the PR and left a few comments. The overall approach taken seems very reasonable to me! ☺️

@@ -67,6 +67,9 @@ type Client interface {
// in local cache. This function only works on Kafka 0.8.2 and higher.
RefreshCoordinator(consumerGroup string) error

// InitProducerID retrieves information required for Idempotent Producer
InitProducerID() (*InitProducerIDResponse, error)

@buyology buyology Sep 12, 2018

Does it make sense to allow a Client to be reused to create multiple transactional producers? The transactional producer would also use the InitProducerID request, but it would additionally have to specify the TransactionID and TransactionTimeout somewhere, and I guess that would have to be in the Config. In that case, I guess the answer would be no.

Member

We think a future implementation of the transactional producer should change InitProducerID() to get the TXID from the config.
Reusing a client with a TXID set in the config to create a new producer would result in the old producer being fenced off as a zombie, similar to what happens with the Java client if you reuse the config.

}
if c.Net.MaxOpenRequests > 1 {
	return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
}

The Java client provides some fallbacks for these automatically, but I guess this is more in line with how sarama does configuration in general

Member

Yes, we stuck to the sarama way of not rewriting the config under the covers.
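
To illustrate the design choice discussed in this thread, here is a minimal sketch of "reject rather than rewrite" validation. `producerConfig` and `configurationError` are hypothetical, trimmed-down stand-ins and not the library's types; only the MaxOpenRequests check and its message are taken from the snippet under review.

```go
package main

import "fmt"

// configurationError is a hypothetical error type carrying a message.
type configurationError string

func (e configurationError) Error() string {
	return "invalid configuration: " + string(e)
}

// producerConfig is a hypothetical stand-in with only the two fields
// needed for this illustration.
type producerConfig struct {
	Idempotent      bool
	MaxOpenRequests int
}

// validate rejects an incompatible combination instead of silently
// lowering MaxOpenRequests on the caller's behalf.
func (c producerConfig) validate() error {
	if c.Idempotent && c.MaxOpenRequests > 1 {
		return configurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
	}
	return nil
}

func main() {
	cfg := producerConfig{Idempotent: true, MaxOpenRequests: 5}
	fmt.Println("before fix:", cfg.validate())

	// The caller fixes the setting explicitly; nothing is changed for them.
	cfg.MaxOpenRequests = 1
	fmt.Println("after fix:", cfg.validate())
}
```

The trade-off is that users see an error at startup and must adjust the setting themselves, but the configuration they read back is always the one they wrote.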

msg.retries++
}
// extremely pessimistic strategy - refreshing metadata for every batch retried. Should be improved
err := p.client.RefreshMetadata(topic)

I guess this would basically be the InvalidMetadataExceptions?

Member

@edoardocomar edoardocomar Sep 12, 2018

Sorry, can you expand on the comment? We're unsure what you are pointing out.

About this snippet: we're not ecstatic about the very pessimistic strategy of refreshing metadata for every retried batch, so we're going to push a small commit to do only one refresh.
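
One possible reading of "one refresh only", sketched under assumptions: collect the distinct topics of all batches being retried and issue a single refresh call for them. `retryBatch`, `topicsToRefresh`, and `refreshOnce` are hypothetical names, and the `refresh` callback is merely shaped like a variadic metadata refresh; this is not necessarily what the follow-up commit does.

```go
package main

import (
	"fmt"
	"sort"
)

// retryBatch is a hypothetical stand-in for a batch scheduled for retry.
type retryBatch struct {
	topic     string
	partition int32
}

// topicsToRefresh returns the distinct topics of the given batches, sorted
// so the result is deterministic.
func topicsToRefresh(batches []retryBatch) []string {
	seen := make(map[string]struct{})
	var topics []string
	for _, b := range batches {
		if _, ok := seen[b.topic]; ok {
			continue
		}
		seen[b.topic] = struct{}{}
		topics = append(topics, b.topic)
	}
	sort.Strings(topics)
	return topics
}

// refreshOnce calls refresh a single time for all affected topics, or not
// at all when there is nothing to retry.
func refreshOnce(batches []retryBatch, refresh func(topics ...string) error) error {
	topics := topicsToRefresh(batches)
	if len(topics) == 0 {
		return nil
	}
	return refresh(topics...)
}

func main() {
	batches := []retryBatch{
		{topic: "orders", partition: 0},
		{topic: "orders", partition: 1},
		{topic: "events", partition: 0},
	}
	calls := 0
	err := refreshOnce(batches, func(topics ...string) error {
		calls++
		fmt.Println("refreshing metadata for", topics)
		return nil
	})
	fmt.Println("refresh calls:", calls, "err:", err)
}
```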

 // Other non-retriable errors
 default:
-	bp.parent.returnErrors(msgs, block.Err)
+	bp.parent.returnErrors(pSet.msgs, block.Err)

The Java client compensates subsequent sequence numbers due to any non-retriable errors:

https://github.com/apache/kafka/blob/05ba5aa00847b18b74369a821e972bbba9f155eb/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L508

I guess we would have to do something similar.

Member

Correct. Currently, upon hitting a non-retriable error, we do not attempt to recover automatically from it. Users will have to create a new Producer to get a new ProducerId.

As you pointed out, the Java client tries to recover automatically (by fetching a new ProducerId and even resequencing messages in some cases), but we feel this can be done as a further improvement in a follow-up PR.
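
For users who need to handle this themselves in the meantime, here is a hedged sketch of the "create a new Producer" approach. Everything in it is hypothetical: the `producer` interface, `errFatal`, `fakeProducer`, and `recreatingSender` are illustrative names and not part of the library. The wrapper discards the producer after a non-retriable error and lazily builds a fresh one on the next send.

```go
package main

import (
	"errors"
	"fmt"
)

// errFatal stands in for any non-retriable produce error (hypothetical).
var errFatal = errors.New("non-retriable produce error")

// producer is a hypothetical minimal interface, not the library API.
type producer interface {
	Send(msg string) error
	Close() error
}

// fakeProducer succeeds failAfter times, then fails every later send.
type fakeProducer struct {
	sent      int
	failAfter int
}

func (f *fakeProducer) Send(msg string) error {
	if f.sent >= f.failAfter {
		return errFatal
	}
	f.sent++
	return nil
}

func (f *fakeProducer) Close() error { return nil }

// recreatingSender drops its producer after a fatal error and builds a new
// one (and thus obtains a new producer ID) on the next Send.
type recreatingSender struct {
	newProducer func() (producer, error)
	current     producer
}

func (r *recreatingSender) Send(msg string) error {
	if r.current == nil {
		p, err := r.newProducer()
		if err != nil {
			return err
		}
		r.current = p
	}
	err := r.current.Send(msg)
	if errors.Is(err, errFatal) {
		// The failed message is not resent here: under a new producer ID
		// the broker cannot deduplicate it against the earlier attempt.
		_ = r.current.Close()
		r.current = nil
	}
	return err
}

func main() {
	created := 0
	s := &recreatingSender{newProducer: func() (producer, error) {
		created++
		return &fakeProducer{failAfter: 1}, nil
	}}
	for _, m := range []string{"a", "b", "c"} {
		fmt.Printf("send %q: err=%v\n", m, s.Send(m))
	}
	fmt.Println("producers created:", created)
}
```

The error is still returned to the caller, who decides whether resending the failed message under the new producer is acceptable.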

@buyology

@birdayz — sorry for the belated reply: I uploaded my tx-branch. It is hacky in places, but I think there are e.g. some tests (and perhaps some details I figured out by dissecting the Java implementation) that can be of help.

@mimaison
Contributor Author

@bai Can you take a look?

@bai
Contributor

bai commented Oct 1, 2018

Thanks @mimaison, looks solid. Could you please rebase your branch to get it tested against Go 1.11 too?

@mimaison
Contributor Author

mimaison commented Oct 1, 2018

Sure, I'll do that this week (hopefully today or tomorrow!)

mimaison and others added 3 commits October 1, 2018 17:20
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Unit test with mock broker

Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
@mimaison
Contributor Author

mimaison commented Oct 1, 2018

@bai We rebased the PR. One of the Travis jobs failed while installing dependencies, but all the others passed. I don't seem to be able to retrigger the build to rerun it.

@bai bai merged commit f21e149 into IBM:master Oct 3, 2018
@mimaison
Contributor Author

mimaison commented Oct 3, 2018

Thank you!

@bai
Contributor

bai commented Oct 3, 2018

Many thanks for contributing this 👍

muirdm pushed a commit to retailnext/sarama that referenced this pull request Oct 8, 2018
muirdm pushed a commit to retailnext/sarama that referenced this pull request Nov 21, 2018