Skip to content

Commit

Permalink
Merge pull request #932 from fgeller/reset-offsets
Browse files Browse the repository at this point in the history
adds ResetOffset to reset to earlier offset values.
  • Loading branch information
eapache authored Sep 12, 2017
2 parents 298679c + b966238 commit be3a4e3
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 0 deletions.
18 changes: 18 additions & 0 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ type PartitionOffsetManager interface {
// message twice, and your processing should ideally be idempotent.
MarkOffset(offset int64, metadata string)

// ResetOffset resets to the provided offset, alongside a metadata string that
// represents the state of the partition consumer at that point in time. Reset
// acts as a counterpart to MarkOffset, the difference being that it allows to
// reset an offset to an earlier or smaller value, where MarkOffset only
// allows incrementing the offset. cf MarkOffset for more details.
ResetOffset(offset int64, metadata string)

// Errors returns a read channel of errors that occur during offset management, if
// enabled. By default, errors are logged and not returned over this channel. If
// you want to implement any custom error handling, set your config's
Expand Down Expand Up @@ -329,6 +336,17 @@ func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
}
}

func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) {
pom.lock.Lock()
defer pom.lock.Unlock()

if offset < pom.offset {
pom.offset = offset
pom.metadata = metadata
pom.dirty = true
}
}

func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
pom.lock.Lock()
defer pom.lock.Unlock()
Expand Down
64 changes: 64 additions & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,70 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) {
safeClose(t, testClient)
}

func TestPartitionOffsetManagerResetOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
coordinator.Returns(ocResponse)

expected := int64(1)
pom.ResetOffset(expected, "modified_meta")
actual, meta := pom.NextOffset()

if actual != expected {
t.Errorf("Expected offset %v. Actual: %v", expected, actual)
}
if meta != "modified_meta" {
t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
}

safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
testClient.Config().Consumer.Offsets.Retention = time.Hour

pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
handler := func(req *request) (res encoder) {
if req.body.version() != 2 {
t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
}
offsetCommitRequest := req.body.(*OffsetCommitRequest)
if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
}
return ocResponse
}
coordinator.setHandler(handler)

expected := int64(1)
pom.ResetOffset(expected, "modified_meta")
actual, meta := pom.NextOffset()

if actual != expected {
t.Errorf("Expected offset %v. Actual: %v", expected, actual)
}
if meta != "modified_meta" {
t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
}

safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
Expand Down

0 comments on commit be3a4e3

Please sign in to comment.