Skip to content

Commit

Permalink
Merge pull request #461 from Shopify/offset-manager
Browse files Browse the repository at this point in the history
OffsetManager Implementation
  • Loading branch information
eapache committed Aug 24, 2015
2 parents e849ff4 + 4c0d6fc commit 23a7cd9
Show file tree
Hide file tree
Showing 5 changed files with 907 additions and 1 deletion.
18 changes: 18 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ type Config struct {
// If enabled, any errors that occured while consuming are returned on the Errors channel (default disabled).
Errors bool
}

// Offsets specifies configuration for how and when to commit consumed offsets. This currently requires the
// manual use of an OffsetManager but will eventually be automated.
Offsets struct {
// How frequently to commit updated offsets. Defaults to 10s.
CommitInterval time.Duration

// The initial offset to use if no offset was previously committed. Should be OffsetNewest or OffsetOldest.
// Defaults to OffsetNewest.
Initial int64
}
}

// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
Expand Down Expand Up @@ -164,6 +175,8 @@ func NewConfig() *Config {
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false
c.Consumer.Offsets.CommitInterval = 10 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest

c.ChannelBufferSize = 256

Expand Down Expand Up @@ -263,6 +276,11 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
case c.Consumer.Offsets.CommitInterval <= 0:
return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")

}

// validate misc shared values
Expand Down
51 changes: 51 additions & 0 deletions functional_offset_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package sarama

import (
"testing"
)

func TestFuncOffsetManager(t *testing.T) {
checkKafkaVersion(t, "0.8.2")
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

client, err := NewClient(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}

offsetManager, err := NewOffsetManagerFromClient("sarama.TestFuncOffsetManager", client)
if err != nil {
t.Fatal(err)
}

if _, err := offsetManager.ManagePartition("does_not_exist", 123); err != ErrUnknownTopicOrPartition {
t.Fatal("Expected ErrUnknownTopicOrPartition when starting a partition offset manager for a partition that does not exist, got:", err)
}

pom1, err := offsetManager.ManagePartition("test.1", 0)
if err != nil {
t.Fatal(err)
}

pom1.MarkOffset(10, "test metadata")
safeClose(t, pom1)

pom2, err := offsetManager.ManagePartition("test.1", 0)
if err != nil {
t.Fatal(err)
}

offset, metadata := pom2.NextOffset()

if offset != 10+1 {
t.Errorf("Expected the next offset to be 11, found %d.", offset)
}
if metadata != "test metadata" {
t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata)
}

safeClose(t, pom2)
safeClose(t, offsetManager)
safeClose(t, client)
}
2 changes: 1 addition & 1 deletion functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func checkKafkaAvailability(t testing.TB) {
func checkKafkaVersion(t testing.TB, requiredVersion string) {
kafkaVersion := os.Getenv("KAFKA_VERSION")
if kafkaVersion == "" {
t.Logf("No KAFKA_VERSION set. This tests requires Kafka version %s or higher. Continuing...", requiredVersion)
t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
} else {
available := parseKafkaVersion(kafkaVersion)
required := parseKafkaVersion(requiredVersion)
Expand Down
Loading

0 comments on commit 23a7cd9

Please sign in to comment.