Skip to content

Commit

Permalink
Add a StartingOffset config to the Consumer
Browse files Browse the repository at this point in the history
Fixes #4

When #2 lands, we can define a constant for -1 that triggers sending an
OffsetFetchRequest without touching the API.
  • Loading branch information
Evan Huus committed Aug 15, 2013
1 parent dae9785 commit 541ea31
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type ConsumerConfig struct {
// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
// returns fewer than that anyways. The default of 0 is treated as no limit.
MaxWaitTime int32
// The offset to start fetching messages from
StartingOffset int64
}

// Consumer processes Kafka messages from a given topic and partition.
Expand Down Expand Up @@ -60,6 +62,10 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
return nil, ConfigurationError("Invalid MaxWaitTime")
}

if config.StartingOffset < 0 {
return nil, ConfigurationError("Invalid StartingOffset")
}

broker, err := client.leader(topic, partition)
if err != nil {
return nil, err
Expand All @@ -71,10 +77,7 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
c.partition = partition
c.group = group
c.config = *config

// We should really be sending an OffsetFetchRequest, but that doesn't seem to
// work in kafka yet. Hopefully will in beta 2...
c.offset = 0
c.offset = config.StartingOffset
c.broker = broker
c.stopper = make(chan bool)
c.done = make(chan bool)
Expand Down

0 comments on commit 541ea31

Please sign in to comment.