Skip to content

Commit

Permalink
Merge pull request #9667 from twmb/rpk_consume
Browse files Browse the repository at this point in the history
rpk topic consume: offset parsing fixups
  • Loading branch information
twmb authored Mar 28, 2023
2 parents b33b122 + 2e32baa commit 8479d7d
Showing 1 changed file with 29 additions and 4 deletions.
33 changes: 29 additions & 4 deletions src/go/rpk/pkg/cli/cmd/topic/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,9 @@ func (c *consumer) parseTimeOffset(
// the default offset -1, which below in NewOffset().At(-1)
// actually coincidentally maps to AtEnd(). So it all works.
lstart, err := adm.ListOffsetsAfterMilli(context.Background(), unixMilli(startAt.UnixNano()), topics...)
if err == nil {
err = lstart.Error()
}
if err != nil {
return fmt.Errorf("unable to list offsets after milli %d: %v", unixMilli(startAt.UnixNano()), err)
}
Expand All @@ -537,6 +540,12 @@ func (c *consumer) parseTimeOffset(
if offset == "" || offset == ":" { // requesting from a time onward; e.g., @1m:
return nil
}
} else {
lstart, err := adm.ListStartOffsets(context.Background(), topics...)
if err != nil {
return fmt.Errorf("unable to list start offsets: %v", err)
}
c.setParts(&c.partStarts, lstart, nil)
}
offset = offset[1:] // strip start:end delimiting colon

Expand All @@ -553,11 +562,19 @@ func (c *consumer) parseTimeOffset(
}
var lend kadm.ListedOffsets
if end {
if lend, err = adm.ListEndOffsets(context.Background(), topics...); err != nil {
lend, err = adm.ListEndOffsets(context.Background(), topics...)
if err == nil {
err = lend.Error()
}
if err != nil {
return fmt.Errorf("unable to list end offsets: %v", err)
}
} else {
if lend, err = adm.ListOffsetsAfterMilli(context.Background(), unixMilli(endAt.UnixNano()), topics...); err != nil {
lend, err = adm.ListOffsetsAfterMilli(context.Background(), unixMilli(endAt.UnixNano()), topics...)
if err == nil {
err = lend.Error()
}
if err != nil {
return fmt.Errorf("unable to list offsets after milli %d: %v", unixMilli(endAt.UnixNano()), err)
}
}
Expand All @@ -568,6 +585,9 @@ func (c *consumer) parseTimeOffset(
// partitions.
if c.partEnds == nil {
lstart, err := adm.ListStartOffsets(context.Background(), topics...)
if err == nil {
err = lstart.Error()
}
if err != nil {
return fmt.Errorf("unable to list start offsets: %v", err)
}
Expand Down Expand Up @@ -955,9 +975,14 @@ until one succeeds:
9 digits parsed as a unix second
YYYY-MM-DD parsed as a day, UTC
YYYY-MM-DDTHH:MM:SSZ parsed as RFC3339, UTC; fractional seconds optional (.MMM)
-dur duration ago; from now (as t1) or from t1 (as t2)
dur for t2 in @t1:t2, relative duration from t1
end for t2 in @t1:t2, the current end of the partition
-dur a negative duration from now or from a timestamp
dur a positive duration from now or from a timestamp
Durations can be relative to the current time or relative to a timestamp.
If a duration is used for t1, that duration is relative to now.
If a duration is used for t2, if t1 is a timestamp, then t2 is relative to t1.
If a duration is used for t2, if t1 is a duration, then t2 is relative to now.
Durations are parsed simply:
Expand Down

0 comments on commit 8479d7d

Please sign in to comment.