Skip to content

Commit

Permalink
feat(bigtable): Add support for reverse scans (#8755)
Browse files Browse the repository at this point in the history
* feat: add more expressive range api

* feat(bigtable): Add support for reverse scans

* resolve feature flag conflict after merge

* add a new constructor for chunkReader to avoid line noise

* remove orphaned code

* fix typo

* adding a first test

* extended RowRange to express bound types

* adding unit tests

* correcting backwards compatability behavior

* adding test proxy support for reverse scan

* added todo reminder

* all unit tests pass

* updated naming

* fix retries on reverse scan

* more tests

* exposing client messages to test proxy
formatting

* exposing client messages to test proxy
formatting

* fixing vet errors

* minor style tweaks

* changing error message to be consistent with java and cpp

* cleaning up code and adding tests

* simplify RowRange valid logic

* rolling back test proxy changes

* rolling back test proxy

* changed default bound type for better backwards compatability
rolled back some integration test changes to ensure backwards compatability

---------

Co-authored-by: Igor Berntein <igorbernstein@google.com>
  • Loading branch information
brandtnewton and igorbernstein2 committed Nov 8, 2023
1 parent 3e23a36 commit 244d135
Show file tree
Hide file tree
Showing 5 changed files with 786 additions and 75 deletions.
280 changes: 236 additions & 44 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,26 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
if err != nil {
return err
}
cr := newChunkReader()

var cr *chunkReader
if req.Reversed {
cr = newReverseChunkReader()
} else {
cr = newChunkReader()
}

for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
// Reset arg for next Invoke call.
arg = arg.retainRowsAfter(prevRowKey)
if req.Reversed {
arg = arg.retainRowsBefore(prevRowKey)
} else {
arg = arg.retainRowsAfter(prevRowKey)
}
attrMap["rowKey"] = prevRowKey
attrMap["error"] = err.Error()
attrMap["time_secs"] = time.Since(startTime).Seconds()
Expand Down Expand Up @@ -306,6 +317,10 @@ type RowSet interface {
// given row key or any row key lexicographically less than it.
retainRowsAfter(lastRowKey string) RowSet

// retainRowsBefore returns a new RowSet that does not include the
// given row key or any row key lexicographically greater than it.
retainRowsBefore(lastRowKey string) RowSet

// Valid reports whether this set can cover at least one row.
valid() bool
}
Expand All @@ -331,70 +346,230 @@ func (r RowList) retainRowsAfter(lastRowKey string) RowSet {
return retryKeys
}

func (r RowList) retainRowsBefore(lastRowKey string) RowSet {
var retryKeys RowList
for _, key := range r {
if key < lastRowKey {
retryKeys = append(retryKeys, key)
}
}
return retryKeys
}

func (r RowList) valid() bool {
return len(r) > 0
}

// A RowRange is a half-open interval [Start, Limit) encompassing
// all the rows with keys at least as large as Start, and less than Limit.
// (Bigtable string comparison is the same as Go's.)
// A RowRange can be unbounded, encompassing all keys at least as large as Start.
type rangeBoundType int64

const (
rangeUnbounded rangeBoundType = iota
rangeOpen
rangeClosed
)

// A RowRange describes a range of rows between the start and end key. Start and
// end keys may be rangeOpen, rangeClosed or rangeUnbounded.
type RowRange struct {
start string
limit string
startBound rangeBoundType
start string
endBound rangeBoundType
end string
}

// NewRange returns the new RowRange [begin, end).
func NewRange(begin, end string) RowRange {
return createRowRange(rangeClosed, begin, rangeOpen, end)
}

// NewClosedOpenRange returns the RowRange consisting of all greater than or
// equal to the start and less than the end: [start, end).
func NewClosedOpenRange(start, end string) RowRange {
return createRowRange(rangeClosed, start, rangeOpen, end)
}

// NewOpenClosedRange returns the RowRange consisting of all keys greater than
// the start and less than or equal to the end: (start, end].
func NewOpenClosedRange(start, end string) RowRange {
return createRowRange(rangeOpen, start, rangeClosed, end)
}

// NewOpenRange returns the RowRange consisting of all keys greater than the
// start and less than the end: (start, end).
func NewOpenRange(start, end string) RowRange {
return createRowRange(rangeOpen, start, rangeOpen, end)
}

// NewClosedRange returns the RowRange consisting of all keys greater than or
// equal to the start and less than or equal to the end: [start, end].
func NewClosedRange(start, end string) RowRange {
return createRowRange(rangeClosed, start, rangeClosed, end)
}

// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
func PrefixRange(prefix string) RowRange {
end := prefixSuccessor(prefix)
return createRowRange(rangeClosed, prefix, rangeOpen, end)
}

// InfiniteRange returns the RowRange consisting of all keys at least as
// large as start: [start, ∞).
func InfiniteRange(start string) RowRange {
return createRowRange(rangeClosed, start, rangeUnbounded, "")
}

// InfiniteReverseRange returns the RowRange consisting of all keys less than or
// equal to the end: (∞, end].
func InfiniteReverseRange(end string) RowRange {
return createRowRange(rangeUnbounded, "", rangeClosed, end)
}

// createRowRange creates a new RowRange, normalizing start and end
// rangeBoundType to rangeUnbounded if they're empty strings because empty
// strings also represent unbounded keys
func createRowRange(startBound rangeBoundType, start string, endBound rangeBoundType, end string) RowRange {
// normalize start bound type
if start == "" {
startBound = rangeUnbounded
}
// normalize end bound type
if end == "" {
endBound = rangeUnbounded
}
return RowRange{
start: begin,
limit: end,
startBound: startBound,
start: start,
endBound: endBound,
end: end,
}
}

// Unbounded tests whether a RowRange is unbounded.
func (r RowRange) Unbounded() bool {
return r.limit == ""
return r.startBound == rangeUnbounded || r.endBound == rangeUnbounded
}

// Contains says whether the RowRange contains the key.
func (r RowRange) Contains(row string) bool {
return r.start <= row && (r.limit == "" || r.limit > row)
switch r.startBound {
case rangeOpen:
if r.start >= row {
return false
}
case rangeClosed:
if r.start > row {
return false
}
case rangeUnbounded:
}

switch r.endBound {
case rangeOpen:
if r.end <= row {
return false
}
case rangeClosed:
if r.end < row {
return false
}
case rangeUnbounded:
}

return true
}

// String provides a printable description of a RowRange.
func (r RowRange) String() string {
a := strconv.Quote(r.start)
if r.Unbounded() {
return fmt.Sprintf("[%s,∞)", a)
var startStr string
switch r.startBound {
case rangeOpen:
startStr = "(" + strconv.Quote(r.start)
case rangeClosed:
startStr = "[" + strconv.Quote(r.start)
case rangeUnbounded:
startStr = "(∞"
}
return fmt.Sprintf("[%s,%q)", a, r.limit)

var endStr string
switch r.endBound {
case rangeOpen:
endStr = r.end + ")"
case rangeClosed:
endStr = r.end + "]"
case rangeUnbounded:
endStr = "∞)"
}

return fmt.Sprintf("%s,%s", startStr, endStr)
}

func (r RowRange) proto() *btpb.RowSet {
rr := &btpb.RowRange{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)},
rr := &btpb.RowRange{}

switch r.startBound {
case rangeOpen:
rr.StartKey = &btpb.RowRange_StartKeyOpen{StartKeyOpen: []byte(r.start)}
case rangeClosed:
rr.StartKey = &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)}
case rangeUnbounded:
// leave unbounded
}
if !r.Unbounded() {
rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)}

