
Producer Send and SendAsync are blocked forever when pulsar is down #515

megarajtm opened this issue May 6, 2021 · 4 comments


megarajtm commented May 6, 2021

Producer Send and SendAsync are blocked forever when Pulsar is down if MaxReconnectToBroker is set to unlimited retry. When Pulsar is down, the call inside runEventsLoop in producer_partition.go enters reconnectToBroker and stays in a retry loop until the broker connection is re-established. While that loop runs, no more events are consumed from the eventsChan channel, so both Send and SendAsync block and the SendTimeout is not honoured either.
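For illustration, a toy, standalone sketch of the starvation pattern described above (this is not the library code; the channel names only mirror those in producer_partition.go):

package main

import (
	"fmt"
	"time"
)

func main() {
	eventsChan := make(chan string)
	connectClosedCh := make(chan struct{}, 1)

	// The broker connection is already considered lost.
	connectClosedCh <- struct{}{}

	go func() {
		time.Sleep(100 * time.Millisecond)
		eventsChan <- "send request"  // blocks forever: the loop below is stuck "reconnecting"
		fmt.Println("event accepted") // never printed
	}()

	for {
		select {
		case ev := <-eventsChan:
			fmt.Println("handled:", ev)
		case <-connectClosedCh:
			fmt.Println("reconnecting ...")
			// Stands in for reconnectToBroker with unlimited retries against
			// a broker that never comes back; while it spins, eventsChan is
			// never read, so senders block.
			for {
				time.Sleep(time.Second)
			}
		}
	}
}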

Expected behavior

Producer Send must not block forever when the Pulsar broker is down. It must honour the SendTimeout and return with an error.
Producer SendAsync must never block when the Pulsar broker is down. It must honour the SendTimeout and call the callback function (illustrated below).
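A hedged sketch of the behaviour callers expect (producer construction elided; API shapes as in pulsar-client-go, error values illustrative only):

package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

// sendWithTimeoutExpectations is a hypothetical helper illustrating the
// expectation: with the broker down, both calls should come back with an
// error once SendTimeout elapses instead of blocking forever.
func sendWithTimeoutExpectations(producer pulsar.Producer) {
	ctx := context.Background()

	// Send should fail with an error after SendTimeout ...
	if _, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte("hello")}); err != nil {
		log.Printf("Send returned after SendTimeout: %v", err)
	}

	// ... and SendAsync should invoke the callback with an error rather than
	// silently filling the pending queue.
	producer.SendAsync(ctx, &pulsar.ProducerMessage{Payload: []byte("hello")},
		func(id pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
			if err != nil {
				log.Printf("SendAsync callback received error after SendTimeout: %v", err)
			}
		})
}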

Actual behavior

Due to the issue described above, Producer Send/SendAsync blocks forever when the Pulsar broker is down.

Steps to reproduce

  1. Create a Pulsar producer with MaxReconnectToBroker set to unlimited retry and SendTimeout set to a fixed value (see the sketch after this list)
  2. Send messages to Pulsar using the Send or SendAsync API
  3. Bring down the Pulsar broker or inject a connection error between the broker and the client
  4. In the case of Send, the call is blocked forever. In the case of SendAsync, the callback is never called and, once the pendingQueue is filled, the call blocks forever.
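
A minimal repro sketch along these lines (broker URL and topic are placeholders; leaving MaxReconnectToBroker unset is assumed here to be equivalent to unlimited retries):

package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// SendTimeout is fixed; MaxReconnectToBroker is left unset, which is
	// assumed here to mean unlimited reconnect attempts.
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:       "persistent://public/default/test-topic",
		SendTimeout: 10 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Bring the broker down (or inject a connection error) while this runs.
	for i := 0; ; i++ {
		_, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
			Payload: []byte("msg"),
		})
		// Observed: once reconnectToBroker starts looping, this call never
		// returns instead of failing after SendTimeout.
		log.Printf("send %d: err=%v", i, err)
	}
}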

System configuration

Pulsar client version - v0.4.0

megarajtm commented May 6, 2021

Proposed fix: In runEventsLoop, handle the connectClosedCh channel in a separate goroutine. This way eventsChan is never blocked.

func (p *partitionProducer) runEventsLoop() {
	// Handle connection-closed notifications in a separate goroutine so that
	// reconnectToBroker (which may retry indefinitely) never blocks the
	// events loop below.
	go func() {
		for {
			select {
			case <-p.closeCh:
				return
			case <-p.connectClosedCh:
				p.log.Debug("runEventsLoop will reconnect")
				p.reconnectToBroker()
			}
		}
	}()

	// Main events loop: keeps draining eventsChan even while a reconnect is
	// in progress, so Send/SendAsync are no longer blocked.
	for {
		select {
		case i := <-p.eventsChan:
			switch v := i.(type) {
			case *sendRequest:
				p.internalSend(v)
			case *flushRequest:
				p.internalFlush(v)
			case *closeProducer:
				p.internalClose(v)
				return
			}
		case <-p.batchFlushTicker.C:
			if p.batchBuilder.IsMultiBatches() {
				p.internalFlushCurrentBatches()
			} else {
				p.internalFlushCurrentBatch()
			}
		}
	}
}

omnilight (Contributor) commented

This is a duplicate of #496

megarajtm (Author) commented

#496 seems to be an issue with consumers. Also, does the proposed fix look good? If yes, I can create a PR for it.

megarajtm added a commit to megarajtm/pulsar-client-go that referenced this issue Jun 23, 2021
…down. Issue link - apache#515

Signed-off-by: Megaraj Mahadikar <megarajtm@gmail.com>

hrsakai commented Dec 23, 2024

I had the same issue with v0.14.0.
