Skip to content

Commit

Permalink
Implement Consumer.Commit, Fixes #2.
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Huus committed Jul 30, 2013
1 parent afd74f2 commit cc04b09
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 1 deletion.
72 changes: 71 additions & 1 deletion kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Consumer struct {
}

// NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
// part of the named consumer group.
// part of the named consumer group. It automatically fetches the offset at which to start reading from the Kafka cluster.
func NewConsumer(client *Client, topic string, partition int32, group string) (*Consumer, error) {
broker, err := client.leader(topic, partition)
if err != nil {
Expand Down Expand Up @@ -72,6 +72,76 @@ func (c *Consumer) Close() {
<-c.done
}

// Commit saves the given offset to the Kafka cluster so that the next time a consumer is started on this
// topic and partition, it will know the offset at which it can start.
func (c *Consumer) Commit(offset int64) error {
request := &k.OffsetCommitRequest{ConsumerGroup: c.group}
request.AddBlock(c.topic, c.partition, offset, "")

response, err := c.broker.CommitOffset(c.client.id, request)
switch err {
case nil:
break
case encoding.EncodingError:
return err
default:
c.client.disconnectBroker(c.broker)
c.broker, err = c.client.leader(c.topic, c.partition)
if err != nil {
return err
}
response, err = c.broker.CommitOffset(c.client.id, request)
if err != nil {
return err
}
}

kerr := response.GetError(c.topic, c.partition)
if kerr == nil {
c.client.disconnectBroker(c.broker)
c.broker, err = c.client.leader(c.topic, c.partition)
if err != nil {
return err
}
response, err = c.broker.CommitOffset(c.client.id, request)
if err != nil {
return err
}
kerr := response.GetError(c.topic, c.partition)
if kerr == nil {
return IncompleteResponse
}
}

switch *kerr {
case types.NO_ERROR:
return nil
case types.UNKNOWN_TOPIC_OR_PARTITION, types.NOT_LEADER_FOR_PARTITION, types.LEADER_NOT_AVAILABLE:
err = c.client.refreshTopic(c.topic)
if err != nil {
return err
}
c.broker, err = c.client.leader(c.topic, c.partition)
if err != nil {
return err
}
response, err := c.broker.CommitOffset(c.client.id, request)
if err != nil {
return err
}
kerr := response.GetError(c.topic, c.partition)
if kerr == nil {
return IncompleteResponse
} else if *kerr == types.NO_ERROR {
return nil
} else {
return *kerr
}
default:
return *kerr
}
}

// fetches the offset from the broker and stores it in c.offset
// TODO: need to figure out how kafka behaves when the topic exists but doesn't have a stored offset
// (offset of 0? -1? error?).
Expand Down
23 changes: 23 additions & 0 deletions kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ func TestSimpleConsumer(t *testing.T) {
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00}
extraResponses <- []byte{
0x00, 0x02, 'i', 'd',
0x00, 0x00, 0x00, 0x01,
0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00}
extraResponses <- []byte{
0x00, 0x00, 0x00, 0x01,
0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00}
}()

client, err := NewClient("clientID", "localhost", mockBroker.Port())
Expand All @@ -94,6 +109,10 @@ func TestSimpleConsumer(t *testing.T) {
t.Error(err)
}
}
err = consumer.Commit(9)
if err != nil {
t.Error(err)
}

consumer.Close()
client.Close()
Expand All @@ -119,6 +138,10 @@ consumerLoop:
select {
case msg := <-consumer.Messages():
fmt.Println(msg)
err := consumer.Commit(msg.Offset)
if err != nil {
panic(err)
}
case err := <-consumer.Errors():
panic(err)
case <-time.After(5 * time.Second):
Expand Down

0 comments on commit cc04b09

Please sign in to comment.