diff --git a/go.mod b/go.mod index ff33e56d2..9e01887f7 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( cloud.google.com/go/pubsub v1.41.0 github.com/BurntSushi/toml v1.4.0 - github.com/IBM/sarama v1.43.2 + github.com/IBM/sarama v1.43.3 github.com/NYTimes/gziphandler v1.1.1 github.com/OneOfOne/go-utils v0.0.0-20180319162427-6019ff89a94e github.com/dgryski/go-expirecache v0.0.0-20170314133854-743ef98b2adb @@ -49,7 +49,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 // indirect - github.com/eapache/go-resiliency v1.6.0 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect diff --git a/go.sum b/go.sum index e73a108d7..3542cc648 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,8 @@ cloud.google.com/go/pubsub v1.41.0/go.mod h1:g+YzC6w/3N91tzG66e2BZtp7WrpBBMXVa3Y github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= -github.com/IBM/sarama v1.43.2 h1:HABeEqRUh32z8yzY2hGB/j8mHSzC/HA9zlEjqFNCzSw= -github.com/IBM/sarama v1.43.2/go.mod h1:Kyo4WkF24Z+1nz7xeVUFWIuKVV8RS3wM8mkvPKMdXFQ= +github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= +github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/OneOfOne/go-utils v0.0.0-20180319162427-6019ff89a94e h1:Kzs/MKSycSiJUW63f+BddSnX+3C5r+7JbHBV0b2wp50= @@ -53,8 +53,8 @@ github.com/dgryski/go-trigram v0.0.0-20160407183937-79ec494e1ad0/go.mod h1:qzKC/ github.com/dgryski/httputil v0.0.0-20160116060654-189c2918cd08 h1:BGzXzhmOgLHlylvQ27Tcgz235JvonPEgdMtpaZaeZt0= github.com/dgryski/httputil v0.0.0-20160116060654-189c2918cd08/go.mod h1:FdR8QjYJOW8OhZGga6zhJxYW2zdtZIqe7to/I3DOnwg= github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= -github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30= -github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= diff --git a/vendor/github.com/IBM/sarama/Dockerfile.kafka b/vendor/github.com/IBM/sarama/Dockerfile.kafka index ac2d47a16..40f5f333b 100644 --- a/vendor/github.com/IBM/sarama/Dockerfile.kafka +++ b/vendor/github.com/IBM/sarama/Dockerfile.kafka @@ -1,4 +1,4 @@ -FROM registry.access.redhat.com/ubi8/ubi-minimal:8.9@sha256:f30dbf77b075215f6c827c269c073b5e0973e5cea8dacdf7ecb6a19c868f37f2 +FROM registry.access.redhat.com/ubi8/ubi-minimal:8.10@sha256:de2a0a20c1c3b39c3de829196de9694d09f97cd18fda1004de855ed2b4c841ba USER root diff --git a/vendor/github.com/IBM/sarama/async_producer.go b/vendor/github.com/IBM/sarama/async_producer.go index f629a6a2e..a6fa3d4a2 100644 --- a/vendor/github.com/IBM/sarama/async_producer.go +++ b/vendor/github.com/IBM/sarama/async_producer.go @@ -1101,7 +1101,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.parent.returnSuccesses(pSet.msgs) // Retriable errors case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition, - ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: + ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError: if bp.parent.conf.Producer.Retry.Max <= 0 { bp.parent.abandonBrokerConnection(bp.broker) bp.parent.returnErrors(pSet.msgs, block.Err) @@ -1134,7 +1134,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo switch block.Err { case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition, - ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: + ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError: Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n", bp.broker.ID(), topic, partition, block.Err) if bp.currentRetries[topic] == nil { diff --git a/vendor/github.com/IBM/sarama/config.go b/vendor/github.com/IBM/sarama/config.go index facf76643..f2f197887 100644 --- a/vendor/github.com/IBM/sarama/config.go +++ b/vendor/github.com/IBM/sarama/config.go @@ -387,7 +387,7 @@ type Config struct { // default is 250ms, since 0 causes the consumer to spin when no events are // available. 100-500ms is a reasonable range for most cases. Kafka only // supports precision up to milliseconds; nanoseconds will be truncated. - // Equivalent to the JVM's `fetch.wait.max.ms`. + // Equivalent to the JVM's `fetch.max.wait.ms`. MaxWaitTime time.Duration // The maximum amount of time the consumer expects a message takes to diff --git a/vendor/github.com/IBM/sarama/docker-compose.yml b/vendor/github.com/IBM/sarama/docker-compose.yml index 55283cfe4..204768e32 100644 --- a/vendor/github.com/IBM/sarama/docker-compose.yml +++ b/vendor/github.com/IBM/sarama/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3.9' services: zookeeper-1: hostname: 'zookeeper-1' diff --git a/vendor/github.com/IBM/sarama/offset_manager.go b/vendor/github.com/IBM/sarama/offset_manager.go index 1bf545908..294865127 100644 --- a/vendor/github.com/IBM/sarama/offset_manager.go +++ b/vendor/github.com/IBM/sarama/offset_manager.go @@ -251,18 +251,31 @@ func (om *offsetManager) Commit() { } func (om *offsetManager) flushToBroker() { + broker, err := om.coordinator() + if err != nil { + om.handleError(err) + return + } + + // Care needs to be taken to unlock this. Don't want to defer the unlock as this would + // cause the lock to be held while waiting for the broker to reply. + broker.lock.Lock() req := om.constructRequest() if req == nil { + broker.lock.Unlock() return } + resp, rp, err := sendOffsetCommit(broker, req) + broker.lock.Unlock() - broker, err := om.coordinator() if err != nil { om.handleError(err) + om.releaseCoordinator(broker) + _ = broker.Close() return } - resp, err := broker.CommitOffset(req) + err = handleResponsePromise(req, resp, rp, nil) if err != nil { om.handleError(err) om.releaseCoordinator(broker) @@ -270,9 +283,20 @@ func (om *offsetManager) flushToBroker() { return } + broker.handleThrottledResponse(resp) om.handleResponse(broker, req, resp) } +func sendOffsetCommit(coordinator *Broker, req *OffsetCommitRequest) (*OffsetCommitResponse, *responsePromise, error) { + resp := new(OffsetCommitResponse) + responseHeaderVersion := resp.headerVersion() + promise, err := coordinator.send(req, true, responseHeaderVersion) + if err != nil { + return nil, nil, err + } + return resp, promise, nil +} + func (om *offsetManager) constructRequest() *OffsetCommitRequest { r := &OffsetCommitRequest{ Version: 1, diff --git a/vendor/github.com/IBM/sarama/transaction_manager.go b/vendor/github.com/IBM/sarama/transaction_manager.go index ca7e13dab..bf20b75e9 100644 --- a/vendor/github.com/IBM/sarama/transaction_manager.go +++ b/vendor/github.com/IBM/sarama/transaction_manager.go @@ -466,7 +466,7 @@ func (t *transactionManager) publishOffsetsToTxn(offsets topicPartitionOffsets, resultOffsets = failedTxn if len(resultOffsets) == 0 { - DebugLogger.Printf("txnmgr/txn-offset-commit [%s] successful txn-offset-commit with group %s %+v\n", + DebugLogger.Printf("txnmgr/txn-offset-commit [%s] successful txn-offset-commit with group %s\n", t.transactionalID, groupId) return resultOffsets, false, nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index be4d7b6ea..d797090dc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -43,7 +43,7 @@ cloud.google.com/go/pubsub/pstest ## explicit; go 1.18 github.com/BurntSushi/toml github.com/BurntSushi/toml/internal -# github.com/IBM/sarama v1.43.2 +# github.com/IBM/sarama v1.43.3 ## explicit; go 1.19 github.com/IBM/sarama # github.com/NYTimes/gziphandler v1.1.1 @@ -73,7 +73,7 @@ github.com/dgryski/go-trigram # github.com/dgryski/httputil v0.0.0-20160116060654-189c2918cd08 ## explicit github.com/dgryski/httputil -# github.com/eapache/go-resiliency v1.6.0 +# github.com/eapache/go-resiliency v1.7.0 ## explicit; go 1.13 github.com/eapache/go-resiliency/breaker # github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3