Issues with SimpleConsumer when using auto commit #18

Closed

atinsood opened this issue May 27, 2013 · 15 comments

@atinsood

I was trying to use SimpleConsumer with auto commit, but ran into a number of issues.

  • By default, auto_commit is set to False, which is counterintuitive since auto commit defaults to true in the server config.
  • If the consumer is created using

SimpleConsumer(kafka, "my-group", TOPIC_NAME, True)

then auto commit does not work.

  • If the consumer is created using

SimpleConsumer(kafka, "my-group", TOPIC_NAME, True, auto_commit_every_n=100)

then the code fails with an unresolved reference to ReentrantTimer (it is never imported in consumer.py).

  • If the code is changed to use the fully positional form (argument mapping shown after the diff below)

SimpleConsumer(kafka, "my-group", TOPIC_NAME, True, 100, 5000)

then the code fails with an unresolved reference to send_offset_commit_request, which is called incorrectly (the method belongs to the client, not the consumer):

self.send_offset_commit_request(self.group, reqs)

  • I have enclosed below the diff that I had to apply to ensure that

SimpleConsumer(kafka, "my-group", TOPIC_NAME, True, auto_commit_every_n=100)

can proceed further.

But now I am stuck at:

kafka-python/kafka/util.py", line 45, in relative_unpack
    raise BufferUnderflowError("Not enough data left")
BufferUnderflowError: Not enough data left

Code diff to work around the issues mentioned above:

diff --git a/kafka/consumer.py b/kafka/consumer.py
index f123113..7d658b1 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,6 +1,7 @@
 from itertools import izip_longest, repeat
 import logging
 from threading import Lock
+from util import ReentrantTimer

 from kafka.common import (
     ErrorMapping, FetchRequest,
@@ -126,7 +127,7 @@ class SimpleConsumer(object):
                     log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % (
                         offset, self.group, self.topic, partition))
                     reqs.append(OffsetCommitRequest(self.topic, partition, offset, None))
-            resps = self.send_offset_commit_request(self.group, reqs)
+            resps = self.client.send_offset_commit_request(self.group, reqs)
             for resp in resps:
                 assert resp.error == 0
             self.count_since_commit = 0
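
For clarity, here is how the fully positional call from the list above maps onto the constructor signature (the signature is visible in a later diff in this thread). In this sketch, kafka and TOPIC_NAME are assumed to be an existing client instance and a topic string:

# Signature at the time:
#   def __init__(self, client, group, topic, auto_commit=False,
#                auto_commit_every_n=None, auto_commit_every_t=None)
consumer = SimpleConsumer(kafka, "my-group", TOPIC_NAME,  # client, group, topic
                          True,   # auto_commit
                          100,    # auto_commit_every_n: commit every N messages
                          5000)   # auto_commit_every_t: commit every T ms
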
@mahendra
Collaborator

I also noticed these problems with offset commit and offset fetch.

In consumer.py, I uncommented the lines for 0.8.1, which fetch the offset information for the consumer. This resulted in an error in the broker.

If I enable auto-commit or do a manual commit with consumer.commit(), the broker crashes with an error.

Will update with the exact error messages in a while.

@mahendra
Collaborator

When I do consumer.commit(), the following error is seen at the broker end:

[2013-05-27 13:53:40,587] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Unknown Source)
    at java.nio.HeapByteBuffer.getInt(Unknown Source)
    at kafka.api.PartitionStateInfo$.readFrom(LeaderAndIsrRequest.scala:56)
    at kafka.api.UpdateMetadataRequest$$anonfun$readFrom$1.apply(UpdateMetadataRequest.scala:28)
    at kafka.api.UpdateMetadataRequest$$anonfun$readFrom$1.apply(UpdateMetadataRequest.scala:25)
    at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
    at scala.collection.immutable.Range$$anon$2.foreach(Range.scala:265)
    at kafka.api.UpdateMetadataRequest$.readFrom(UpdateMetadataRequest.scala:25)
    at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:39)
    at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:39)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:49)
    at kafka.network.Processor.read(SocketServer.scala:345)
    at kafka.network.Processor.run(SocketServer.scala:245)
    at java.lang.Thread.run(Unknown Source)

The kafka-python error is as follows:

----> 1 consumer.commit()

kafka/consumer.py in commit(self, partitions)
--> 140             resps = self.client.send_offset_commit_request(self.group, reqs)

kafka/client.py in send_offset_commit_request(self, group, payloads, fail_on_error, callback)
    225         resps = self._send_broker_aware_request(payloads,
    226                                    partial(KafkaProtocol.encode_offset_commit_request, group=group),
--> 227                                    KafkaProtocol.decode_offset_commit_response)

kafka/client.py in _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn)
--> 136             conn.send(requestId, request)

kafka/conn.pyc in send(self, requestId, payload)
---> 78         self.data = self._consume_response()

kafka/conn.pyc in _consume_response(self)
     34         """
     35         data = ""
---> 36         for chunk in self._consume_response_iter():
     37             data += chunk
     38         return data

kafka/conn.pyc in _consume_response_iter(self)
     48         resp = self._sock.recv(4)
     49         if resp == "":
---> 50             raise Exception("Got no response from Kafka")
     51         (size,) = struct.unpack('>i', resp)
     52 

Exception: Got no response from Kafka

@mahendra
Collaborator

@atinsood - after your fix, did auto-commit work? I am seeing an issue where the broker throws an exception.

@atinsood
Author

@mahendra I haven't pulled in the changes that you made; I will try to fork your code. But after my changes the commit still does not work: I am able to commit successfully without any exception, but the consumer still seems to be pulling out all the messages.

I ended up referring to https://github.com/mumrah/kafka-python/blob/master/kafka/NOTES.md

and changed my code to:

consumer = SimpleConsumer(kafka, "my-group", TOPIC_NAME)
print('offset is {0}'.format(consumer.offsets))
consumer.seek(10, 1)
consumer.commit()
for message in consumer:
    print('received message {0} '.format(message))

But the offset always prints as {0: 0}, and now I am able to pull messages from the offset that I specified (10 in this case) to the end.
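
If seek() here follows the fseek-style convention (whence 0 = relative to the earliest offset, 1 = relative to the current offset, 2 = relative to the latest offset), then seek(10, 1) jumps 10 messages past the current position, which matches the behaviour above. A sketch, assuming that convention:

consumer.seek(10, 0)   # 10 past the earliest available offset (head)
consumer.seek(10, 1)   # 10 past the current offset -- the call above
consumer.seek(-5, 2)   # 5 before the latest offset (tail)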

@atinsood
Author

Made a few more changes in the base code and had some more luck.

(env)asood@starbuck-2 [~/work/opensource/env/swDependencies/kafka-python-mahendra]
> git status
# On branch master
# Changes not staged for commit:
#   (use "git add <file>..." to update what will be committed)
#   (use "git checkout -- <file>..." to discard changes in working directory)
#
#   modified:   kafka/consumer.py
#
# Untracked files:
#   (use "git add <file>..." to include in what will be committed)
#
#   kafka_python.egg-info/
no changes added to commit (use "git add" and/or "git commit -a")
(env)asood@starbuck-2 [~/work/opensource/env/swDependencies/kafka-python-mahendra]
> git diff
diff --git a/kafka/consumer.py b/kafka/consumer.py
index f123113..6b0fada 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -6,7 +6,7 @@ from kafka.common import (
     ErrorMapping, FetchRequest,
     OffsetRequest, OffsetFetchRequest, OffsetCommitRequest
 )
-
+from util import ReentrantTimer
 log = logging.getLogger("kafka")

 class SimpleConsumer(object):
@@ -27,7 +27,7 @@ class SimpleConsumer(object):
     manual call to commit will also reset these triggers

     """
-    def __init__(self, client, group, topic, auto_commit=False, auto_commit_every_n=None, auto_commit_every_t=None):
+    def __init__(self, client, group, topic, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000):
         self.client = client
         self.topic = topic
         self.group = group
@@ -57,14 +57,14 @@ class SimpleConsumer(object):

         # Uncomment for 0.8.1
         #
-        #for partition in self.client.topic_partitions[topic]:
-        #    req = OffsetFetchRequest(topic, partition)
-        #    (offset,) = self.client.send_offset_fetch_request(group, [req],
-        #                  callback=get_or_init_offset_callback, fail_on_error=False)
-        #    self.offsets[partition] = offset
-
         for partition in self.client.topic_partitions[topic]:
-            self.offsets[partition] = 0
+            req = OffsetFetchRequest(topic, partition)
+            (offset,) = self.client.send_offset_fetch_request(group, [req],
+                          callback=get_or_init_offset_callback, fail_on_error=False)
+            self.offsets[partition] = offset

So basically, after uncommenting the code marked for 0.8.1, we need to remove the for loop that follows it (the one that initializes every offset to 0).
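
Note that get_or_init_offset_callback is referenced but not shown in the diff; a minimal hypothetical sketch of what it plausibly does (the error/offset field names and the ErrorMapping constant are assumptions, not verified against the actual source):

# Hypothetical sketch -- not the actual kafka-python code.
def get_or_init_offset_callback(resp):
    # Use the broker's stored offset if there is one, otherwise start at 0.
    if resp.error == ErrorMapping.NO_ERROR:
        return resp.offset
    return 0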

My consumer code looks like:

def consumeMessages():
    print ("topic name {0} ".format(TOPIC_NAME))
    consumer = SimpleConsumer(kafka, "my-group", TOPIC_NAME)
    print('offset is {0}'.format(consumer.offsets))
    for message in consumer:
        print('received message {0} '.format(message))

So I am able to get messages now, and the offset is correctly increasing.

But I am seeing the following exception after every 48 messages for some reason :)

Exception in thread Thread-4:
Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 756, in run
    self.function(*self.args, **self.kwargs)
  File "/work/opensource/env/swDependencies/kafka-python-mahendra/kafka/consumer.py",
    resps = self.client.send_offset_commit_request(self.group, reqs)
  File "/work/opensource/env/swDependencies/kafka-python-mahendra/kafka/client.py", l
    KafkaProtocol.decode_offset_commit_response)
  File "/work/opensource/env/swDependencies/kafka-python-mahendra/kafka/client.py", l
    for response in decoder_fn(response):
  File "/work/opensource/env/swDependencies/kafka-python-mahendra/kafka/protocol.py",
    ((partition, error), cur) = relative_unpack('>ih', data, cur)
  File "/work/opensource/env/swDependencies/kafka-python-mahendra/kafka/util.py", lin
    raise BufferUnderflowError("Not enough data left")
BufferUnderflowError: Not enough data left

ERROR:tornado.application:Uncaught exception POST /order (::1)
HTTPRequest(protocol='http', host='localhost:8888', method='POST', uri='/order', version='HTTP/1.1', remote_ip='::1', body='{"sub_number":"1","sub_orderID":"1
Traceback (most recent call last):
  File "/work/opensource/env/lib/python2.7/site-packages/tornado/web.py", line 1077,
    *self.path_args, **self.path_kwargs)
  File "/work/opensource/tornadoServer.py", line 33, in post
    kafkaHelper.consumeMessages()
  File "/work/opensource/kafkaHelper.py", line 23, in consumeMessages
    for message in consumer:
  File "/work/opensource/env/swDependencies/kafka-python-mahendra/kafka/consumer.py",
    yield it.next()
  File "/work/opensource/env/swDependencies/kafka-python-mahendra/kafka/consumer.py",
    (resp,) = self.client.send_fetch_request([req])
  File "/work/opensource/env/swDependencies/kafka-python-mahendra/kafka/client.py", l
    KafkaProtocol.decode_fetch_response)
  File "/work/opensource/env/swDependencies/kafka-python-mahendra/kafka/client.py", l
    conn.send(requestId, request)
  File "/work/opensource/env/swDependencies/kafka-python-mahendra/kafka/conn.py", lin
    self.data = self._consume_response()
  File "/work/opensource/env/swDependencies/kafka-python-mahendra/kafka/conn.py", lin
    for chunk in self._consume_response_iter():
  File "/work/opensource/env/swDependencies/kafka-python-mahendra/kafka/conn.py", lin
    resp = self._sock.recv(self.bufsize)

@mahendra
Collaborator

@atinsood One quick question: which version of Kafka are you running? I am following the instructions here: https://cwiki.apache.org/KAFKA/kafka-08-quick-start.html

@atinsood
Author

@mahendra I've been using master so far, but I will look into the instructions you mentioned and use that branch.

@mahendra
Collaborator

@atinsood Things are working fine for me now, both offset commit and fetch. There was a bug in the timer logic for auto_commit; I have fixed it in a new pull request, #23.

@mahendra
Collaborator

@atinsood I have a branch in my repo which has all the fixes. You can give it a try: https://github.com/mahendra/kafka-python/tree/affirm

@mahendra
Collaborator

@atinsood @mumrah I also noticed the buffer underflow errors. I think this is happening because itertools is not thread safe. For example, self._next_id() uses itertools.count(); when the commit thread runs, it is possible that the same correlation id is used twice. I saw some fetch request data going to the commit request decoder (and vice versa). I worked around this by protecting self._next_id() with a lock, but then saw the same problem happening in other calls.

I am looking into this issue. A temporary workaround is to use auto_commit without the auto-timer:

consumer = SimpleConsumer(...., auto_commit=True,
                          auto_commit_every_n=20,
                          auto_commit_every_t=None)

This way, the commit code runs within the main thread. Once I did this, the errors disappeared.

We need to ensure that protocol/client code is thread safe.
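
A minimal sketch of the kind of guard described above (illustrative names only, not the actual kafka-python code; conn is assumed to expose a send()/recv() pair keyed by request id). Since responses come back off the socket in arrival order, the whole round trip has to be serialized, not just the id generation:

from itertools import count
from threading import Lock

class LockedClient(object):
    # Illustrative only: one lock covers id generation, send and receive,
    # so a commit from the timer thread cannot interleave with a fetch.
    def __init__(self, conn):
        self.conn = conn
        self.ids = count()
        self.lock = Lock()

    def round_trip(self, payload):
        with self.lock:
            request_id = self.ids.next()
            self.conn.send(request_id, payload)
            return self.conn.recv(request_id)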

@mahendra
Collaborator

@atinsood @mumrah - Well, I just went through the code in detail and noticed that the correlation id is not actually used to multiplex requests as of now. In that case, invoking auto_commit via a timer (thread) is inherently unsafe. We need to look at alternate approaches.

One very easy approach I can think of is to use the multiprocessing module instead of threading. The connection then lives in a separate process space, and commits can happen in parallel. Still thinking it over; I will try it out and send a patch if it works well.
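
A rough sketch of that idea (illustrative, not the eventual implementation; make_client is a hypothetical factory that opens a fresh broker connection):

from multiprocessing import Process, Queue

def commit_worker(offset_queue, make_client):
    # The child process owns its own broker connection, so its commit
    # requests can never interleave with the parent's fetch traffic.
    client = make_client()
    for group, reqs in iter(offset_queue.get, None):  # None = shutdown sentinel
        client.send_offset_commit_request(group, reqs)

queue = Queue()
worker = Process(target=commit_worker, args=(queue, make_client))  # make_client: hypothetical
worker.daemon = True
worker.start()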

@mumrah
Collaborator

mumrah commented May 28, 2013

Wow, a lot to catch up on from this weekend.

@mahendra, you're totally correct that the library is not thread safe right now. I believe the only thing required to make it so is to implement the request multiplexing in conn.py. The request encoding/decoding should be thread safe, since it does not use any instance variables during the decoding process (notice that everything in protocol.py is marked as @classmethod). I do have a note here about the thread safety, but perhaps it should be promoted to the README :)

If the commits are happening on a separate thread/process, there needs to be a way to optionally synchronize when a manual commit happens. There are some use cases where you need to be sure that your offsets have been committed before proceeding.
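
One way to get that guarantee, sketched under the assumption that commits run on a background worker (commit_queue and the worker protocol here are hypothetical):

from threading import Event

def commit_sync(commit_queue, reqs):
    # Block the caller until the background committer has flushed these
    # offsets to the broker; the worker calls done.set() after committing.
    done = Event()
    commit_queue.put((reqs, done))
    done.wait()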

@mahendra
Collaborator

@mumrah I agree that implementing request multiplexing in conn.py is a better option than doing it as a separate process. I have an early version working that runs commit in a separate process (using shared memory). Will explore request multiplexing tomorrow and see how to do it.
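
A minimal sketch of what correlation-id multiplexing in conn.py could look like (illustrative only; recv_any is a hypothetical primitive that returns the next response from the socket along with its correlation id):

from threading import Lock

class MuxConn(object):
    # Illustrative only: route each response to the caller whose
    # correlation id it carries, so many threads can share one socket.
    def __init__(self, conn):
        self.conn = conn
        self.lock = Lock()
        self.pending = {}  # correlation_id -> response

    def request(self, correlation_id, payload):
        with self.lock:
            self.conn.send(correlation_id, payload)
        while True:
            with self.lock:
                if correlation_id in self.pending:
                    return self.pending.pop(correlation_id)
                resp_id, resp = self.conn.recv_any()  # hypothetical
                self.pending[resp_id] = resp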

@mahendra
Collaborator

mahendra commented Jun 3, 2013

@mumrah Have sent a pull request with a simple approach for doing this: #29

@rdiomar
Collaborator

rdiomar commented Jan 29, 2014

Cleaning up old issues. This seems resolved.
