
sarama does not correctly use s.SaramaConfig.Net.MaxOpenRequests #2052

Closed
baoxc-shopee opened this issue Oct 22, 2021 · 3 comments
Labels: stale (Issues and pull requests without any recent activity)

Comments


baoxc-shopee commented Oct 22, 2021

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

Sarama                Kafka   Go
1.30.0 (2021-09-29)   2.6     1.16
Configuration

What configuration values are you using for Sarama and Kafka?

	s.SaramaConfig = sarama.NewConfig()
	s.SaramaConfig.Version = sarama.V1_1_0_0
	s.SaramaConfig.Metadata.Timeout = time.Second * 30

	s.SaramaConfig.Producer.RequiredAcks = sarama.WaitForAll
	s.SaramaConfig.Producer.Return.Successes = true
	s.SaramaConfig.Producer.Return.Errors = true

	s.SaramaConfig.Producer.Retry.Max = 0 // disable retry, use application-level retry
	s.SaramaConfig.Producer.Flush.Bytes = 1024 * 1024
	s.SaramaConfig.Producer.Flush.Frequency = time.Millisecond
	s.SaramaConfig.Producer.Flush.MaxMessages = 10
	s.SaramaConfig.Producer.Flush.Messages = 10
	s.SaramaConfig.Producer.MaxMessageBytes = 10 * 1024 * 1024

	s.SaramaConfig.Net.MaxOpenRequests = 5

	bytes := RandStringBytes(1024)
	go func() {
		for i := 0; i < 1000*80; i++ {
			s.Send(&Message{
				Meta: map[string]interface{}{
					"flag": strconv.Itoa(i),
				},
				Data: bytes,
			})
		}
	}()

	<-time.After(10 * time.Second)

	fmt.Println("qps", len(s.Done))
Problem Description

The Java client has "max.in.flight.requests.per.connection" (default 5), and Sarama exposes the equivalent setting, Net.MaxOpenRequests. However, on the produce path the setting is ignored: all requests are sent strictly one at a time.

in "async_producer.go" line 694:

	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			// broker.Produce blocks until the response for this request
			// arrives, so at most one produce request is ever in flight.
			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

The "Produce" function finally uses "sendNoReceive" in "broker.go" line 853:

func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
	responseHeaderVersion := int16(-1)
	if res != nil {
		responseHeaderVersion = res.headerVersion()
	}

	promise, err := b.send(req, res != nil, responseHeaderVersion)
	if err != nil {
		return err
	}

	if promise == nil {
		return nil
	}

	select {
	case buf := <-promise.packets:
		return versionedDecode(buf, res, req.version())
	case err = <-promise.errors:
		return err
	}
}

This design strictly serializes sending: send one request, wait for its response, then send the next. When the network has any latency, this lock-step scheme greatly reduces client throughput.

I would recommend allowing multiple requests to be in flight concurrently, up to Net.MaxOpenRequests, as sketched below.
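
For illustration, a rough sketch of the pipelining I have in mind, adapted from the async_producer.go snippet above. pendingProduce, sendProduceAsync, and promise.wait are hypothetical names, not existing sarama APIs; the idea is to bound in-flight requests with a buffered channel and rely on Kafka returning responses on a connection in send order:

	// pendingProduce (hypothetical) pairs a produce set with the
	// promise for its eventual response.
	type pendingProduce struct {
		set     *produceSet
		promise *responsePromise
	}

	go withRecover(func() {
		// The buffered channel caps the number of outstanding requests
		// at maxOpenRequests (i.e. config.Net.MaxOpenRequests).
		inflight := make(chan pendingProduce, maxOpenRequests)

		go func() {
			// Responses on a single connection arrive in send order,
			// so draining the FIFO matches each reply to its request.
			for p := range inflight {
				response, err := p.promise.wait() // hypothetical blocking accessor
				responses <- &brokerProducerResponse{set: p.set, err: err, res: response}
			}
			close(responses)
		}()

		for set := range bridge {
			// Hypothetical send-without-wait: write the request and get
			// back a promise instead of blocking on the response.
			promise, err := broker.sendProduceAsync(set.buildRequest())
			if err != nil {
				responses <- &brokerProducerResponse{set: set, err: err}
				continue
			}
			inflight <- pendingProduce{set: set, promise: promise}
		}
		close(inflight)
	})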

@rtreffer-fita

This was fixed in Version 1.31.0 (2022-01-18)
https://github.com/Shopify/sarama/blob/main/CHANGELOG.md#version-1310-2022-01-18

feat: allow AsyncProducer to have MaxOpenRequests inflight produce requests per broker by @xujianhai666 in #1686
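
So with v1.31.0 or later, the configuration from the original report should behave as intended; something like the following (broker address is a placeholder) now keeps up to 5 produce requests in flight per broker:

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V1_1_0_0
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Return.Successes = true
	cfg.Net.MaxOpenRequests = 5 // now honored on the produce path
	// Note: sarama requires Net.MaxOpenRequests to be 1 when
	// Producer.Idempotent is enabled, since idempotence needs ordering.

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg) // placeholder address
	if err != nil {
		panic(err)
	}
	defer producer.Close()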


@github-actions github-actions bot added the stale Issues and pull requests without any recent activity label Aug 19, 2023
dnwe (Collaborator) commented Aug 19, 2023

Closing as fixed in Version 1.31.0

@dnwe dnwe closed this as completed Aug 19, 2023