Skip to content

Commit

Permalink
Closes #22
Browse files Browse the repository at this point in the history
Removed get_messages API, added test for get_pending
  • Loading branch information
mumrah committed May 28, 2013
1 parent 7ab7690 commit f4a326f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 20 deletions.
20 changes: 0 additions & 20 deletions kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,26 +155,6 @@ def commit(self, partitions=[]):
assert resp.error == 0
self.count_since_commit = 0

def get_messages(self, count=1):
"""
Get the specified number of messages
count: maximum number of messages to be fetched
"""
if not hasattr(self, '_iterator'):
self._iterator = iter(self)

msgs = []
while count > 0:
try:
msgs.append(self._iterator.next())
count -= 1
except StopIteration:
delattr(self, '_iterator')
break

return msgs

def __iter__(self):
"""
Create an iterate per partition. Iterate through them calling next() until they are
Expand Down
21 changes: 21 additions & 0 deletions test/integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,27 @@ def test_consumer(self):

self.assertEquals(len(all_messages), 13)

def test_pending(self):
# Produce 10 messages to partition 0 and 1

produce1 = ProduceRequest("test_pending", 0, messages=[
create_message("Test message 0 %d" % i) for i in range(10)
])
for resp in self.client.send_produce_request([produce1]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)

produce2 = ProduceRequest("test_pending", 1, messages=[
create_message("Test message 1 %d" % i) for i in range(10)
])
for resp in self.client.send_produce_request([produce2]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)

consumer = SimpleConsumer(self.client, "group1", "test_pending")
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10)

if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
Expand Down

0 comments on commit f4a326f

Please sign in to comment.