Skip to content

Commit

Permalink
Merge pull request #1127 from bsm/feature/simpler-offset-manager
Browse files Browse the repository at this point in the history
Simpler offset management, fixed minor race
  • Loading branch information
eapache authored Jul 20, 2018
2 parents 2211465 + 433beec commit c5b44e3
Show file tree
Hide file tree
Showing 4 changed files with 342 additions and 357 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ _testmain.go
*.exe

coverage.txt
profile.out
10 changes: 9 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ type Config struct {
// broker version 0.9.0 or later.
// (default is 0: disabled).
Retention time.Duration

Retry struct {
// The total number of times to retry failing commit
// requests during OffsetManager shutdown (default 3).
Max int
}
}
}

Expand Down Expand Up @@ -314,6 +320,7 @@ func NewConfig() *Config {
c.Consumer.Return.Errors = false
c.Consumer.Offsets.CommitInterval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
c.Consumer.Offsets.Retry.Max = 3

c.ClientID = defaultClientID
c.ChannelBufferSize = 256
Expand Down Expand Up @@ -450,7 +457,8 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")

case c.Consumer.Offsets.Retry.Max < 0:
return ConfigurationError("Consumer.Offsets.Retry.Max must be >= 0")
}

// validate misc shared values
Expand Down
Loading

0 comments on commit c5b44e3

Please sign in to comment.