
[fix] [client] fix same producer/consumer use more than one connection per broker #1323

Draft · wants to merge 5 commits into master
Conversation

@shibd (Member) commented Dec 27, 2024

Motivation

This is a catch up for apache/pulsar#21144

When a producer or consumer reconnects, a new round-robin index is generated as the key suffix in ConnectionPool to create or fetch the Connection object from the pool:

```go
idx := cnt % p.maxConnectionsPerHost
return fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", idx)
```
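To illustrate the bug being fixed, here is a minimal sketch of the pool's key generation (a simplified model, not the exact pulsar-client-go code): because the round-robin counter advances on every call, the same producer resolves to a different pool key, and hence possibly a different connection, on each reconnect.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Simplified model of the connection pool's key generation.
type pool struct {
	roundRobinCnt         int32
	maxConnectionsPerHost int32
}

func (p *pool) getMapKey(logicalHost, physicalHost string) string {
	cnt := atomic.AddInt32(&p.roundRobinCnt, 1)
	if cnt < 0 {
		cnt = -cnt
	}
	idx := cnt % p.maxConnectionsPerHost
	return fmt.Sprint(logicalHost, "-", physicalHost, "-", idx)
}

func main() {
	p := &pool{maxConnectionsPerHost: 3}
	// Two "reconnects" of the same producer yield different pool keys,
	// so it may end up holding a second connection to the same broker.
	fmt.Println(p.getMapKey("broker-1:6650", "10.0.0.1:6650"))
	fmt.Println(p.getMapKey("broker-1:6650", "10.0.0.1:6650"))
}
```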

Modifications

  • Change GetConnection() in connection_pool to accept a new parameter, keySuffix.
  • Generate the keySuffix once when the consumer/producer is created.
  • When reconnecting, use this keySuffix to ensure the same connection is obtained.
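The modifications above can be sketched as follows (a minimal model of the changed API; field and parameter names are assumptions based on the PR description, not the exact code):

```go
package main

import "fmt"

type connection struct{ id string }

type connectionPool struct {
	connections map[string]*connection
}

// GetConnection now takes an explicit keySuffix, so a reconnecting
// producer or consumer resolves to the same pooled connection.
func (p *connectionPool) GetConnection(logicalAddr, physicalAddr string, keySuffix int32) *connection {
	key := fmt.Sprint(logicalAddr, "-", physicalAddr, "-", keySuffix)
	if c, ok := p.connections[key]; ok {
		return c
	}
	c := &connection{id: key}
	p.connections[key] = c
	return c
}

func main() {
	p := &connectionPool{connections: map[string]*connection{}}
	// The producer generates its keySuffix once at creation time...
	keySuffix := int32(1)
	c1 := p.GetConnection("broker-1", "10.0.0.1", keySuffix)
	// ...and reuses it on reconnect, getting the same connection back.
	c2 := p.GetConnection("broker-1", "10.0.0.1", keySuffix)
	fmt.Println(c1 == c2) // true
}
```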

Verifying this change

  • Add TestSelectConnectionForSameConsumer and TestSelectConnectionForSameProducer to cover this.

@shibd shibd added this to the v0.15.0 milestone Dec 29, 2024
@BewareMyPower (Contributor) left a comment

I see the ACK requests still use pc._getConn() and RequestOnCnx or RequestOnCnxNoWait in some places, should they use RequestWithCnxKeySuffix as well?

Or at least should we call _setConn(nil) when the consumer unregisters itself from the connection? For example, https://github.com/apache/pulsar/blob/5a3a1f169a7f90181bd5c213c8e9f479bc74f0f2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L194

@shibd (Member, Author) commented Dec 30, 2024

> I see the ACK requests still use pc._getConn() and RequestOnCnx or RequestOnCnxNoWait in some places, should they use RequestWithCnxKeySuffix as well?

No need; grabConn will reset the connection.

@shibd shibd requested a review from BewareMyPower December 30, 2024 12:21
Resolved (outdated) review threads: pulsar/consumer_test.go, pulsar/internal/connection_pool.go
Comment on lines 149 to 155:

```go
func (p *connectionPool) GenerateRoundRobinIndex() int32 {
	cnt := atomic.AddInt32(&p.roundRobinCnt, 1)
	if cnt < 0 {
		cnt = -cnt
	}
	return cnt % p.maxConnectionsPerHost
}
```
@RobertIndie (Member) commented Dec 30, 2024

cnt = -cnt is incorrect. I think you're trying to handle the case where int32 overflow results in a negative number. However, if cnt overflows to math.MinInt32, negating it will still result in a negative number.

Could you make roundRobinCnt a uint32, so that we can simplify this method as follows:

Suggested change:

```go
// before
func (p *connectionPool) GenerateRoundRobinIndex() int32 {
	cnt := atomic.AddInt32(&p.roundRobinCnt, 1)
	if cnt < 0 {
		cnt = -cnt
	}
	return cnt % p.maxConnectionsPerHost
}

// after
func (p *connectionPool) GenerateRoundRobinIndex() int32 {
	return atomic.AddUint32(&p.roundRobinCnt, 1) % p.maxConnectionsPerHost
}
```

A contributor commented:

I'm wondering why not just generate a random integer?

@shibd (Member, Author) replied Dec 30, 2024

> Could you make roundRobinCnt a uint32, so that we can simplify this method as follows:

Done

> I'm wondering why not just generate a random integer?

This just keeps the original logic:

```go
func (p *connectionPool) getMapKey(logicalAddr *url.URL, physicalAddr *url.URL) string {
	cnt := atomic.AddInt32(&p.roundRobinCnt, 1)
	if cnt < 0 {
		cnt = -cnt
	}
	idx := cnt % p.maxConnectionsPerHost
	return fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", idx)
}
```

@BewareMyPower (Contributor) commented:

Yes, the consumer's connection is called after the Subscribe RPC is done.

When a partition consumer (pc) receives a CLOSE_CONSUMER command,

  1. In connection.handleCloseConsumer, it calls consumer.ConnectionClosed(closeConsumer), which sends a connectionClosed event to pc.connectClosedCh.
  2. In pc.runEventsLoop, reconnectToBroker is called; all requests sent via pc._getConn() will wait until this method is done.

My concern is that, before reconnectToBroker is done, pc._getConn() will return a non-nil but outdated connection. If _getConn() is not called in runEventsLoop(), it might return an outdated connection.

There is another possible case where internalXxx could be called before reconnectToBroker is called, i.e. pc.eventsCh receives an event before pc.connectClosedCh.

Ideally, I think the best solution is to call pc._setConn(nil) before sending the connectionClosed event in ConnectionClosed, and to check whether _getConn() returns nil in RequestOnCnx and RequestOnCnxNoWait.
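The suggestion above can be sketched as follows (a simplified model following the names in the discussion, not the client's actual implementation): clear the cached connection when the broker closes it, and have request paths fail fast on nil instead of using a stale connection.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type connection struct{ id string }

type partitionConsumer struct {
	mu   sync.RWMutex
	conn *connection
}

func (pc *partitionConsumer) _setConn(c *connection) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.conn = c
}

func (pc *partitionConsumer) _getConn() *connection {
	pc.mu.RLock()
	defer pc.mu.RUnlock()
	return pc.conn
}

var errConnClosed = errors.New("connection closed, reconnect in progress")

// ConnectionClosed clears the connection before signaling reconnect, so
// no request path can pick up the outdated connection in the meantime.
func (pc *partitionConsumer) ConnectionClosed() {
	pc._setConn(nil)
	// ...send the connectionClosed event to the events loop here...
}

// RequestOnCnx-style call site: check for nil before using the connection.
func (pc *partitionConsumer) requestOnCnx() error {
	cnx := pc._getConn()
	if cnx == nil {
		return errConnClosed
	}
	fmt.Println("sending request on", cnx.id)
	return nil
}

func main() {
	pc := &partitionConsumer{conn: &connection{id: "broker-1"}}
	pc.ConnectionClosed()
	fmt.Println(pc.requestOnCnx()) // connection closed, reconnect in progress
}
```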

shibd and others added 3 commits December 30, 2024 21:35
@shibd shibd marked this pull request as draft December 30, 2024 14:26
@shibd (Member, Author) commented Dec 31, 2024

hi, @BewareMyPower

  1. Even if we set the connection to nil when receiving the CLOSE_CONSUMER command, we cannot completely guarantee that other API calls won't use the old connection to send to the old broker. Setting the connection to nil and performing checks can mitigate this issue to some extent.
  2. If the connection is nil when processing API requests (e.g., getLastMessageId), we should have an internal retry mechanism instead of returning an error to the client, as reconnection should be transparent to the caller.

In this PR, I will return an error when the connection is nil. In a future PR, I will implement the retry logic.
