Skip to content

Commit

Permalink
Merge pull request #3 from Reflektion/RFKUI-1269
Browse files Browse the repository at this point in the history
Lock released before update
  • Loading branch information
lokesh-b authored Jun 21, 2018
2 parents 2c5819a + 26b96e4 commit 59b16ec
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pykafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from .balancedconsumer import BalancedConsumer
from .managedbalancedconsumer import ManagedBalancedConsumer

__version__ = '2.5.0-rfk.1'
__version__ = '2.5.0-rfk.2'


__all__ = ["Broker", "SimpleConsumer", "Cluster", "Partition", "Producer",
Expand Down
14 changes: 7 additions & 7 deletions pykafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,20 +375,20 @@ def _produce(self, message):
"""
success = False
retry = 0
max_retries = self._max_retries * 5
while not success:
with self._update_lock:
leader_id = self._topic.partitions[message.partition_id].leader.id
if leader_id in self._owned_brokers:
self._owned_brokers[leader_id].enqueue(message)
success = True
if not success:
retry += 1
if retry < 10:
log.debug("Failed to enqueue produced message. Updating metdata.")
self._update()
else:
retry += 1
if retry % self._max_retries == 0:
log.debug("<RFK> Retries exceeded limit. Updating metadata again.")
self._update()
elif retry > max_retries:
raise KafkaException("Retries exceeded max limit")
raise ProduceFailureError("Message could not be enqueued due to missing broker "
"metadata for broker {}".format(leader_id))
success = False

def _send_request(self, message_batch, owned_broker):
Expand Down

0 comments on commit 59b16ec

Please sign in to comment.