diff --git a/offset_manager.go b/offset_manager.go index 5e15cdafe..5b8539b58 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -151,6 +151,13 @@ type PartitionOffsetManager interface { // message twice, and your processing should ideally be idempotent. MarkOffset(offset int64, metadata string) + // ResetOffset resets to the provided offset, alongside a metadata string that + // represents the state of the partition consumer at that point in time. Reset + // acts as a counterpart to MarkOffset, the difference being that it allows to + // reset an offset to an earlier or smaller value, where MarkOffset only + // allows incrementing the offset. cf MarkOffset for more details. + ResetOffset(offset int64, metadata string) + // Errors returns a read channel of errors that occur during offset management, if // enabled. By default, errors are logged and not returned over this channel. If // you want to implement any custom error handling, set your config's @@ -329,6 +336,17 @@ func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) { } } +func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) { + pom.lock.Lock() + defer pom.lock.Unlock() + + if offset < pom.offset { + pom.offset = offset + pom.metadata = metadata + pom.dirty = true + } +} + func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) { pom.lock.Lock() defer pom.lock.Unlock() diff --git a/offset_manager_test.go b/offset_manager_test.go index c111a5a63..21e4947c6 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -204,6 +204,70 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) { safeClose(t, testClient) } +func TestPartitionOffsetManagerResetOffset(t *testing.T) { + om, testClient, broker, coordinator := initOffsetManager(t) + pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") + + ocResponse := new(OffsetCommitResponse) + ocResponse.AddError("my_topic", 0, ErrNoError) + coordinator.Returns(ocResponse) + + expected := int64(1) + pom.ResetOffset(expected, "modified_meta") + actual, meta := pom.NextOffset() + + if actual != expected { + t.Errorf("Expected offset %v. Actual: %v", expected, actual) + } + if meta != "modified_meta" { + t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta) + } + + safeClose(t, pom) + safeClose(t, om) + safeClose(t, testClient) + broker.Close() + coordinator.Close() +} + +func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) { + om, testClient, broker, coordinator := initOffsetManager(t) + testClient.Config().Consumer.Offsets.Retention = time.Hour + + pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") + + ocResponse := new(OffsetCommitResponse) + ocResponse.AddError("my_topic", 0, ErrNoError) + handler := func(req *request) (res encoder) { + if req.body.version() != 2 { + t.Errorf("Expected to be using version 2. Actual: %v", req.body.version()) + } + offsetCommitRequest := req.body.(*OffsetCommitRequest) + if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) { + t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime) + } + return ocResponse + } + coordinator.setHandler(handler) + + expected := int64(1) + pom.ResetOffset(expected, "modified_meta") + actual, meta := pom.NextOffset() + + if actual != expected { + t.Errorf("Expected offset %v. Actual: %v", expected, actual) + } + if meta != "modified_meta" { + t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta) + } + + safeClose(t, pom) + safeClose(t, om) + safeClose(t, testClient) + broker.Close() + coordinator.Close() +} + func TestPartitionOffsetManagerMarkOffset(t *testing.T) { om, testClient, broker, coordinator := initOffsetManager(t) pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")