Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New APIs for getting messages and message count #22

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,30 @@ def seek(self, offset, whence):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)

def pending(self, partitions=[]):
"""
Gets the pending message count

partitions: list of partitions to check for, default is to check all
"""
if len(partitions) == 0:
partitions = self.offsets.keys()

total = 0
reqs = []

for partition in partitions:
reqs.append(OffsetRequest(self.topic, partition, -1, 1))

resps = self.client.send_offset_request(reqs)
for resp in resps:
partition = resp.partition
pending = resp.offsets[0]
offset = self.offsets[partition]
total += pending - offset - (1 if offset > 0 else 0)

return total

def commit(self, partitions=[]):
"""
Commit offsets for this consumer
Expand Down Expand Up @@ -131,6 +155,26 @@ def commit(self, partitions=[]):
assert resp.error == 0
self.count_since_commit = 0

def get_messages(self, count=1):
"""
Get the specified number of messages

count: maximum number of messages to be fetched
"""
if not hasattr(self, '_iterator'):
self._iterator = iter(self)

msgs = []
while count > 0:
try:
msgs.append(self._iterator.next())
count -= 1
except StopIteration:
delattr(self, '_iterator')
break

return msgs

def __iter__(self):
"""
Create an iterate per partition. Iterate through them calling next() until they are
Expand Down