Fix "broker received out of order sequence" when brokers die #1661

Merged 2 commits on May 4, 2020

60 changes: 49 additions & 11 deletions async_producer.go
@@ -60,13 +60,28 @@ const (
noProducerEpoch = -1
)

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
key := fmt.Sprintf("%s-%d", topic, partition)
t.mutex.Lock()
defer t.mutex.Unlock()
sequence := t.sequenceNumbers[key]
t.sequenceNumbers[key] = sequence + 1
return sequence
return sequence, t.producerEpoch
}

func (t *transactionManager) bumpEpoch() {
t.mutex.Lock()
defer t.mutex.Unlock()
t.producerEpoch++
for k := range t.sequenceNumbers {
t.sequenceNumbers[k] = 0
}
}

func (t *transactionManager) getProducerID() (int64, int16) {
t.mutex.Lock()
defer t.mutex.Unlock()
return t.producerID, t.producerEpoch
}

func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
@@ -208,6 +223,8 @@ type ProducerMessage struct {
flags flagSet
expectation chan *ProducerError
sequenceNumber int32
producerEpoch int16
hasSequence bool
}

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
@@ -234,6 +251,9 @@ func (m *ProducerMessage) byteSize(version int) int {
func (m *ProducerMessage) clear() {
m.flags = 0
m.retries = 0
m.sequenceNumber = 0
m.producerEpoch = 0
m.hasSequence = false
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
@@ -388,10 +408,6 @@ func (tp *topicProducer) dispatch() {
continue
}
}
// All messages being retried (sent or not) have already had their retry count updated
if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
}

handler := tp.handlers[msg.Partition]
if handler == nil {
@@ -570,6 +586,15 @@ func (pp *partitionProducer) dispatch() {
Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
}

// Now that we know we have a broker to actually try and send this message to, generate the sequence
// number for it.
// All messages being retried (sent or not) have already had their retry count updated
// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
msg.hasSequence = true
}

pp.brokerProducer.input <- msg
}
}
@@ -748,12 +773,21 @@ func (bp *brokerProducer) run() {
}

if bp.buffer.wouldOverflow(msg) {
if err := bp.waitForSpace(msg); err != nil {
Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
if err := bp.waitForSpace(msg, false); err != nil {
bp.parent.retryMessage(msg, err)
continue
}
}

if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
// The epoch was reset, need to roll the buffer over
Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
if err := bp.waitForSpace(msg, true); err != nil {
bp.parent.retryMessage(msg, err)
continue
}
}
if err := bp.buffer.add(msg); err != nil {
bp.parent.returnError(msg, err)
continue
@@ -809,17 +843,15 @@ func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
return bp.currentRetries[msg.Topic][msg.Partition]
}

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
for {
select {
case response := <-bp.responses:
bp.handleResponse(response)
// handling a response can change our state, so re-check some things
if reason := bp.needsRetry(msg); reason != nil {
return reason
} else if !bp.buffer.wouldOverflow(msg) {
} else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
return nil
}
case bp.output <- bp.buffer:
@@ -1030,6 +1062,12 @@ func (p *asyncProducer) shutdown() {
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
// will never see a message with this number, so we can never continue the sequence.
if msg.hasSequence {
Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
p.txnmgr.bumpEpoch()
}
msg.clear()
pErr := &ProducerError{Msg: msg, Err: err}
if p.conf.Producer.Return.Errors {
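
For readers skimming the diff, here is a simplified, standalone sketch of the sequence/epoch contract the changes above rely on. The names (epochSequencer, next, bump) are illustrative stand-ins rather than Sarama's internals: every topic/partition pair gets a monotonically increasing sequence number, and bumping the epoch restarts all sequences at zero so numbering can begin again cleanly.

```go
package main

import (
	"fmt"
	"sync"
)

// epochSequencer is an illustrative stand-in for the transactionManager logic
// added above: per-partition sequence counters plus an epoch that, when bumped,
// resets every counter back to zero.
type epochSequencer struct {
	mu        sync.Mutex
	epoch     int16
	sequences map[string]int32
}

func newEpochSequencer() *epochSequencer {
	return &epochSequencer{sequences: make(map[string]int32)}
}

// next returns the sequence number to stamp on the next message for this
// topic/partition, together with the epoch it belongs to.
func (e *epochSequencer) next(topic string, partition int32) (int32, int16) {
	e.mu.Lock()
	defer e.mu.Unlock()
	key := fmt.Sprintf("%s-%d", topic, partition)
	seq := e.sequences[key]
	e.sequences[key] = seq + 1
	return seq, e.epoch
}

// bump starts a new epoch and restarts every partition's sequence at zero.
func (e *epochSequencer) bump() {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.epoch++
	for k := range e.sequences {
		e.sequences[k] = 0
	}
}

func main() {
	s := newEpochSequencer()
	for i := 0; i < 3; i++ {
		seq, epoch := s.next("my_topic", 0)
		fmt.Printf("epoch=%d seq=%d\n", epoch, seq) // epoch=0 seq=0, 1, 2
	}
	s.bump() // e.g. after a sequenced message was returned to the caller as an error
	seq, epoch := s.next("my_topic", 0)
	fmt.Printf("epoch=%d seq=%d\n", epoch, seq) // epoch=1 seq=0
}
```

With the async_producer.go changes above, that bump happens in returnError whenever a message that already had a sequence assigned is dropped, and brokerProducer rolls its buffer over as soon as it sees a message stamped with a different epoch than the batch it is building.
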
69 changes: 69 additions & 0 deletions async_producer_test.go
@@ -1130,6 +1130,75 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
closeProducer(t, producer)
}

func TestAsyncProducerIdempotentEpochRollover(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()

metadataResponse := &MetadataResponse{
Version: 1,
ControllerID: 1,
}
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
broker.Returns(metadataResponse)

initProducerID := &InitProducerIDResponse{
ThrottleTime: 0,
ProducerID: 1000,
ProducerEpoch: 1,
}
broker.Returns(initProducerID)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.Flush.Frequency = 10 * time.Millisecond
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries exhaust
config.Producer.RequiredAcks = WaitForAll
config.Producer.Retry.Backoff = 0
config.Producer.Idempotent = true
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}
defer closeProducer(t, producer)

producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
prodError := &ProduceResponse{
Version: 3,
ThrottleTime: 0,
}
prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable)
broker.Returns(prodError)
<-producer.Errors()

lastReqRes := broker.history[len(broker.history)-1]
lastProduceBatch := lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
if lastProduceBatch.FirstSequence != 0 {
t.Error("first sequence not zero")
}
if lastProduceBatch.ProducerEpoch != 1 {
t.Error("first epoch was not one")
}

// Now if we produce again, the epoch should have rolled over.
producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
broker.Returns(prodError)
<-producer.Errors()

lastReqRes = broker.history[len(broker.history)-1]
lastProduceBatch = lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
if lastProduceBatch.FirstSequence != 0 {
t.Error("second sequence not zero")
}
if lastProduceBatch.ProducerEpoch <= 1 {
t.Error("second epoch was not > 1")
}
}

// TestBrokerProducerShutdown ensures that a call to shutdown stops the
// brokerProducer run() loop and doesn't leak any goroutines
func TestBrokerProducerShutdown(t *testing.T) {
78 changes: 78 additions & 0 deletions functional_producer_test.go
@@ -3,6 +3,7 @@ package sarama
import (
"fmt"
"os"
"strings"
"sync"
"testing"
"time"
@@ -96,6 +97,83 @@ func TestFuncProducingToInvalidTopic(t *testing.T) {
safeClose(t, producer)
}

func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

config := NewConfig()
config.Producer.Flush.Frequency = 250 * time.Millisecond
config.Producer.Idempotent = true
config.Producer.Timeout = 500 * time.Millisecond
config.Producer.Retry.Max = 1
config.Producer.Retry.Backoff = 500 * time.Millisecond
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

producer, err := NewSyncProducer(kafkaBrokers, config)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, producer)

// Successfully publish a few messages
for i := 0; i < 10; i++ {
_, _, err = producer.SendMessage(&ProducerMessage{
Topic: "test.1",
Value: StringEncoder(fmt.Sprintf("%d message", i)),
})
if err != nil {
t.Fatal(err)
}
}

// break the brokers.
for proxyName, proxy := range Proxies {
if !strings.Contains(proxyName, "kafka") {
continue
}
if err := proxy.Disable(); err != nil {
t.Fatal(err)
}
}

// This should fail hard now
for i := 10; i < 20; i++ {
_, _, err = producer.SendMessage(&ProducerMessage{
Topic: "test.1",
Value: StringEncoder(fmt.Sprintf("%d message", i)),
})
if err == nil {
t.Fatal(err)
}
}

// Now bring the proxy back up
for proxyName, proxy := range Proxies {
if !strings.Contains(proxyName, "kafka") {
continue
}
if err := proxy.Enable(); err != nil {
t.Fatal(err)
}
}

// We should be able to publish again (once everything calms down)
// (otherwise it times out)
for {
_, _, err = producer.SendMessage(&ProducerMessage{
Topic: "test.1",
Value: StringEncoder("comeback message"),
})
if err == nil {
break
}
}
}

func testProducingMessages(t *testing.T, config *Config) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)
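
To put the functional test above in context, here is a minimal sketch of the same client-side setup outside the test harness. The broker address and topic are placeholders; the config values mirror the test and match what Sarama's config validation expects for an idempotent producer (acks from the full ISR, a single in-flight request per broker, at least one retry, protocol version 0.11.0.0 or newer).

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama" // import path at the time of this PR
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Idempotent = true
	config.Producer.RequiredAcks = sarama.WaitForAll // idempotence needs acks from the full ISR
	config.Producer.Retry.Max = 1
	config.Producer.Retry.Backoff = 500 * time.Millisecond
	config.Producer.Return.Successes = true // required by the SyncProducer
	config.Producer.Return.Errors = true
	config.Net.MaxOpenRequests = 1    // keep a single in-flight request per broker
	config.Version = sarama.V0_11_0_0 // idempotent batches need the v2 record format

	// "localhost:9092" and "test.1" are placeholders mirroring the functional test.
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "test.1",
		Value: sarama.StringEncoder("hello"),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("delivered to partition %d at offset %d", partition, offset)
}
```
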
24 changes: 17 additions & 7 deletions produce_set.go
@@ -13,17 +13,22 @@ type partitionSet struct {
}

type produceSet struct {
parent *asyncProducer
msgs map[string]map[int32]*partitionSet
parent *asyncProducer
msgs map[string]map[int32]*partitionSet
producerID int64
producerEpoch int16

bufferBytes int
bufferCount int
}

func newProduceSet(parent *asyncProducer) *produceSet {
pid, epoch := parent.txnmgr.getProducerID()
return &produceSet{
msgs: make(map[string]map[int32]*partitionSet),
parent: parent,
msgs: make(map[string]map[int32]*partitionSet),
parent: parent,
producerID: pid,
producerEpoch: epoch,
}
}

@@ -65,8 +70,8 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
Version: 2,
Codec: ps.parent.conf.Producer.Compression,
CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
ProducerID: ps.parent.txnmgr.producerID,
ProducerEpoch: ps.parent.txnmgr.producerEpoch,
ProducerID: ps.producerID,
ProducerEpoch: ps.producerEpoch,
}
if ps.parent.conf.Producer.Idempotent {
batch.FirstSequence = msg.sequenceNumber
@@ -78,12 +83,17 @@ }
}
partitions[msg.Partition] = set
}
set.msgs = append(set.msgs, msg)

if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
return errors.New("assertion failed: message out of sequence added to a batch")
}
}

// Past this point we can't return an error, because we've already added the message to the set.
set.msgs = append(set.msgs, msg)

if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
// We are being conservative here to avoid having to prep encode the record
size += maximumRecordOverhead
rec := &Record{
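
Finally, a small standalone illustration of the invariants the produce_set.go and async_producer.go changes combine to enforce. The types below are hypothetical stand-ins, not Sarama's: in the real code the sequence check lives in produceSet.add (the assertion above), while the epoch check lives in brokerProducer.run, which rolls the buffer over via waitForSpace(msg, true) before a message from a newer epoch is ever added.

```go
package main

import (
	"errors"
	"fmt"
)

// batch and message are simplified stand-ins for the RecordBatch and
// ProducerMessage fields this diff touches; they are not Sarama's types.
type batch struct {
	producerID    int64
	producerEpoch int16
	firstSequence int32
	count         int
}

type message struct {
	producerEpoch  int16
	sequenceNumber int32
}

// add mirrors the two checks described above: the message must belong to the
// same epoch as the batch, and its sequence number must not run backwards
// past the batch's FirstSequence.
func (b *batch) add(m message) error {
	if m.producerEpoch != b.producerEpoch {
		return errors.New("epoch changed: roll the batch over before appending")
	}
	if m.sequenceNumber < b.firstSequence {
		return errors.New("assertion failed: message out of sequence added to a batch")
	}
	b.count++
	return nil
}

func main() {
	b := &batch{producerID: 1000, producerEpoch: 1, firstSequence: 5}
	fmt.Println(b.add(message{producerEpoch: 1, sequenceNumber: 5})) // <nil>
	fmt.Println(b.add(message{producerEpoch: 1, sequenceNumber: 4})) // out of sequence
	fmt.Println(b.add(message{producerEpoch: 2, sequenceNumber: 0})) // epoch changed
}
```
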