switch r.endBound {
case rangeOpen:
rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.end)}
case rangeClosed:
rr.EndKey = &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte(r.end)}
case rangeUnbounded:
// leave unbounded
}

return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}}
}

func (r RowRange) retainRowsAfter(lastRowKey string) RowSet {
if lastRowKey == "" || lastRowKey < r.start {
return r
}
// Set the beginning of the range to the row after the last scanned.
start := lastRowKey + "\x00"
if r.Unbounded() {
return InfiniteRange(start)

return RowRange{
// Set the beginning of the range to the row after the last scanned.
startBound: rangeOpen,
start: lastRowKey,
endBound: r.endBound,
end: r.end,
}
}

func (r RowRange) retainRowsBefore(lastRowKey string) RowSet {
if lastRowKey == "" || (r.endBound != rangeUnbounded && r.end < lastRowKey) {
return r
}

return RowRange{
startBound: r.startBound,
start: r.start,
endBound: rangeOpen,
end: lastRowKey,
}
return NewRange(start, r.limit)
}

func (r RowRange) valid() bool {
return r.Unbounded() || r.start < r.limit
// If either end is unbounded, then the range is always valid.
if r.Unbounded() {
return true
}

// If either end is an open interval, then the start must be strictly less
// than the end and since neither end is unbounded, we don't have to check
// for empty strings.
if r.startBound == rangeOpen || r.endBound == rangeOpen {
return r.start < r.end
}

// At this point both endpoints must be closed, which makes [a,a] a valid
// interval
return r.start <= r.end
}

// RowRangeList is a sequence of RowRanges representing the union of the ranges.
Expand Down Expand Up @@ -424,6 +599,21 @@ func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet {
return ranges
}

func (r RowRangeList) retainRowsBefore(lastRowKey string) RowSet {
if lastRowKey == "" {
return r
}
// Return a list of any range that has not yet been completely processed
var ranges RowRangeList
for _, rr := range r {
retained := rr.retainRowsBefore(lastRowKey)
if retained.valid() {
ranges = append(ranges, retained.(RowRange))
}
}
return ranges
}

func (r RowRangeList) valid() bool {
for _, rr := range r {
if rr.valid() {
Expand All @@ -438,23 +628,6 @@ func SingleRow(row string) RowSet {
return RowList{row}
}

// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
func PrefixRange(prefix string) RowRange {
return RowRange{
start: prefix,
limit: prefixSuccessor(prefix),
}
}

// InfiniteRange returns the RowRange consisting of all keys at least as
// large as start.
func InfiniteRange(start string) RowRange {
return RowRange{
start: start,
limit: "",
}
}

// prefixSuccessor returns the lexically smallest string greater than the
// prefix, if it exists, or "" otherwise. In either case, it is the string
// needed for the Limit of a RowRange.
Expand Down Expand Up @@ -557,7 +730,7 @@ type rowFilter struct{ f Filter }

func (rf rowFilter) set(settings *readSettings) { settings.req.Filter = rf.f.proto() }

// LimitRows returns a ReadOption that will limit the number of rows to be read.
// LimitRows returns a ReadOption that will end the number of rows to be read.
func LimitRows(limit int64) ReadOption { return limitRows{limit} }

type limitRows struct{ limit int64 }
Expand All @@ -577,6 +750,25 @@ func (wrs withFullReadStats) set(settings *readSettings) {
settings.fullReadStatsFunc = wrs.f
}

// ReverseScan returns a RadOption that will reverse the results of a Scan.
// The rows will be streamed in reverse lexiographic order of the keys. The row key ranges of the RowSet are
// still expected to be oriented the same way as forwards. ie [a,c] where a <= c. The row content
// will remain unchanged from the ordering forward scans. This is particularly useful to get the
// last N records before a key:
//
// table.ReadRows(ctx, NewOpenClosedRange("", "key"), func(row bigtable.Row) bool {
// return true
// }, bigtable.ReverseScan(), bigtable.LimitRows(10))
func ReverseScan() ReadOption {
return reverseScan{}
}

type reverseScan struct{}

func (rs reverseScan) set(settings *readSettings) {
settings.req.Reversed = true
}

// mutationsAreRetryable returns true if all mutations are idempotent
// and therefore retryable. A mutation is idempotent iff all cell timestamps
// have an explicit timestamp set and do not rely on the timestamp being set on the server.
Expand Down
Loading

0 comments on commit 244d135

Please sign in to comment.