
Consumer offsets discussion #26

Closed
mumrah opened this issue May 29, 2013 · 8 comments

@mumrah (Collaborator) commented May 29, 2013

I think we need to look at the way offsets are advanced and stored by the client, and then fix things properly rather than making little one-off changes. I'm also doing this for my own sake, to refresh my memory of this code (it's been a while).

Suppose we have the following data in Kafka

Partition 0
========
Message 4
Message 5
Message 6
Message 7

Partition 1
========
Message 3
Message 4
Message 5
Message 6

And our starting (partition, offset) values are: (0, 4), (1, 3).

When the consumer's __iter__ is called, it sets up a sub-iterator for each partition, seeded with that partition's current offset. So at this point, we have __iter_partition__(0, 4) and __iter_partition__(1, 3).
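For orientation, here is a paraphrased sketch of that setup (not the exact source; the round-robin draining is an assumption about the surrounding code):

def __iter__(self):
    # one generator per partition, seeded with that partition's offset
    iters = {partition: self.__iter_partition__(partition, offset)
             for partition, offset in self.offsets.items()}
    # drain the sub-iterators round-robin until all are exhausted
    while iters:
        for partition, it in list(iters.items()):
            try:
                yield next(it)
            except StopIteration:
                del iters[partition]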

Here is an annotated version of the loop:

while True:
    # A: about to fetch from the current offset
    req = FetchRequest(self.topic, partition, offset, 1024)
    (resp,) = self.client.send_fetch_request([req])
    assert resp.topic == self.topic
    assert resp.partition == partition
    next_offset = None
    for message in resp.messages:
        next_offset = message.offset
        # B: yield pauses the generator; nothing below runs until
        #    the caller asks for the next message
        yield message
        self.offsets[partition] = message.offset
        # C: the "public" offset is now updated, one message late
    # D: batch finished; advance past the last message we saw
    if next_offset is None:
        break
    else:
        offset = next_offset + 1

Values of offset, next_offset, and self.offsets[partition] at each annotated point (B1/C1 are the first pass through B and C, B2/C2 the second, and so on):

Point   offset  next_offset     self.offsets[partition]
A       4       -               4
B1      4       4               4
C1      4       4               4
B2      4       5               4
C2      4       5               5
D       4       5               5
A       6       -               5
B1      6       6               5
C1      6       6               6

From the consumer's point of view, self.offsets reflects the offset of the previously yielded message. Since the generator pauses execution immediately after the yield, self.offsets is not updated until the next iteration of the generator. The initial thinking behind this was that you only advance the offset after a message has been "processed" (whatever that means). However, it also means that if you commit the offsets in self.offsets, they will lag one message behind.
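A tiny standalone illustration of that lag (hypothetical names, not kafka-python code):

offsets = {0: 4}

def consume(partition):
    for off in (4, 5, 6, 7):       # pretend these are fetched messages
        yield off                   # execution pauses here...
        offsets[partition] = off    # ...so this runs one next() later

it = consume(0)
print(next(it), offsets[0])   # -> 4 4 (offsets still at the seed value)
print(next(it), offsets[0])   # -> 5 4 (one message behind what was yielded)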

mumrah added a commit that referenced this issue Jun 1, 2013
* Update the "public" offset before yielding the message
* Add an option to SimpleConsumer.commit that excludes the current
  offset

Ref #26
@mumrah (Collaborator, Author) commented Jun 1, 2013

@mahendra check out https://github.com/mumrah/kafka-python/tree/issue-26 if you can. If we update the offset before yielding the message, then the only thing we need to be careful about is committing an offset for a message that might not be "done". I've added an option to SimpleConsumer.commit that allows the caller to omit the current offset; the async committer (i.e., the timer-based one) will set this flag to true so as to avoid committing the current offset.
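In condensed form, the reordering looks like this (based on the loop above; the flag name in the commit call is hypothetical, not necessarily what the branch uses):

for message in resp.messages:
    next_offset = message.offset
    # update the "public" offset BEFORE yielding, so self.offsets
    # always reflects the message the caller is currently seeing
    self.offsets[partition] = message.offset
    yield message

# elsewhere, the timer-based committer skips the in-flight message
# (flag name is illustrative only)
consumer.commit(exclude_current_offset=True)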

LMK what you think, and thanks for all the contributions so far!

@mahendra (Collaborator) commented Jun 3, 2013

@mumrah I just had a look and was a bit confused, so I tried to check whether it fixes the problem that I noticed.

In the example code that you mentioned, just try this:

# kafka is an already-connected KafkaClient instance
consumer = SimpleConsumer(kafka, "my-group", "my-topic")

# First pass: prints all pending messages, as expected
for message in consumer:
    print(message)

# Expected: nothing more is printed.
# Actual: the last message is printed again on every later pass.
for message in consumer:
    print(message)   # last message is printed again
for message in consumer:
    print(message)   # last message is printed again

The second for loop repeats the last message, because self.offsets still holds the offset of the last message consumed rather than the next offset to fetch, so a fresh iteration re-fetches it.

@mumrah (Collaborator, Author) commented Jun 3, 2013

@mahendra Is this relying on 0.8.1 offset commits, or just on the internal state of SimpleConsumer? I do see how this would be a problem using the internal state (notice how the commit function adds one to the current offset). I wonder, though, what the use case is for repeatedly iterating over the consumer like this?

@mahendra (Collaborator) commented Jun 4, 2013

@mumrah it is relying on the internal state of SimpleConsumer.

This is for the case where I need to use just a single consumer instance and keep fetching messages periodically.

Creating a new consumer instance every time may be overkill, especially since each one has to spawn threads for timer-based commits.

So I create a consumer, iterate over it to get a batch of messages, wait for some time, and then iterate over it again.

I had implemented a get_messages API that did exactly this, but I just noticed that you had raised reservations about it.

One example of this is using kafka as a broker for celery (via a kombu transport). Do have a look at this example:
https://github.com/mahendra/kombu/blob/kafka/kombu/transport/kafka.py

I would even prefer to have a blocking API for get_messages(block=True, timeout=None), if we can pull it off.
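Usage might look like this (a sketch only; the count parameter and the iterable return value are assumptions on top of the signature proposed above):

# block up to 5 seconds waiting for up to 10 messages;
# returns whatever arrived in that window (possibly nothing)
for message in consumer.get_messages(count=10, block=True, timeout=5):
    print(message)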

@mumrah (Collaborator, Author) commented Jun 4, 2013

@mahendra one feature I've yet to work on is a Queue-like consumer, which sounds more like what you want. I had one implemented for 0.7, but I have not yet ported it to 0.8, though I imagine it would work mostly the same. Would this work for what you're doing?

@mahendra (Collaborator)

@mumrah I had a look at that code. The code in queue.py is a standalone kafka queue implementation. What I am trying to do is make this fit into other queueing/messaging frameworks (like kombu, which will make kafka usable under frameworks like celery).

I also referred to the following doc on designing a kafka consumer. https://cwiki.apache.org/KAFKA/consumer-group-example.html

Quoting one point that is relevant to our discussion:

"your logic should expect to get an iterator from Kafka that may block if there are no new messages available"

Also, if you look at the consumer code there:

ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext())
    System.out.println(new String(it.next().message()));
System.out.println("Shutting down Thread: " + m_threadNumber);

So, going by that logic, we need our iterator (over a consumer stream) to be:

  • re-usable: we can pass it along to others, who keep a reference and iterate on it at will
  • blocking: we have to see how to implement this

In the current implementation, there is a bug if the iterator is used more than once: if someone keeps a reference to the iterator and consumes it once, then from the second iteration onward the last message of the previous iteration is repeated.
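A standalone sketch of that reuse bug (toy code, not the library's; storing offset+1 mirrors the adjustment that commit already makes):

offsets = {0: 4}
LOG = {0: [4, 5, 6]}   # pretend partition 0 holds messages at these offsets

def iter_partition(partition):
    start = offsets[partition]
    for off in LOG[partition]:
        if off < start:
            continue
        yield off
        offsets[partition] = off         # BUG: stores the consumed offset
        # fix: offsets[partition] = off + 1  (the NEXT offset to fetch)

print(list(iter_partition(0)))   # -> [4, 5, 6]
print(list(iter_partition(0)))   # -> [6], the last message repeats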

@mahendra (Collaborator)

@mumrah - as part of the multiprocess consumer work, I tried a blocking API for SimpleConsumer without making use of Queues or Events. This is done using the FetchRequest parameters (MaxWaitTime and MinBytes), described here:

https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html#AGuideToTheKafkaProtocol-FetchRequest

The code is available at https://github.com/mahendra/kafka-python/compare/partition
This implementation makes SimpleConsumer behave just like the Java/Scala API:

  • A non-blocking iterator, which keeps iterating until there are no more messages. On iterating again, it returns new messages if any have appeared. The need for re-usability comes from:
    • the infinite nature of the kafka stream: messages can keep arriving, continuously or in bursts
    • if the iterator were not re-usable, we would have to create a new consumer every time we wanted to iterate
  • An optional blocking get_messages() API (sketched below)
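For illustration, here is a minimal sketch of how the two fetch parameters can provide blocking semantics (the function and its defaults are assumptions, not the code on the branch):

def fetch_params(block=True, timeout=None):
    """Translate (block, timeout) into (max_wait_time_ms, min_bytes).

    min_bytes=1 tells the broker to hold the request until at least
    one byte of messages is available (server-side blocking), and
    max_wait_time bounds how long it may hold the request open.
    """
    if not block:
        return 0, 0                    # return immediately, even if empty
    if timeout is None:
        return 100, 1                  # block in repeated 100 ms broker waits
    return int(timeout * 1000), 1      # block up to the caller's timeout


print(fetch_params(block=False))            # -> (0, 0)
print(fetch_params(block=True, timeout=2))  # -> (2000, 1)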

@rdiomar (Collaborator) commented Jan 29, 2014

The iterator and offset handling have changed drastically in the seven months since this issue was opened, and they should now behave as expected. Reopen if you think this is not resolved.
