Merge pull request #33 from mahendra/asyncproducer
Support for async producer

Merged locally, tests pass, +1
mumrah committed Jul 11, 2013
2 parents ffdc08a + d2df8f5 commit 5684af4
Showing 6 changed files with 509 additions and 30 deletions.
34 changes: 33 additions & 1 deletion README.md
@@ -26,14 +26,46 @@ development, APIs are subject to change.
 ```python
 from kafka.client import KafkaClient
 from kafka.consumer import SimpleConsumer
-from kafka.producer import SimpleProducer
+from kafka.producer import SimpleProducer, KeyedProducer
 
 kafka = KafkaClient("localhost", 9092)
 
+# To send messages synchronously
 producer = SimpleProducer(kafka, "my-topic")
 producer.send_messages("some message")
 producer.send_messages("this method", "is variadic")
 
+# To send messages asynchronously
+producer = SimpleProducer(kafka, "my-topic", async=True)
+producer.send_messages("async message")
+
+# To wait for acknowledgements
+# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
+#                         a local log before sending a response
+# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
+#                            by all in-sync replicas before sending a response
+producer = SimpleProducer(kafka, "my-topic", async=False,
+                          req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
+                          ack_timeout=2000)
+
+response = producer.send_messages("sync message")
+
+if response:
+    print(response[0].error)
+    print(response[0].offset)
+
+# To send messages in batches. You can use any of the available
+# producers for doing this. The following producer will collect
+# messages in batches and send them to Kafka after 20 messages are
+# collected or every 60 seconds.
+# Notes:
+# * If the producer dies before the messages are sent, there will be losses
+# * Call producer.stop() to send the pending messages and clean up
+producer = SimpleProducer(kafka, "my-topic", batch_send=True,
+                          batch_send_every_n=20,
+                          batch_send_every_t=60)
+
+# To consume messages
 consumer = SimpleConsumer(kafka, "my-group", "my-topic")
 for message in consumer:
     print(message)
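The hunk above also starts importing `KeyedProducer`, but its usage example is not part of the visible hunk. A minimal sketch based on the `KeyedProducer` class added in kafka/producer.py below (editor's sketch, not diff content):

```python
from kafka.client import KafkaClient
from kafka.producer import KeyedProducer
from kafka.partitioner import HashedPartitioner

kafka = KafkaClient("localhost", 9092)

# Messages with the same key always go to the same partition.
# HashedPartitioner is the default when no partitioner is given.
producer = KeyedProducer(kafka, "my-topic", partitioner=HashedPartitioner)
producer.send("key1", "some message")
producer.send("key2", "another message")
```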
6 changes: 4 additions & 2 deletions kafka/__init__.py
@@ -9,10 +9,12 @@
 from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message
 )
-from kafka.producer import SimpleProducer
+from kafka.producer import SimpleProducer, KeyedProducer
+from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
 from kafka.consumer import SimpleConsumer
 
 __all__ = [
-    'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'SimpleConsumer',
+    'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
+    'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
    'create_message', 'create_gzip_message', 'create_snappy_message'
 ]
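With the widened `__all__` and the new imports, the added names resolve straight from the package (a quick check, assuming the package is installed; these were not re-exported at the top level before this commit):

```python
# Each of these previously required importing from the submodule directly.
from kafka import KeyedProducer, RoundRobinPartitioner, HashedPartitioner
```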
13 changes: 11 additions & 2 deletions kafka/client.py
@@ -161,12 +161,16 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):

             # Send the request, recv the response
             conn.send(requestId, request)
+
+            if decoder_fn is None:
+                continue
+
             response = conn.recv(requestId)
             for response in decoder_fn(response):
                 acc[(response.topic, response.partition)] = response
 
         # Order the accumulated responses by the original key order
-        return (acc[k] for k in original_keys)
+        return (acc[k] for k in original_keys) if acc else ()
 
     #################
     # Public API #
@@ -201,7 +205,12 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,

         encoder = partial(KafkaProtocol.encode_produce_request,
                           acks=acks, timeout=timeout)
-        decoder = KafkaProtocol.decode_produce_response
+
+        if acks == 0:
+            decoder = None
+        else:
+            decoder = KafkaProtocol.decode_produce_response
+
         resps = self._send_broker_aware_request(payloads, encoder, decoder)
 
         out = []
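With `acks=0` the client now skips response decoding entirely. A sketch of the resulting fire-and-forget call (editor's sketch against this commit's API, assuming a reachable broker):

```python
from kafka.client import KafkaClient
from kafka.common import ProduceRequest
from kafka.protocol import create_message

kafka = KafkaClient("localhost", 9092)

# acks=0: decoder is None, so no recv() happens, the accumulated
# response dict stays empty, and an empty list comes back.
reqs = [ProduceRequest("my-topic", 0, [create_message("fire and forget")])]
resps = kafka.send_produce_request(reqs, acks=0)
assert resps == []
```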
2 changes: 1 addition & 1 deletion kafka/conn.py
@@ -76,11 +76,11 @@ def send(self, requestId, payload):
         sent = self._sock.sendall(payload)
         if sent != None:
             raise RuntimeError("Kafka went away")
-        self.data = self._consume_response()
 
     def recv(self, requestId):
         "Get a response from Kafka"
         log.debug("Reading response %d from Kafka" % requestId)
+        self.data = self._consume_response()
         return self.data
 
     def close(self):
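Moving the blocking read out of `send()` is what makes the `acks=0` path above possible. A sketch of the new call pattern (the request id and bytes here are placeholders, not a valid Kafka request; real payloads come from `KafkaProtocol` encoders):

```python
from kafka.conn import KafkaConnection

conn = KafkaConnection("localhost", 9092)
request_id, request_bytes = 1, b"..."  # placeholders for illustration only

# send() no longer consumes the response as a side effect ...
conn.send(request_id, request_bytes)

# ... the socket is only drained when the caller actually expects a reply:
data = conn.recv(request_id)
```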
207 changes: 186 additions & 21 deletions kafka/producer.py
@@ -1,32 +1,186 @@
+from collections import defaultdict
+from datetime import datetime, timedelta
 from itertools import cycle
+from multiprocessing import Queue, Process
+from Queue import Empty
 import logging
+import sys
 
 from kafka.common import ProduceRequest
 from kafka.protocol import create_message
 from kafka.partitioner import HashedPartitioner
 
 log = logging.getLogger("kafka")
 
+BATCH_SEND_DEFAULT_INTERVAL = 20
+BATCH_SEND_MSG_COUNT = 20
 
-class SimpleProducer(object):
+STOP_ASYNC_PRODUCER = -1
+
+
+class Producer(object):
     """
-    A simple, round-robbin producer. Each message goes to exactly one partition
+    Base class to be used by producers
+    Params:
+    client - The Kafka client instance to use
+    topic - The topic for sending messages to
+    async - If set to True, the messages are sent asynchronously via another
+            thread (process). We will not wait for a response to these
+    req_acks - A value indicating the acknowledgements that the server must
+               receive before responding to the request
+    ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+                  for an acknowledgement
+    batch_send - If True, messages are sent in batches
+    batch_send_every_n - If set, messages are sent in batches of this size
+    batch_send_every_t - If set, messages are sent after this timeout
     """
-    def __init__(self, client, topic):
+
+    ACK_NOT_REQUIRED = 0            # No ack is required
+    ACK_AFTER_LOCAL_WRITE = 1       # Send response after it is written to log
+    ACK_AFTER_CLUSTER_COMMIT = -1   # Send response after data is committed
+
+    DEFAULT_ACK_TIMEOUT = 1000
+
+    def __init__(self, client, async=False,
+                 req_acks=ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+
+        if batch_send:
+            async = True
+            assert batch_send_every_n > 0
+            assert batch_send_every_t > 0
+        else:
+            batch_send_every_n = 1
+            batch_send_every_t = 3600
+
         self.client = client
+        self.async = async
+        self.req_acks = req_acks
+        self.ack_timeout = ack_timeout
+        self.batch_send = batch_send
+        self.batch_size = batch_send_every_n
+        self.batch_time = batch_send_every_t
+
+        if self.async:
+            self.queue = Queue()  # Messages are sent through this queue
+            self.proc = Process(target=self._send_upstream, args=(self.queue,))
+            self.proc.daemon = True  # Process will die if main thread exits
+            self.proc.start()
+
+    def _send_upstream(self, queue):
+        """
+        Listen on the queue for a specified number of messages or till
+        a specified timeout and send them upstream to the brokers in one
+        request
+        """
+        stop = False
+
+        while not stop:
+            timeout = self.batch_time
+            send_at = datetime.now() + timedelta(seconds=timeout)
+            count = self.batch_size
+            msgset = defaultdict(list)
+
+            # Keep fetching till we gather enough messages or a
+            # timeout is reached
+            while count > 0 and timeout >= 0:
+                try:
+                    partition, msg = queue.get(timeout=timeout)
+                except Empty:
+                    break
+
+                # Check if the controller has requested us to stop
+                if partition == STOP_ASYNC_PRODUCER:
+                    stop = True
+                    break
+
+                # Adjust the timeout to match the remaining period
+                count -= 1
+                timeout = (send_at - datetime.now()).total_seconds()
+                msgset[partition].append(msg)
+
+            # Send collected requests upstream
+            reqs = []
+            for partition, messages in msgset.items():
+                req = ProduceRequest(self.topic, partition, messages)
+                reqs.append(req)
+
+            try:
+                self.client.send_produce_request(reqs, acks=self.req_acks,
+                                                 timeout=self.ack_timeout)
+            except Exception as exp:
+                log.error("Error sending message", exc_info=sys.exc_info())
+
+    def send_messages(self, partition, *msg):
+        """
+        Helper method to send produce requests
+        """
+        if self.async:
+            for m in msg:
+                self.queue.put((partition, create_message(m)))
+            resp = []
+        else:
+            messages = [create_message(m) for m in msg]
+            req = ProduceRequest(self.topic, partition, messages)
+            resp = self.client.send_produce_request([req], acks=self.req_acks,
+                                                    timeout=self.ack_timeout)
+        return resp
+
+    def stop(self, timeout=1):
+        """
+        Stop the producer. Optionally wait for the specified timeout before
+        forcefully cleaning up.
+        """
+        if self.async:
+            self.queue.put((STOP_ASYNC_PRODUCER, None))
+            self.proc.join(timeout)
+
+            if self.proc.is_alive():
+                self.proc.terminate()
+
+
+class SimpleProducer(Producer):
+    """
+    A simple, round-robin producer. Each message goes to exactly one partition
+    Params:
+    client - The Kafka client instance to use
+    topic - The topic for sending messages to
+    async - If True, the messages are sent asynchronously via another
+            thread (process). We will not wait for a response to these
+    req_acks - A value indicating the acknowledgements that the server must
+               receive before responding to the request
+    ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+                  for an acknowledgement
+    batch_send - If True, messages are sent in batches
+    batch_send_every_n - If set, messages are sent in batches of this size
+    batch_send_every_t - If set, messages are sent after this timeout
+    """
+    def __init__(self, client, topic, async=False,
+                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
         self.topic = topic
-        self.client._load_metadata_for_topics(topic)
-        self.next_partition = cycle(self.client.topic_partitions[topic])
+        client._load_metadata_for_topics(topic)
+        self.next_partition = cycle(client.topic_partitions[topic])
 
-    def send_messages(self, *msg):
-        req = ProduceRequest(self.topic, self.next_partition.next(),
-                             messages=[create_message(m) for m in msg])
+        super(SimpleProducer, self).__init__(client, async, req_acks,
+                                             ack_timeout, batch_send,
+                                             batch_send_every_n,
+                                             batch_send_every_t)
 
-        resp = self.client.send_produce_request([req])[0]
-        assert resp.error == 0
+    def send_messages(self, *msg):
+        partition = self.next_partition.next()
+        return super(SimpleProducer, self).send_messages(partition, *msg)
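The two modes return different things from `send_messages`. A quick sketch against this commit's API (editor's sketch, not diff content; assumes a reachable broker):

```python
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

kafka = KafkaClient("localhost", 9092)

# Sync mode returns the broker responses, with error code and offset.
producer = SimpleProducer(kafka, "my-topic")
resp = producer.send_messages("hello")
if resp:
    print(resp[0].error)
    print(resp[0].offset)

# Async mode only enqueues the message and always returns an empty list.
producer = SimpleProducer(kafka, "my-topic", async=True)
assert producer.send_messages("hello") == []
```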


-class KeyedProducer(object):
+class KeyedProducer(Producer):
     """
     A producer which distributes messages to partitions based on the key
@@ -35,23 +189,34 @@ class KeyedProducer(object):
     topic - The kafka topic to send messages to
     partitioner - A partitioner class that will be used to get the partition
                   to send the message to. Must be derived from Partitioner
+    async - If True, the messages are sent asynchronously via another
+            thread (process). We will not wait for a response to these
+    ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+                  for an acknowledgement
+    batch_send - If True, messages are sent in batches
+    batch_send_every_n - If set, messages are sent in batches of this size
+    batch_send_every_t - If set, messages are sent after this timeout
     """
-    def __init__(self, client, topic, partitioner=None):
-        self.client = client
+    def __init__(self, client, topic, partitioner=None, async=False,
+                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
         self.topic = topic
-        self.client._load_metadata_for_topics(topic)
+        client._load_metadata_for_topics(topic)
 
         if not partitioner:
             partitioner = HashedPartitioner
 
-        self.partitioner = partitioner(self.client.topic_partitions[topic])
+        self.partitioner = partitioner(client.topic_partitions[topic])
+
+        super(KeyedProducer, self).__init__(client, async, req_acks,
+                                            ack_timeout, batch_send,
+                                            batch_send_every_n,
+                                            batch_send_every_t)
 
     def send(self, key, msg):
         partitions = self.client.topic_partitions[self.topic]
         partition = self.partitioner.partition(key, partitions)
-
-        req = ProduceRequest(self.topic, partition,
-                             messages=[create_message(msg)])
-
-        resp = self.client.send_produce_request([req])[0]
-        assert resp.error == 0
+        return self.send_messages(partition, msg)
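Putting the new pieces together, the intended lifecycle for a batching producer looks like this (editor's sketch against this commit's Python 2 API; assumes a broker on localhost:9092):

```python
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

kafka = KafkaClient("localhost", 9092)

# batch_send=True forces async: a daemon child process drains the queue
# and flushes every 20 messages or every 60 seconds, whichever comes first.
producer = SimpleProducer(kafka, "my-topic", batch_send=True,
                          batch_send_every_n=20,
                          batch_send_every_t=60)

for i in range(100):
    producer.send_messages("message %d" % i)

# stop() enqueues the STOP_ASYNC_PRODUCER sentinel, joins the child
# process, and terminates it if it is still alive after the timeout.
producer.stop(timeout=5)
```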
