
Support for async producer #33

Merged: 14 commits, merged into dpkp:master on Jul 11, 2013

Conversation

mahendra (Collaborator)

The Java/Scala Kafka client supports a mechanism for sending messages asynchronously by using a queue and a thread. Messages are put on the queue, and the worker thread keeps sending them to the broker.

This ticket implements this feature in Python. We use multiprocessing instead of threads to send the messages.
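
For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of the queue-plus-worker-process approach described above. All names here (AsyncProducer, _worker, the STOP sentinel) are illustrative only, not the API this pull request actually adds.

from multiprocessing import Process, Queue

STOP = "__stop__"  # sentinel telling the worker process to exit

def _worker(queue):
    # Pull messages off the queue and push them to the broker
    while True:
        msg = queue.get()
        if msg == STOP:
            break
        print("sending to broker: %r" % (msg,))  # stand-in for the real network send

class AsyncProducer(object):
    def __init__(self):
        self.queue = Queue()
        self.proc = Process(target=_worker, args=(self.queue,))
        self.proc.start()

    def send(self, msg):
        # Enqueue and return immediately; the worker does the actual send
        self.queue.put(msg)

    def stop(self):
        # The sentinel queues up behind any pending messages, so the
        # worker drains the queue before exiting
        self.queue.put(STOP)
        self.proc.join()

if __name__ == '__main__':
    producer = AsyncProducer()
    producer.send(b'hello')
    producer.stop()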
@mahendra (Collaborator, Author)

@mumrah If you approve this, I will update #32 to provide async support later.

Conflicts:
	kafka/producer.py
@mumrah (Collaborator) commented Jun 13, 2013

@mahendra what do you think about a combo consumer/producer Queue? This is how I did it in the 0.7x version of the library, and this is what I had in mind as a solution to the whole async/multiprocess question:

  • If you want a blocking producer, use an Event
  • For non-blocking producing, just enqueue and forget about it
  • For blocking consuming, use Queue.get with a timeout
  • For non-blocking consuming, use Queue.get without a timeout
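
A hypothetical sketch of these four patterns using only the standard library (none of these names come from kafka-python). One caveat: Queue.get() without a timeout actually blocks forever, so a truly non-blocking read is Queue.get_nowait().

import queue      # named Queue on Python 2
import threading

q = queue.Queue()

# Blocking produce: attach an Event that the sender thread sets once
# the broker acknowledges the message, then wait on it
done = threading.Event()
q.put((b'payload', done))
# ... the sender thread would call done.set() after the send completes,
# and the caller would block on done.wait(timeout=...) ...

# Non-blocking produce: enqueue and forget
q.put((b'payload', None))

# Blocking consume: wait up to a timeout for a message
try:
    msg = q.get(timeout=1.0)
except queue.Empty:
    msg = None

# Non-blocking consume: return immediately if nothing is buffered
try:
    msg = q.get_nowait()
except queue.Empty:
    msg = None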

@mumrah mumrah closed this Jun 13, 2013
@mumrah mumrah reopened this Jun 13, 2013
@mumrah (Collaborator) commented Jun 13, 2013

Oops - did not mean to close

@mahendra (Collaborator, Author)

@mumrah I thought you had merged this in.

Yeah, I had a look at the existing queues.py. It's good. My thoughts are:

  • If you want a blocking producer, use what is there currently. Using Event() might be a bit unnecessary; the existing code is already blocking.
  • If you want a non-blocking producer, use a Queue.

The above is what I tried to do in this ticket. The whole thing can be made blocking or non-blocking by passing an async parameter (or maybe we just make it block by default). For the producer, I would prefer not to have a separate implementation.

  • If you want a non-blocking consumer, use the current API itself.
  • If you want a blocking consumer, use a Queue.

The reason for my comments is that for the blocking producer and the non-blocking consumer, the current code works great. We don't need the extra overhead of an Event or a Queue for these cases; adding them makes the code go through all sorts of locking and synchronization semantics, which may be a bit of overkill.

The complex cases are the non-blocking producer and the blocking consumer. These we can try to solve using Queues.

For the blocking consumer, we can just use a Queue and a new API:

def get_messages(self, blocking=False, timeout=None):
    # Sketch: assumes the consumer buffers fetched messages in self.queue
    if blocking:
        # Wait on the queue until a message arrives or the timeout expires
        return self.queue.get(block=True, timeout=timeout)
    else:
        # Iterate over (self) to drain what is already available
        return list(self)

Or we can add a parameter to SimpleConsumer to indicate blocking.

Let me know what you think.

PS: My idea is to keep the most common cases as simple and sync/lock-free as possible. For the uncommon cases, we can use these mechanisms.

Add support for two options in the producer - req_acks and ack_timeout
The acks, if any, are passed to the caller directly
Also, ensure that the case of 'no-acks' works fine
In conn.send(), do not wait for the response. Wait for it only on
conn.recv(). This behaviour is fine now since the connection is not
shared among consumer threads etc.
@mahendra (Collaborator, Author)

@mumrah - Let me know if you were able to go through this pull request. It implements the following features:

  • Support for an async producer (using multiprocessing). In the future, we can extend this to make use of gevent; the async parameter could take values like None, THREADED or GEVENT.
  • Support for ack parameters: no-ack, ack-after-local-write and ack-after-cluster-commit.

I think this patch will bring the kafka-python client as close as possible to the Java/Scala clients bundled with Kafka. Let me know your thoughts.
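
For reference, these ack modes correspond to the acks field of the Kafka produce request: 0 (no ack), 1 (ack after the leader's local write) and -1 (ack after cluster commit). Below is a hypothetical usage sketch built on the req_acks/ack_timeout options named in the commits above; the constant names and the exact producer signature are assumptions, not necessarily what this PR ships.

# Illustrative constants mirroring the three ack modes
ACK_NOT_REQUIRED = 0           # fire and forget; the broker sends no response
ACK_AFTER_LOCAL_WRITE = 1      # leader acks once the message hits its log
ACK_AFTER_CLUSTER_COMMIT = -1  # leader acks after all replicas commit

# Hypothetical call; the real constructor arguments may differ:
# producer = SimpleProducer(client, 'my-topic',
#                           req_acks=ACK_AFTER_LOCAL_WRITE,
#                           ack_timeout=1000)  # ms to wait for the ack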

@mumrah (Collaborator) commented Jun 25, 2013

@mahendra a few comments:

  • I'm not sure about the extra client-side logic for dealing with acks. In client.py you are skipping the decoding of the produce response. The ack logic is implemented by the Kafka server, and we should still decode the response here (we still need to check for error codes from the server).
  • A couple of tests, maybe :)

Eventually, it would be nice to allow the async producer to batch messages automatically. This would be similar logic to the offset committer (send messages every N seconds or every M messages). These are important values to tune for performance (though I'm not sure how high-performance this library could really be).

Looking good!

@mahendra (Collaborator, Author)

@mumrah Thanks for the review.

  • In client.py, I am skipping the decoding of the produce response only when the acks param is set to 0. In that case, the server does not send any response back; if we didn't handle this, _send_broker_aware_request() would block on conn.recv() until the socket timed out. When acks is 1 or -1, the logic still checks for the response (see the sketch after this list).
  • Tests - sure! I am working on tests for the producer thingy, including partitioners.
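
A minimal sketch of that special case, with hypothetical names (the PR's actual code in client.py is organized differently):

def send_request(conn, request, acks):
    # Always push the request out
    conn.send(request)
    if acks == 0:
        # With acks=0 the broker sends nothing back, so a recv() here
        # would just block until the socket timed out
        return None
    # With acks=1 or -1, read the response so the error code can be checked
    return conn.recv()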

Batching would be a very easy thing to do in our case; I will implement it later this week. In _send_upstream(), the code can block until the queue holds a full batch or until the timeout expires, roughly as sketched below.
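
A rough sketch of such a batching loop, again with illustrative names (the stop sentinel matches the snippet later in this thread; batch_size, batch_time and send_fn are assumptions):

import time
from queue import Empty   # named Queue on Python 2

def _send_upstream(queue, batch_size, batch_time, send_fn,
                   stop_sentinel='STOP'):
    # Collect messages until we have batch_size of them or batch_time
    # seconds have passed, then send the whole batch at once
    while True:
        batch = []
        deadline = time.time() + batch_time
        while len(batch) < batch_size:
            timeout = deadline - time.time()
            if timeout <= 0:
                break
            try:
                msg = queue.get(timeout=timeout)
            except Empty:
                break
            if msg == stop_sentinel:
                if batch:
                    send_fn(batch)  # flush what we have before stopping
                return
            batch.append(msg)
        if batch:
            send_fn(batch)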

PS: I am sure we can make the library high-performing :-) - it needs a bit more tuning, and maybe gevent to boot.

@mumrah (Collaborator) commented Jun 25, 2013

Yea, you're right, according to https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest

That just seems crazy - not even getting back a simple response (even without the data being persisted). I hate special cases like this... oh well.


Also improve the logic for stopping the async Processor instance.
Ensure that unsent messages are sent before it is stopped.
@mahendra (Collaborator, Author)

@mumrah with some re-org, I have added batch mode support also. Working on the test cases now.

Note: I have a common function _send_upstream() to handle both the batched mode and the normal async mode, which makes it a bit complicated for the normal async mode. If you want, I can implement a separate function for normal async operation, along these lines:

def _send_upstream(self, queue):
    while True:
        partition, msg = queue.get()
        if partition == STOP_ASYNC_PRODUCER:
            break

        req = ProduceRequest(....)
        self.client.send_produce_request(....)

@mahendra (Collaborator, Author)

@mumrah any docs on how to get a test setup going? I ran python setup.py test after setting the environment variables (ZOOKEEPER_URI and KAFKA_URI) and got quite a few errors.

Also, if I set KAFKA_ROOT to my kafka code base, it throws a lot of them (both on the master branch).

@mumrah (Collaborator) commented Jun 26, 2013

@mahendra did you try the instructions in the README? @sandello did some work on the tests recently to make them a little easier for devs to run (i.e., no external ZooKeeper required).

@mahendra (Collaborator, Author)

@mumrah facepalm!! :-)

Yep, got it working. I have implemented test cases and updated the pull request. Do have a look.

@mahendra (Collaborator, Author)

@mumrah - let me know if you want a simpler _send_upstream for non-batch async messages.

@mahendra (Collaborator, Author)

Also, maybe we should update the README to mention all the new features of the library.

mumrah added a commit that referenced this pull request Jul 11, 2013
Support for async producer

Merged locally, tests pass, +1
@mumrah mumrah merged commit 5684af4 into dpkp:master Jul 11, 2013
@mahendra mahendra deleted the asyncproducer branch July 30, 2013 02:45