Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for keyed messages #32

Merged
merged 4 commits into from
Jun 13, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,22 @@ for message in consumer:
kafka.close()
```

## Keyed messages
```python
from kafka.client import KafkaClient
from kafka.producer import KeyedProducer
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost", 9092)

# HashedPartitioner is default
producer = KeyedProducer(kafka, "my-topic")
producer.send("key1", "some message")
producer.send("key2", "this methode")

producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
```

## Low level

```python
Expand Down
5 changes: 5 additions & 0 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ def _load_metadata_for_topics(self, *topics):

self.brokers.update(brokers)
self.topics_to_brokers = {}

for topic, partitions in topics.items():
# Clear the list once before we add it. This removes stale entries
# and avoids duplicates
self.topic_partitions.pop(topic, None)

if not partitions:
log.info("Partition is unassigned, delay for 1s and retry")
time.sleep(1)
Expand Down
56 changes: 56 additions & 0 deletions kafka/partitioner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from itertools import cycle


class Partitioner(object):
"""
Base class for a partitioner
"""
def __init__(self, partitions):
"""
Initialize the partitioner

partitions - A list of available partitions (during startup)
"""
self.partitions = partitions

def partition(self, key, partitions):
"""
Takes a string key and num_partitions as argument and returns
a partition to be used for the message

partitions - The list of partitions is passed in every call. This
may look like an overhead, but it will be useful
(in future) when we handle cases like rebalancing
"""
raise NotImplemented('partition function has to be implemented')


class RoundRobinPartitioner(Partitioner):
"""
Implements a round robin partitioner which sends data to partitions
in a round robin fashion
"""
def __init__(self, partitions):
self._set_partitions(partitions)

def _set_partitions(self, partitions):
self.partitions = partitions
self.iterpart = cycle(partitions)

def partition(self, key, partitions):
# Refresh the partition list if necessary
if self.partitions != partitions:
self._set_partitions(partitions)

return self.iterpart.next()


class HashedPartitioner(Partitioner):
"""
Implements a partitioner which selects the target partition based on
the hash of the key
"""
def partition(self, key, partitions):
size = len(partitions)
idx = hash(key) % size
return partitions[idx]
32 changes: 32 additions & 0 deletions kafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from kafka.common import ProduceRequest
from kafka.protocol import create_message
from kafka.partitioner import HashedPartitioner

log = logging.getLogger("kafka")

Expand All @@ -23,3 +24,34 @@ def send_messages(self, *msg):

resp = self.client.send_produce_request([req])[0]
assert resp.error == 0


class KeyedProducer(object):
"""
A producer which distributes messages to partitions based on the key

Args:
client - The kafka client instance
topic - The kafka topic to send messages to
partitioner - A partitioner class that will be used to get the partition
to send the message to. Must be derived from Partitioner
"""
def __init__(self, client, topic, partitioner=None):
self.client = client
self.topic = topic
self.client._load_metadata_for_topics(topic)

if not partitioner:
partitioner = HashedPartitioner

self.partitioner = partitioner(self.client.topic_partitions[topic])

def send(self, key, msg):
partitions = self.client.topic_partitions[self.topic]
partition = self.partitioner.partition(key, partitions)

req = ProduceRequest(self.topic, partition,
messages=[create_message(msg)])

resp = self.client.send_produce_request([req])[0]
assert resp.error == 0