diff --git a/async_producer.go b/async_producer.go
index a4f614ff7..422a4d24a 100644
--- a/async_producer.go
+++ b/async_producer.go
@@ -85,7 +85,7 @@ func newTransactionManager(conf *Config, client Client) (*transactionManager, er
 		txnmgr.sequenceNumbers = make(map[string]int32)
 		txnmgr.mutex = sync.Mutex{}
 
-		Logger.Printf("Obtained a ProducerId: %d epoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
+		Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
 	}
 
 	return txnmgr, nil
@@ -100,8 +100,8 @@ type asyncProducer struct {
 	input, successes, retries chan *ProducerMessage
 	inFlight                  sync.WaitGroup
 
-	brokers    map[*Broker]chan<- *ProducerMessage
-	brokerRefs map[chan<- *ProducerMessage]int
+	brokers    map[*Broker]*brokerProducer
+	brokerRefs map[*brokerProducer]int
 	brokerLock sync.Mutex
 
 	txnmgr *transactionManager
@@ -142,8 +142,8 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
 		input:      make(chan *ProducerMessage),
 		successes:  make(chan *ProducerMessage),
 		retries:    make(chan *ProducerMessage),
-		brokers:    make(map[*Broker]chan<- *ProducerMessage),
-		brokerRefs: make(map[chan<- *ProducerMessage]int),
+		brokers:    make(map[*Broker]*brokerProducer),
+		brokerRefs: make(map[*brokerProducer]int),
 		txnmgr:     txnmgr,
 	}
 
@@ -381,9 +381,9 @@ func (tp *topicProducer) dispatch() {
 				continue
 			}
 		}
+		// All messages being retried (sent or not) have already had their retry count updated
 		if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
 			msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
-			//Logger.Printf("Message %s for TP %s-%d got sequence number: %d\n", msg.Value, msg.Topic, msg.Partition, msg.sequenceNumber)
 		}
 
 		handler := tp.handlers[msg.Partition]
