Added support for Idempotent Producer #1152

Merged: 3 commits, Oct 3, 2018
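For context, a minimal sketch of how an application could enable the idempotent producer introduced here. Producer.Idempotent is the new flag from this PR; the broker version, acks, and in-flight settings are assumptions based on Kafka's general idempotent-producer requirements, not statements about this PR's validation logic.

package main

import (
	"log"

	"github.com/Shopify/sarama" // Sarama's import path at the time of this PR
)

func main() {
	conf := sarama.NewConfig()
	conf.Producer.Idempotent = true // new configuration flag added by this PR
	// Assumed prerequisites for idempotence, per the Kafka protocol:
	conf.Version = sarama.V0_11_0_0                // InitProducerID requires brokers >= 0.11
	conf.Producer.RequiredAcks = sarama.WaitForAll // wait for the full ISR
	conf.Net.MaxOpenRequests = 1                   // preserve batch ordering across retries
	conf.Producer.Return.Successes = true

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, conf)
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.Close()

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "example-topic",
		Value: sarama.StringEncoder("hello"),
	}

	// A real application would drain Successes() and Errors() continuously.
	select {
	case msg := <-producer.Successes():
		log.Printf("delivered to %s/%d at offset %d", msg.Topic, msg.Partition, msg.Offset)
	case perr := <-producer.Errors():
		log.Println("delivery failed:", perr)
	}
}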
208 changes: 161 additions & 47 deletions async_producer.go
@@ -47,6 +47,50 @@ type AsyncProducer interface {
Errors() <-chan *ProducerError
}

// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
producerID int64
producerEpoch int16
sequenceNumbers map[string]int32
mutex sync.Mutex
}

const (
noProducerID = -1
noProducerEpoch = -1
)

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
key := fmt.Sprintf("%s-%d", topic, partition)
t.mutex.Lock()
defer t.mutex.Unlock()
sequence := t.sequenceNumbers[key]
t.sequenceNumbers[key] = sequence + 1
return sequence
}

func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
txnmgr := &transactionManager{
producerID: noProducerID,
producerEpoch: noProducerEpoch,
}

if conf.Producer.Idempotent {
initProducerIDResponse, err := client.InitProducerID()
if err != nil {
return nil, err
}
txnmgr.producerID = initProducerIDResponse.ProducerID
txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
txnmgr.sequenceNumbers = make(map[string]int32)
txnmgr.mutex = sync.Mutex{}

Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
}

return txnmgr, nil
}

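As a quick illustration of the sequencing semantics (a standalone toy snippet, not code from the PR; a real producer obtains the ID and epoch from client.InitProducerID as in newTransactionManager above):

txnmgr := &transactionManager{
	producerID:      1000, // would come from InitProducerID in practice
	producerEpoch:   0,
	sequenceNumbers: make(map[string]int32),
}
// Counters are kept per topic-partition and start at zero; brokers
// de-duplicate batches on (producerID, producerEpoch, sequence).
fmt.Println(txnmgr.getAndIncrementSequenceNumber("events", 0)) // 0
fmt.Println(txnmgr.getAndIncrementSequenceNumber("events", 0)) // 1
fmt.Println(txnmgr.getAndIncrementSequenceNumber("events", 1)) // 0, independent counter
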
type asyncProducer struct {
client Client
conf *Config
@@ -56,9 +100,11 @@ type asyncProducer struct {
input, successes, retries chan *ProducerMessage
inFlight sync.WaitGroup

brokers map[*Broker]chan<- *ProducerMessage
brokerRefs map[chan<- *ProducerMessage]int
brokers map[*Broker]*brokerProducer
brokerRefs map[*brokerProducer]int
brokerLock sync.Mutex

txnmgr *transactionManager
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
@@ -84,15 +130,21 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
return nil, ErrClosedClient
}

txnmgr, err := newTransactionManager(client.Config(), client)
if err != nil {
return nil, err
}

p := &asyncProducer{
client: client,
conf: client.Config(),
errors: make(chan *ProducerError),
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]chan<- *ProducerMessage),
brokerRefs: make(map[chan<- *ProducerMessage]int),
brokers: make(map[*Broker]*brokerProducer),
brokerRefs: make(map[*brokerProducer]int),
txnmgr: txnmgr,
}

// launch our singleton dispatchers
@@ -145,9 +197,10 @@ type ProducerMessage struct {
// least version 0.10.0.
Timestamp time.Time

retries int
flags flagSet
expectation chan *ProducerError
retries int
flags flagSet
expectation chan *ProducerError
sequenceNumber int32
}

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
@@ -328,6 +381,10 @@ func (tp *topicProducer) dispatch() {
continue
}
}
// All messages being retried (sent or not) have already had their retry count updated
if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
}

handler := tp.handlers[msg.Partition]
if handler == nil {
@@ -394,9 +451,9 @@ type partitionProducer struct {
partition int32
input <-chan *ProducerMessage

leader *Broker
breaker *breaker.Breaker
output chan<- *ProducerMessage
leader *Broker
breaker *breaker.Breaker
brokerProducer *brokerProducer

// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
@@ -431,9 +488,9 @@ func (pp *partitionProducer) dispatch() {
// on the first message
pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
if pp.leader != nil {
pp.output = pp.parent.getBrokerProducer(pp.leader)
pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
}

for msg := range pp.input {
@@ -465,7 +522,7 @@ func (pp *partitionProducer) dispatch() {
// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
// without breaking any of our ordering guarantees

if pp.output == nil {
if pp.brokerProducer == nil {
if err := pp.updateLeader(); err != nil {
pp.parent.returnError(msg, err)
time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
@@ -474,11 +531,11 @@
Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
}

pp.output <- msg
pp.brokerProducer.input <- msg
}

if pp.output != nil {
pp.parent.unrefBrokerProducer(pp.leader, pp.output)
if pp.brokerProducer != nil {
pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
}
}

@@ -490,20 +547,20 @@ func (pp *partitionProducer) newHighWatermark(hwm int) {
// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
pp.retryState[pp.highWatermark].expectChaser = true
pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

// a new HWM means that our current broker selection is out of date
Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
pp.parent.unrefBrokerProducer(pp.leader, pp.output)
pp.output = nil
pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
pp.brokerProducer = nil
}

func (pp *partitionProducer) flushRetryBuffers() {
Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
for {
pp.highWatermark--

if pp.output == nil {
if pp.brokerProducer == nil {
if err := pp.updateLeader(); err != nil {
pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
goto flushDone
@@ -512,7 +569,7 @@ func (pp *partitionProducer) flushRetryBuffers() {
}

for _, msg := range pp.retryState[pp.highWatermark].buf {
pp.output <- msg
pp.brokerProducer.input <- msg
}

flushDone:
@@ -537,16 +594,16 @@ func (pp *partitionProducer) updateLeader() error {
return err
}

pp.output = pp.parent.getBrokerProducer(pp.leader)
pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

return nil
})
}

// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
var (
input = make(chan *ProducerMessage)
bridge = make(chan *produceSet)
@@ -580,7 +637,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
close(responses)
})

return input
return bp
}

type brokerProducerResponse struct {
@@ -595,7 +652,7 @@ type brokerProducer struct {
parent *asyncProducer
broker *Broker

input <-chan *ProducerMessage
input chan *ProducerMessage
output chan<- *produceSet
responses <-chan *brokerProducerResponse

@@ -740,62 +797,119 @@ func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
// we iterate through the blocks in the request set, not the response, so that we notice
// if the response is missing a block completely
sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
var retryTopics []string
sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
if response == nil {
// this only happens when RequiredAcks is NoResponse, so we have to assume success
bp.parent.returnSuccesses(msgs)
bp.parent.returnSuccesses(pSet.msgs)
return
}

block := response.GetBlock(topic, partition)
if block == nil {
bp.parent.returnErrors(msgs, ErrIncompleteResponse)
bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
return
}

switch block.Err {
// Success
case ErrNoError:
if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
for _, msg := range msgs {
for _, msg := range pSet.msgs {
msg.Timestamp = block.Timestamp
}
}
for i, msg := range msgs {
for i, msg := range pSet.msgs {
msg.Offset = block.Offset + int64(i)
}
bp.parent.returnSuccesses(msgs)
bp.parent.returnSuccesses(pSet.msgs)
// Duplicate
case ErrDuplicateSequenceNumber:
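// The broker has already persisted a batch with this producer ID, epoch, and
// sequence, so the safe interpretation is that the earlier attempt succeeded.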
bp.parent.returnSuccesses(pSet.msgs)
// Retriable errors
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
bp.broker.ID(), topic, partition, block.Err)
bp.currentRetries[topic][partition] = block.Err
bp.parent.retryMessages(msgs, block.Err)
bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
retryTopics = append(retryTopics, topic)
// Other non-retriable errors
default:
bp.parent.returnErrors(msgs, block.Err)
bp.parent.returnErrors(pSet.msgs, block.Err)

Review comment:
The Java client compensates subsequent sequence numbers after any non-retriable error:

https://github.com/apache/kafka/blob/05ba5aa00847b18b74369a821e972bbba9f155eb/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L508

I guess we would have to do something similar.
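For illustration, a compensation step in this codebase might look roughly like the hypothetical helper below (not part of this PR, and simplified: a faithful port would also renumber batches already in flight, as the Java TransactionManager does):

// Hypothetical: after a batch of n records fails fatally, the sequence numbers
// it consumed will never be accepted by the broker, so roll the per-partition
// counter back before the next batch is assigned sequences.
func (t *transactionManager) rollbackSequenceNumbers(topic string, partition int32, n int32) {
	key := fmt.Sprintf("%s-%d", topic, partition)
	t.mutex.Lock()
	defer t.mutex.Unlock()
	t.sequenceNumbers[key] -= n
}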

Reply (member):
Correct. Currently, upon hitting a non-retriable error, we do not attempt to recover from it automatically; users will have to create a new Producer to get a new ProducerId.

As you pointed out, the Java client tries to recover automatically (by fetching a new producerId and even resequencing messages in some cases), but we feel this can be done as a further improvement in a follow-up PR.
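A sketch of that manual recovery (illustrative only; assumes an existing client and application-level detection of the fatal error):

// After a non-retriable produce error, recreate the producer: this re-runs
// newTransactionManager, which performs a fresh InitProducerID round-trip
// and obtains a new ProducerId and epoch.
if err := producer.Close(); err != nil {
	log.Printf("closing failed producer: %v", err)
}
producer, err = sarama.NewAsyncProducerFromClient(client)
if err != nil {
	log.Fatalln(err)
}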

}
})

if len(retryTopics) > 0 {
err := bp.parent.client.RefreshMetadata(retryTopics...)
if err != nil {
Logger.Printf("Failed refreshing metadata because of %v\n", err)
}

sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
block := response.GetBlock(topic, partition)
if block == nil {
// handled in the previous "eachPartition" loop
return
}

switch block.Err {
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
bp.broker.ID(), topic, partition, block.Err)
if bp.currentRetries[topic] == nil {
bp.currentRetries[topic] = make(map[int32]error)
}
bp.currentRetries[topic][partition] = block.Err
// dropping the following messages has the side effect of incrementing their retry count
bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
bp.parent.retryBatch(topic, partition, pSet, block.Err)
}
})
}
}

func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
produceSet := newProduceSet(p)
produceSet.msgs[topic] = make(map[int32]*partitionSet)
produceSet.msgs[topic][partition] = pSet
produceSet.bufferBytes += pSet.bufferBytes
produceSet.bufferCount += len(pSet.msgs)
for _, msg := range pSet.msgs {
if msg.retries >= p.conf.Producer.Retry.Max {
p.returnError(msg, kerr)
return
}
msg.retries++
}

// it's expected that a metadata refresh has been requested prior to calling retryBatch
leader, err := p.client.Leader(topic, partition)
if err != nil {
Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
for _, msg := range pSet.msgs {
p.returnError(msg, kerr)
}
return
}
bp := p.getBrokerProducer(leader)
bp.output <- produceSet
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
switch err.(type) {
case PacketEncodingError:
sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
bp.parent.returnErrors(msgs, err)
sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.returnErrors(pSet.msgs, err)
})
default:
Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
bp.parent.abandonBrokerConnection(bp.broker)
_ = bp.broker.Close()
bp.closing = err
sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
bp.parent.retryMessages(msgs, err)
sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.retryMessages(pSet.msgs, err)
})
bp.buffer.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
bp.parent.retryMessages(msgs, err)
bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.retryMessages(pSet.msgs, err)
})
bp.rollOver()
}
Expand Down Expand Up @@ -892,7 +1006,7 @@ func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
}
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()

@@ -909,13 +1023,13 @@ func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()

p.brokerRefs[bp]--
if p.brokerRefs[bp] == 0 {
close(bp)
close(bp.input)
delete(p.brokerRefs, bp)

if p.brokers[broker] == bp {