Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds ResetOffset to reset to earlier offset values. #932

Merged
merged 1 commit into from
Sep 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ type PartitionOffsetManager interface {
// message twice, and your processing should ideally be idempotent.
MarkOffset(offset int64, metadata string)

// ResetOffset resets to the provided offset, alongside a metadata string that
// represents the state of the partition consumer at that point in time. Reset
// acts as a counterpart to MarkOffset, the difference being that it allows to
// reset an offset to an earlier or smaller value, where MarkOffset only
// allows incrementing the offset. cf MarkOffset for more details.
ResetOffset(offset int64, metadata string)

// Errors returns a read channel of errors that occur during offset management, if
// enabled. By default, errors are logged and not returned over this channel. If
// you want to implement any custom error handling, set your config's
Expand Down Expand Up @@ -329,6 +336,17 @@ func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
}
}

func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) {
pom.lock.Lock()
defer pom.lock.Unlock()

if offset < pom.offset {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eapache @fgeller sorry I'm late to the party on this, just now getting around to implement our usage of this feature.

Any reason for this condition? this is preventing me from updating the metadata without changing the offset.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updating the metadata without changing the offset.

I don't think either of us ever considered a use case for that. I suppose it would be easy enough to make this a <=

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pom.offset = offset
pom.metadata = metadata
pom.dirty = true
}
}

func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
pom.lock.Lock()
defer pom.lock.Unlock()
Expand Down
64 changes: 64 additions & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,70 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) {
safeClose(t, testClient)
}

func TestPartitionOffsetManagerResetOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
coordinator.Returns(ocResponse)

expected := int64(1)
pom.ResetOffset(expected, "modified_meta")
actual, meta := pom.NextOffset()

if actual != expected {
t.Errorf("Expected offset %v. Actual: %v", expected, actual)
}
if meta != "modified_meta" {
t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
}

safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
testClient.Config().Consumer.Offsets.Retention = time.Hour

pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
handler := func(req *request) (res encoder) {
if req.body.version() != 2 {
t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
}
offsetCommitRequest := req.body.(*OffsetCommitRequest)
if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
}
return ocResponse
}
coordinator.setHandler(handler)

expected := int64(1)
pom.ResetOffset(expected, "modified_meta")
actual, meta := pom.NextOffset()

if actual != expected {
t.Errorf("Expected offset %v. Actual: %v", expected, actual)
}
if meta != "modified_meta" {
t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
}

safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
Expand Down