@@ -451,9 +451,9 @@ type partitionProducer struct {
 	partition int32
 	input     <-chan *ProducerMessage
 
-	leader  *Broker
-	breaker *breaker.Breaker
-	output  chan<- *ProducerMessage
+	leader         *Broker
+	breaker        *breaker.Breaker
+	brokerProducer *brokerProducer
 
 	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
 	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
@@ -488,9 +488,9 @@ func (pp *partitionProducer) dispatch() {
 	// on the first message
 	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
 	if pp.leader != nil {
-		pp.output = pp.parent.getBrokerProducer(pp.leader)
+		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
 		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
-		pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
+		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
 	}
 
 	for msg := range pp.input {
@@ -522,7 +522,7 @@ func (pp *partitionProducer) dispatch() {
 
 		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
 		// without breaking any of our ordering guarantees
-		if pp.output == nil {
+		if pp.brokerProducer == nil {
 			if err := pp.updateLeader(); err != nil {
 				pp.parent.returnError(msg, err)
 				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
@@ -531,11 +531,11 @@
 			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
 		}
 
-		pp.output <- msg
+		pp.brokerProducer.input <- msg
 	}
 
-	if pp.output != nil {
-		pp.parent.unrefBrokerProducer(pp.leader, pp.output)
+	if pp.brokerProducer != nil {
+		pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
 	}
 }
@@ -547,12 +547,12 @@ func (pp *partitionProducer) newHighWatermark(hwm int) {
 	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
 	pp.retryState[pp.highWatermark].expectChaser = true
 	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
-	pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}
+	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}
 
 	// a new HWM means that our current broker selection is out of date
 	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
-	pp.parent.unrefBrokerProducer(pp.leader, pp.output)
-	pp.output = nil
+	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
+	pp.brokerProducer = nil
 }
 
 func (pp *partitionProducer) flushRetryBuffers() {
@@ -560,7 +560,7 @@
 	for {
 		pp.highWatermark--
 
-		if pp.output == nil {
+		if pp.brokerProducer == nil {
 			if err := pp.updateLeader(); err != nil {
 				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
 				goto flushDone
@@ -569,7 +569,7 @@
 		}
 
 		for _, msg := range pp.retryState[pp.highWatermark].buf {
-			pp.output <- msg
+			pp.brokerProducer.input <- msg
 		}
 
 	flushDone:
@@ -594,16 +594,16 @@ func (pp *partitionProducer) updateLeader() error {
 			return err
 		}
 
-		pp.output = pp.parent.getBrokerProducer(pp.leader)
+		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
 		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
-		pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
+		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
 
 		return nil
 	})
 }
 
 // one per broker; also constructs an associated flusher
-func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
+func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
 	var (
 		input     = make(chan *ProducerMessage)
 		bridge    = make(chan *produceSet)
@@ -637,7 +637,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag
 		close(responses)
 	})
 
-	return input
+	return bp
 }
 
 type brokerProducerResponse struct {
@@ -652,7 +652,7 @@ type brokerProducer struct {
 	parent *asyncProducer
 	broker *Broker
 
-	input     <-chan *ProducerMessage
+	input     chan *ProducerMessage
 	output    chan<- *produceSet
 	responses <-chan *brokerProducerResponse
 
@@ -797,62 +797,105 @@ func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
 func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
 	// we iterate through the blocks in the request set, not the response, so that we notice
 	// if the response is missing a block completely
-	sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
 		if response == nil {
 			// this only happens when RequiredAcks is NoResponse, so we have to assume success
-			bp.parent.returnSuccesses(msgs)
+			bp.parent.returnSuccesses(pSet.msgs)
 			return
 		}
 
 		block := response.GetBlock(topic, partition)
 		if block == nil {
-			bp.parent.returnErrors(msgs, ErrIncompleteResponse)
+			bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
 			return
 		}
 
-		fmt.Printf("response has error %v", block.Err)
+
 		switch block.Err {
 		// Success
 		case ErrNoError:
 			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
-				for _, msg := range msgs {
+				for _, msg := range pSet.msgs {
 					msg.Timestamp = block.Timestamp
 				}
 			}
-			for i, msg := range msgs {
+			for i, msg := range pSet.msgs {
 				msg.Offset = block.Offset + int64(i)
 			}
-			bp.parent.returnSuccesses(msgs)
+			bp.parent.returnSuccesses(pSet.msgs)
+		// Duplicate
+		case ErrDuplicateSequenceNumber:
+			bp.parent.returnSuccesses(pSet.msgs)
 		// Retriable errors
 		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition, ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
 			Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n", bp.broker.ID(), topic, partition, block.Err)
+			if bp.currentRetries[topic] == nil {
+				bp.currentRetries[topic] = make(map[int32]error)
+			}
 			bp.currentRetries[topic][partition] = block.Err
-			bp.parent.retryMessages(msgs, block.Err)
+			// dropping the following messages has the side effect of incrementing their retry count
 			bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
+			bp.parent.retryBatch(topic, partition, pSet, block.Err)
+		// Other non-retriable errors
 		default:
-			bp.parent.returnErrors(msgs, block.Err)
+			bp.parent.returnErrors(pSet.msgs, block.Err)
 		}
 	})
 }
 
+func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
+	Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
+	produceSet := newProduceSet(p)
+	produceSet.msgs[topic] = make(map[int32]*partitionSet)
+	produceSet.msgs[topic][partition] = pSet
+	produceSet.bufferBytes += pSet.bufferBytes
+	produceSet.bufferCount += len(pSet.msgs)
+	for _, msg := range pSet.msgs {
+		if msg.retries >= p.conf.Producer.Retry.Max {
+			p.returnError(msg, kerr)
+			return
+		}
+		msg.retries++
+	}
+	// extremely pessimistic strategy - refreshing metadata for every batch retried. Should be improved
+	err := p.client.RefreshMetadata(topic)
+	if err != nil {
+		Logger.Printf("Failed retrying batch for %v-%d because of %v while refreshing metadata\n", topic, partition, err)
+		for _, msg := range pSet.msgs {
+			p.returnError(msg, kerr)
+		}
+		return
+	}
+	leader, err := p.client.Leader(topic, partition)
+	if err != nil {
+		Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
+		for _, msg := range pSet.msgs {
+			p.returnError(msg, kerr)
+		}
+		return
+	}
+	bp := p.getBrokerProducer(leader)
+	bp.output <- produceSet
+}
+
 func (bp *brokerProducer) handleError(sent *produceSet, err error) {
 	switch err.(type) {
 	case PacketEncodingError:
-		sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
-			bp.parent.returnErrors(msgs, err)
+		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
+			bp.parent.returnErrors(pSet.msgs, err)
 		})
 	default:
 		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
 		bp.parent.abandonBrokerConnection(bp.broker)
 		_ = bp.broker.Close()
 		bp.closing = err
-		sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
-			bp.parent.retryMessages(msgs, err)
+		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
+			bp.parent.retryMessages(pSet.msgs, err)
 		})
