diff --git a/kafka/consumer.go b/kafka/consumer.go index aa5993d44..c3d70efb3 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -24,7 +24,7 @@ type Consumer struct { } // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as -// part of the named consumer group. +// part of the named consumer group. It automatically fetches the offset at which to start reading from the Kafka cluster. func NewConsumer(client *Client, topic string, partition int32, group string) (*Consumer, error) { broker, err := client.leader(topic, partition) if err != nil { @@ -36,16 +36,19 @@ func NewConsumer(client *Client, topic string, partition int32, group string) (* c.topic = topic c.partition = partition c.group = group - - // We should really be sending an OffsetFetchRequest, but that doesn't seem to - // work in kafka yet. Hopefully will in beta 2... - c.offset = 0 c.broker = broker + + // fetch the saved offset for this partition + err = c.fetchOffset() + if err != nil { + return nil, err + } + + // start the fetching loop c.stopper = make(chan bool) c.done = make(chan bool) c.messages = make(chan *Message) c.errors = make(chan error) - go c.fetchMessages() return c, nil @@ -69,6 +72,150 @@ func (c *Consumer) Close() { <-c.done } +// Commit saves the given offset to the Kafka cluster so that the next time a consumer is started on this +// topic and partition, it will know the offset at which it can start. +func (c *Consumer) Commit(offset int64) error { + request := &k.OffsetCommitRequest{ConsumerGroup: c.group} + request.AddBlock(c.topic, c.partition, offset, "") + + response, err := c.broker.CommitOffset(c.client.id, request) + switch err { + case nil: + break + case encoding.EncodingError: + return err + default: + c.client.disconnectBroker(c.broker) + c.broker, err = c.client.leader(c.topic, c.partition) + if err != nil { + return err + } + response, err = c.broker.CommitOffset(c.client.id, request) + if err != nil { + return err + } + } + + kerr := response.GetError(c.topic, c.partition) + if kerr == nil { + c.client.disconnectBroker(c.broker) + c.broker, err = c.client.leader(c.topic, c.partition) + if err != nil { + return err + } + response, err = c.broker.CommitOffset(c.client.id, request) + if err != nil { + return err + } + kerr := response.GetError(c.topic, c.partition) + if kerr == nil { + return IncompleteResponse + } + } + + switch *kerr { + case types.NO_ERROR: + return nil + case types.UNKNOWN_TOPIC_OR_PARTITION, types.NOT_LEADER_FOR_PARTITION, types.LEADER_NOT_AVAILABLE: + err = c.client.refreshTopic(c.topic) + if err != nil { + return err + } + c.broker, err = c.client.leader(c.topic, c.partition) + if err != nil { + return err + } + response, err := c.broker.CommitOffset(c.client.id, request) + if err != nil { + return err + } + kerr := response.GetError(c.topic, c.partition) + if kerr == nil { + return IncompleteResponse + } else if *kerr == types.NO_ERROR { + return nil + } else { + return *kerr + } + default: + return *kerr + } +} + +// fetches the offset from the broker and stores it in c.offset +// TODO: need to figure out how kafka behaves when the topic exists but doesn't have a stored offset +// (offset of 0? -1? error?). +func (c *Consumer) fetchOffset() error { + request := &k.OffsetFetchRequest{ConsumerGroup: c.group} + request.AddPartition(c.topic, c.partition) + + response, err := c.broker.FetchOffset(c.client.id, request) + switch { + case err == nil: + break + case err == encoding.EncodingError: + return err + default: + c.client.disconnectBroker(c.broker) + c.broker, err = c.client.leader(c.topic, c.partition) + if err != nil { + return err + } + response, err = c.broker.FetchOffset(c.client.id, request) + if err != nil { + return err + } + } + + block := response.GetBlock(c.topic, c.partition) + if block == nil { + c.client.disconnectBroker(c.broker) + c.broker, err = c.client.leader(c.topic, c.partition) + if err != nil { + return err + } + response, err = c.broker.FetchOffset(c.client.id, request) + if err != nil { + return err + } + block := response.GetBlock(c.topic, c.partition) + if block == nil { + return IncompleteResponse + } + } + + switch block.Err { + case types.NO_ERROR: + c.offset = block.Offset + return nil + case types.UNKNOWN_TOPIC_OR_PARTITION, types.NOT_LEADER_FOR_PARTITION, types.LEADER_NOT_AVAILABLE: + err = c.client.refreshTopic(c.topic) + if err != nil { + return err + } + c.broker, err = c.client.leader(c.topic, c.partition) + if err != nil { + return err + } + response, err := c.broker.FetchOffset(c.client.id, request) + if err != nil { + return err + } + block := response.GetBlock(c.topic, c.partition) + if block == nil { + return IncompleteResponse + } + if block.Err == types.NO_ERROR { + c.offset = block.Offset + return nil + } else { + return block.Err + } + default: + return block.Err + } +} + // helper function for safely sending an error on the errors channel // if it returns true, the error was sent (or was nil) // if it returns false, the stopper channel signaled that your goroutine should return! @@ -89,7 +236,6 @@ func (c *Consumer) sendError(err error) bool { } func (c *Consumer) fetchMessages() { - var fetchSize int32 = 1024 for { diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index 5375172d8..178ff3fa7 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -34,6 +34,15 @@ func TestSimpleConsumer(t *testing.T) { binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port())) masterResponses <- response go func() { + extraResponses <- []byte{ + 0x00, 0x02, 'i', 'd', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, + 0x00, 0x00} for i := 0; i < 10; i++ { msg := []byte{ 0x00, 0x00, 0x00, 0x01, @@ -63,6 +72,21 @@ func TestSimpleConsumer(t *testing.T) { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + extraResponses <- []byte{ + 0x00, 0x02, 'i', 'd', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00} + extraResponses <- []byte{ + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00} }() client, err := NewClient("clientID", "localhost", mockBroker.Port()) @@ -85,6 +109,10 @@ func TestSimpleConsumer(t *testing.T) { t.Error(err) } } + err = consumer.Commit(9) + if err != nil { + t.Error(err) + } consumer.Close() client.Close() @@ -110,6 +138,10 @@ consumerLoop: select { case msg := <-consumer.Messages(): fmt.Println(msg) + err := consumer.Commit(msg.Offset) + if err != nil { + panic(err) + } case err := <-consumer.Errors(): panic(err) case <-time.After(5 * time.Second): diff --git a/protocol/offset_commit_response.go b/protocol/offset_commit_response.go index f4c895910..f9a59d311 100644 --- a/protocol/offset_commit_response.go +++ b/protocol/offset_commit_response.go @@ -5,7 +5,7 @@ import "sarama/types" type OffsetCommitResponse struct { ClientID string - Errors map[string]map[int32]types.KError + Errors map[string]map[int32]*types.KError } func (r *OffsetCommitResponse) Decode(pd enc.PacketDecoder) (err error) { @@ -19,7 +19,7 @@ func (r *OffsetCommitResponse) Decode(pd enc.PacketDecoder) (err error) { return err } - r.Errors = make(map[string]map[int32]types.KError, numTopics) + r.Errors = make(map[string]map[int32]*types.KError, numTopics) for i := 0; i < numTopics; i++ { name, err := pd.GetString() if err != nil { @@ -31,7 +31,7 @@ func (r *OffsetCommitResponse) Decode(pd enc.PacketDecoder) (err error) { return err } - r.Errors[name] = make(map[int32]types.KError, numErrors) + r.Errors[name] = make(map[int32]*types.KError, numErrors) for j := 0; j < numErrors; j++ { id, err := pd.GetInt32() @@ -43,9 +43,22 @@ func (r *OffsetCommitResponse) Decode(pd enc.PacketDecoder) (err error) { if err != nil { return err } - r.Errors[name][id] = types.KError(tmp) + tmp2 := types.KError(tmp) + r.Errors[name][id] = &tmp2 } } return nil } + +func (r *OffsetCommitResponse) GetError(topic string, partition int32) *types.KError { + if r.Errors == nil { + return nil + } + + if r.Errors[topic] == nil { + return nil + } + + return r.Errors[topic][partition] +} diff --git a/protocol/offset_commit_response_test.go b/protocol/offset_commit_response_test.go index dbe5bbcb0..a6ec365e9 100644 --- a/protocol/offset_commit_response_test.go +++ b/protocol/offset_commit_response_test.go @@ -45,7 +45,8 @@ func TestNormalOffsetCommitResponse(t *testing.T) { t.Error("Decoding produced errors for topic 'm' where there were none.") } if len(response.Errors["t"]) == 1 { - if response.Errors["t"][0] != types.NOT_LEADER_FOR_PARTITION { + err := response.GetError("t", 0) + if (*err) != types.NOT_LEADER_FOR_PARTITION { t.Error("Decoding produced wrong error for topic 't' partition 0.") } } else { diff --git a/protocol/offset_fetch_response.go b/protocol/offset_fetch_response.go index afc2ac04c..7557e659b 100644 --- a/protocol/offset_fetch_response.go +++ b/protocol/offset_fetch_response.go @@ -76,3 +76,15 @@ func (r *OffsetFetchResponse) Decode(pd enc.PacketDecoder) (err error) { return nil } + +func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock { + if r.Blocks == nil { + return nil + } + + if r.Blocks[topic] == nil { + return nil + } + + return r.Blocks[topic][partition] +}