-		bp.buffer.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
-			bp.parent.retryMessages(msgs, err)
+		bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
+			bp.parent.retryMessages(pSet.msgs, err)
 		})
 		bp.rollOver()
 	}
@@ -949,7 +992,7 @@ func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
 	}
 }
 
-func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
+func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
 	p.brokerLock.Lock()
 	defer p.brokerLock.Unlock()
@@ -966,13 +1009,13 @@ func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessag
 	return bp
 }
 
-func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
+func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
 	p.brokerLock.Lock()
 	defer p.brokerLock.Unlock()
 
 	p.brokerRefs[bp]--
 	if p.brokerRefs[bp] == 0 {
-		close(bp)
+		close(bp.input)
 		delete(p.brokerRefs, bp)
 
 		if p.brokers[broker] == bp {
diff --git a/async_producer_test.go b/async_producer_test.go
index 9f3ed9099..10e864c0d 100644
--- a/async_producer_test.go
+++ b/async_producer_test.go
@@ -2,11 +2,9 @@ package sarama
 
 import (
 	"errors"
-	"fmt"
 	"log"
 	"os"
 	"os/signal"
-	"strconv"
 	"sync"
 	"testing"
 	"time"
@@ -302,6 +300,7 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
+	leader2.Returns(metadataLeader2)
 	leader2.Returns(prodSuccess)
 	expectResults(t, producer, 10, 0)
@@ -461,6 +460,7 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
 	metadataLeader2 := new(MetadataResponse)
 	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
 	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataLeader2)
 	leader2.Returns(prodNotLeader)
 	seedBroker.Returns(metadataLeader1)
@@ -468,6 +468,7 @@
 	seedBroker.Returns(metadataLeader1)
 	leader1.Returns(prodNotLeader)
 	seedBroker.Returns(metadataLeader2)
+	seedBroker.Returns(metadataLeader2)
 
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
@@ -653,6 +654,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
 
 	// succeed this time
 	expectResults(t, producer, 5, 0)
+	seedBroker.Returns(metadataResponse)
 
 	// put five more through
 	for i := 0; i < 5; i++ {
@@ -755,7 +757,7 @@ func TestAsyncProducerNoReturns(t *testing.T) {
 	leader.Close()
 }
 
-func TestAsyncProducerIdempotent(t *testing.T) {
+func TestAsyncProducerIdempotentGoldenPath(t *testing.T) {
 	broker := NewMockBroker(t, 1)
 
 	clusterID := "cid"
@@ -783,6 +785,7 @@ func TestAsyncProducerIdempotentGoldenPath(t *testing.T) {
 	config.Producer.RequiredAcks = WaitForAll
 	config.Producer.Retry.Backoff = 0
 	config.Producer.Idempotent = true
+	config.Net.MaxOpenRequests = 1
 	config.Version = V0_11_0_0
 	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
 	if err != nil {
@@ -805,68 +808,163 @@
 	closeProducer(t, producer)
 }
 
-func TestAsyncProducerIdempotentRetry(t *testing.T) {
-	broker := NewMockBroker(t, 1)
-
-	clusterID := "cid"
-	metadataResponse := &MetadataResponse{
-		Version:        3,
-		ThrottleTimeMs: 0,
-		ClusterID:      &clusterID,
-		ControllerID:   1,
+func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
+	//Logger = log.New(os.Stderr, "", log.LstdFlags)
+	tests := []struct {
+		name           string
+		failAfterWrite bool
+	}{
+		{"FailAfterWrite", true},
+		{"FailBeforeWrite", false},
 	}
-	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
-	broker.Returns(metadataResponse)
 
-	initProducerID := &InitProducerIDResponse{
-		ThrottleTime:  0,
-		ProducerID:    1000,
-		ProducerEpoch: 1,
-	}
-	broker.Returns(initProducerID)
+	for _, test := range tests {
+		broker := NewMockBroker(t, 1)
 
-	config := NewConfig()
-	config.Producer.Flush.Messages = 10
-	config.Producer.Return.Successes = true
-	config.Producer.Retry.Max = 4
-	config.Producer.RequiredAcks = WaitForAll
-	config.Producer.Retry.Backoff = 0
-	config.Producer.Idempotent = true
-	config.Version = V0_11_0_0
-	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
-	if err != nil {
-		t.Fatal(err)
-	}
+		clusterID := "cid"
+		metadataResponse := &MetadataResponse{
+			Version:        3,
+			ThrottleTimeMs: 0,
+			ClusterID:      &clusterID,
+			ControllerID:   1,
+		}
+		metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
+		metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
 
-	for i := 0; i < 10; i++ {
-		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
-	}
+		initProducerIDResponse := &InitProducerIDResponse{
+			ThrottleTime:  0,
+			ProducerID:    1000,
+			ProducerEpoch: 1,
+		}
 
-	prodNotLeader := &ProduceResponse{
-		Version:      3,
-		ThrottleTime: 0,
-	}
-	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
-	broker.Returns(prodNotLeader)
+		prodNotLeaderResponse := &ProduceResponse{
+			Version:      3,
+			ThrottleTime: 0,
+		}
+		prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
 
-	broker.Returns(metadataResponse)
+		prodDuplicate := &ProduceResponse{
+			Version:      3,
+			ThrottleTime: 0,
+		}
+		prodDuplicate.AddTopicPartition("my_topic", 0, ErrDuplicateSequenceNumber)
 
-	prodSuccess := &ProduceResponse{
-		Version:      3,
-		ThrottleTime: 0,
-	}
-	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
-	broker.Returns(prodSuccess)
-	expectResults(t, producer, 10, 0)
+		prodOutOfSeq := &ProduceResponse{
+			Version:      3,
+			ThrottleTime: 0,
+		}
+		prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
 
-	broker.Close()
-	closeProducer(t, producer)
+		prodSuccessResponse := &ProduceResponse{
+			Version:      3,
+			ThrottleTime: 0,
+		}
+		prodSuccessResponse.AddTopicPartition("my_topic", 0, ErrNoError)
+
+		prodCounter := 0
+		lastBatchFirstSeq := -1
+		lastBatchSize := -1
+		lastSequenceWrittenToDisk := -1
+		handlerFailBeforeWrite := func(req *request) (res encoder) {
+			switch req.body.key() {
+			case 3:
+				return metadataResponse
+			case 22:
+				return initProducerIDResponse
+			case 0:
+				prodCounter++
+
+				preq := req.body.(*ProduceRequest)
+				batch := preq.records["my_topic"][0].RecordBatch
+				batchFirstSeq := int(batch.FirstSequence)
+				batchSize := len(batch.Records)
+
+				if lastSequenceWrittenToDisk == batchFirstSeq-1 { //in sequence append
+
+					if lastBatchFirstSeq == batchFirstSeq { //is a batch retry
+						if lastBatchSize == batchSize { //good retry
+							// mock write to disk
+							lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
+							return prodSuccessResponse
+						}
+						t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
+						return prodOutOfSeq
+					} else { // not a retry
+						// save batch just received for future check
+						lastBatchFirstSeq = batchFirstSeq
+						lastBatchSize = batchSize
+
+						if prodCounter%2 == 1 {
+							if test.failAfterWrite {
+								// mock write to disk
+								lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
+							}
+							return prodNotLeaderResponse
+						}
+						// mock write to disk
+						lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
+						return prodSuccessResponse
+					}
+				} else {
+					if lastBatchFirstSeq == batchFirstSeq && lastBatchSize == batchSize { // is a good batch retry
+						if lastSequenceWrittenToDisk == (batchFirstSeq + batchSize - 1) { // we already have the messages
+							return prodDuplicate
+						}
+						// mock write to disk
+						lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
+						return prodSuccessResponse
+					} else { //out of sequence / bad retried batch
+						if lastBatchFirstSeq == batchFirstSeq && lastBatchSize != batchSize {
+							t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
+						} else if lastSequenceWrittenToDisk+1 != batchFirstSeq {
+							t.Errorf("[%s] Out of sequence message lastSequence=%d new batch starts at=%d", test.name, lastSequenceWrittenToDisk, batchFirstSeq)
+						} else {
+							t.Errorf("[%s] Unexpected error", test.name)
+						}
+
+						return prodOutOfSeq
+					}
+				}
+
+			}
+			return nil
+		}
+
+		config := NewConfig()
+		config.Version = V0_11_0_0
+		config.Producer.Idempotent = true
+		config.Net.MaxOpenRequests = 1
+		config.Producer.RequiredAcks = WaitForAll
+		config.Producer.Return.Successes = true
+		config.Producer.Flush.Frequency = 50 * time.Millisecond
+		config.Producer.Retry.Backoff = 100 * time.Millisecond
+
+		broker.setHandler(handlerFailBeforeWrite)
+		producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		for i := 0; i < 3; i++ {
+			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+		}
+
+		go func() {
+			for i := 0; i < 7; i++ {
+				producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine")}
+				time.Sleep(100 * time.Millisecond)
+			}
+		}()
+
+		expectResults(t, producer, 10, 0)
+
+		broker.Close()
+		closeProducer(t, producer)
+	}
 }
 
-func TestAsyncProducerIdempotentRetryBatch(t *testing.T) {
-	Logger = log.New(os.Stderr, "", log.LstdFlags)
-	/*broker := NewMockBroker(t, 1)
+func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
+	broker := NewMockBroker(t, 1)
 
 	clusterID := "cid"
 	metadataResponse := &MetadataResponse{
@@ -885,52 +983,36 @@ func TestAsyncProducerIdempotentRetryBatch(t *testing.T) {
 		ProducerEpoch: 1,
 	}
 	broker.Returns(initProducerID)
-	*/
+
 	config := NewConfig()
-	config.Producer.Flush.Messages = 3
+	config.Producer.Flush.Messages = 10
 	config.Producer.Return.Successes = true
-	config.Producer.Retry.Max = 4
+	config.Producer.Retry.Max = 400000
 	config.Producer.RequiredAcks = WaitForAll
-	config.Producer.Retry.Backoff = 100 * time.Millisecond
+	config.Producer.Retry.Backoff = 0
 	config.Producer.Idempotent = true
+	config.Net.MaxOpenRequests = 1
 	config.Version = V0_11_0_0
-	producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
+
+	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
-	for i := 0; i < 3; i++ {
-		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage + strconv.Itoa(i))}
+
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
-	/*prodNotLeader := &ProduceResponse{
+
+	prodOutOfSeq := &ProduceResponse{
 		Version:      3,
 		ThrottleTime: 0,
 	}
-	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
-	broker.Returns(prodNotLeader)
-	*/
-	go func() {
-		for i := 0; i < 6; i++ {
-			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine" + strconv.Itoa(i))}
-			time.Sleep(100 * time.Millisecond)
-		}
-	}()
-	/*
-		broker.Returns(metadataResponse)
-
-		prodSuccess := &ProduceResponse{
-			Version:      3,
-			ThrottleTime: 0,
-		}
-		prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
-		broker.Returns(prodSuccess)*/
-	expectResults(t, producer, 9, 0)
+	prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
+	broker.Returns(prodOutOfSeq)
+	expectResults(t, producer, 0, 10)
 
-	fmt.Printf("**** Closing Broker \n")
-	//broker.Close()
-	fmt.Printf("**** Closing producer \n")
+	broker.Close()
 	closeProducer(t, producer)
-	fmt.Printf("**** Closed producer \n")
 }
 
 // This example shows how to use the producer while simultaneously
diff --git a/client.go b/client.go
index 2cb313cc0..f7497deb0 100644
--- a/client.go
+++ b/client.go
@@ -198,7 +198,7 @@ func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
 			return response, nil
 		default:
 			// some error, remove that broker and try again
-			Logger.Printf("Error is %v", err)
+			Logger.Printf("Client got error from broker %d when issuing InitProducerID : %v\n", broker.ID(), err)
 			_ = broker.Close()
 			client.deregisterBroker(broker)
 		}
@@ -746,7 +746,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
 			return err
 		default:
 			// some other error, remove that broker and try again
-			Logger.Println("client/metadata got error from broker while fetching metadata:", err)
+			Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
 			_ = broker.Close()
 			client.deregisterBroker(broker)
 		}
diff --git a/config.go b/config.go
index 1feab4f3b..e0c6dfb36 100644
--- a/config.go
+++ b/config.go
@@ -467,8 +467,8 @@ func (c *Config) Validate() error {
 		if c.Producer.RequiredAcks != WaitForAll {
 			return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
 		}
-		if c.Net.MaxOpenRequests > 5 {
-			return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests <= 5")
+		if c.Net.MaxOpenRequests > 1 {
+			return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
 		}
 	}
diff --git a/config_test.go b/config_test.go
index 6c0bd0916..a19bdc471 100644
--- a/config_test.go
+++ b/config_test.go
@@ -231,9 +231,8 @@ func TestProducerConfigValidates(t *testing.T) {
 			cfg.Version = V0_11_0_0
 			cfg.Producer.Idempotent = true
 			cfg.Producer.RequiredAcks = WaitForAll
-			cfg.Net.MaxOpenRequests = 6
 		},
-		"Idempotent producer requires Net.MaxOpenRequests <= 5"},
+		"Idempotent producer requires Net.MaxOpenRequests to be 1"},
 	}
 
 	for i, test := range tests {
diff --git a/functional_consumer_test.go b/functional_consumer_test.go
index 851f5d7c0..aa8eccf7c 100644
--- a/functional_consumer_test.go
+++ b/functional_consumer_test.go
@@ -161,6 +161,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
 			prodCfg.Producer.Idempotent = idempotent
 			if idempotent {
 				prodCfg.Producer.RequiredAcks = WaitForAll
+				prodCfg.Net.MaxOpenRequests = 1
 			}
 
 			p, err := NewSyncProducer(kafkaBrokers, prodCfg)
diff --git a/produce_set.go b/produce_set.go
index dc71e2f1c..219ec5f26 100644
--- a/produce_set.go
+++ b/produce_set.go
@@ -3,7 +3,6 @@ package sarama
 import (
 	"encoding/binary"
 	"errors"
-	"fmt"
 	"time"
 )
 
@@ -60,7 +59,6 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 	set := partitions[msg.Partition]
 	if set == nil {
 		if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
-			fmt.Printf("Creating a new batch for partition %s-%d with base sequence %d \n", msg.Topic, msg.Partition, msg.sequenceNumber)
 			batch := &RecordBatch{
 				FirstTimestamp: timestamp,
 				Version:        2,
@@ -79,7 +77,6 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 		}
 		partitions[msg.Partition] = set
 	}
-	fmt.Printf("Adding message with sequence %d to batch for partition %s-%d value: %v\n", msg.sequenceNumber, msg.Topic, msg.Partition, msg.Value)
 	set.msgs = append(set.msgs, msg)
 
 	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
@@ -148,7 +145,6 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 				record.OffsetDelta = int64(i)
 			}
 		}
-		fmt.Printf("Add batch to ProduceRequest for TP %s-%d with firstSeq %d, size: %d\n", topic, partition, rb.FirstSequence, len(rb.Records))
 		req.AddBatch(topic, partition, rb)
 		continue
 	}
@@ -194,10 +190,10 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 	return req
 }
 
-func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs []*ProducerMessage)) {
+func (ps *produceSet) eachPartition(cb func(topic string, partition int32, pSet *partitionSet)) {
 	for topic, partitionSet := range ps.msgs {
 		for partition, set := range partitionSet {
-			cb(topic, partition, set.msgs)
+			cb(topic, partition, set)
 		}
 	}
 }
diff --git a/produce_set_test.go b/produce_set_test.go
index 5e83bc1c8..51d4cef30 100644
--- a/produce_set_test.go
+++ b/produce_set_test.go
@@ -75,8 +75,8 @@ func TestProduceSetPartitionTracking(t *testing.T) {
 	seenT1P1 := false
 	seenT2P0 := false
-	ps.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
-		if len(msgs) != 1 {
+	ps.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
+		if len(pSet.msgs) != 1 {
 			t.Error("Wrong message count")
 		}