diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 43993fc632e2..0cd7a7084f08 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -207,7 +207,14 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts if err != nil { return err } - cr := newChunkReader() + + var cr *chunkReader + if req.Reversed { + cr = newReverseChunkReader() + } else { + cr = newChunkReader() + } + for { res, err := stream.Recv() if err == io.EOF { @@ -215,7 +222,11 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts } if err != nil { // Reset arg for next Invoke call. - arg = arg.retainRowsAfter(prevRowKey) + if req.Reversed { + arg = arg.retainRowsBefore(prevRowKey) + } else { + arg = arg.retainRowsAfter(prevRowKey) + } attrMap["rowKey"] = prevRowKey attrMap["error"] = err.Error() attrMap["time_secs"] = time.Since(startTime).Seconds() @@ -306,6 +317,10 @@ type RowSet interface { // given row key or any row key lexicographically less than it. retainRowsAfter(lastRowKey string) RowSet + // retainRowsBefore returns a new RowSet that does not include the + // given row key or any row key lexicographically greater than it. + retainRowsBefore(lastRowKey string) RowSet + // Valid reports whether this set can cover at least one row. valid() bool } @@ -331,53 +346,184 @@ func (r RowList) retainRowsAfter(lastRowKey string) RowSet { return retryKeys } +func (r RowList) retainRowsBefore(lastRowKey string) RowSet { + var retryKeys RowList + for _, key := range r { + if key < lastRowKey { + retryKeys = append(retryKeys, key) + } + } + return retryKeys +} + func (r RowList) valid() bool { return len(r) > 0 } -// A RowRange is a half-open interval [Start, Limit) encompassing -// all the rows with keys at least as large as Start, and less than Limit. -// (Bigtable string comparison is the same as Go's.) -// A RowRange can be unbounded, encompassing all keys at least as large as Start. 
+type rangeBoundType int64 + +const ( + rangeUnbounded rangeBoundType = iota + rangeOpen + rangeClosed +) + +// A RowRange describes a range of rows between the start and end key. Start and +// end keys may be rangeOpen, rangeClosed or rangeUnbounded. type RowRange struct { - start string - limit string + startBound rangeBoundType + start string + endBound rangeBoundType + end string } // NewRange returns the new RowRange [begin, end). func NewRange(begin, end string) RowRange { + return createRowRange(rangeClosed, begin, rangeOpen, end) +} + +// NewClosedOpenRange returns the RowRange consisting of all keys greater than or +// equal to the start and less than the end: [start, end). +func NewClosedOpenRange(start, end string) RowRange { + return createRowRange(rangeClosed, start, rangeOpen, end) +} + +// NewOpenClosedRange returns the RowRange consisting of all keys greater than +// the start and less than or equal to the end: (start, end]. +func NewOpenClosedRange(start, end string) RowRange { + return createRowRange(rangeOpen, start, rangeClosed, end) +} + +// NewOpenRange returns the RowRange consisting of all keys greater than the +// start and less than the end: (start, end). +func NewOpenRange(start, end string) RowRange { + return createRowRange(rangeOpen, start, rangeOpen, end) +} + +// NewClosedRange returns the RowRange consisting of all keys greater than or +// equal to the start and less than or equal to the end: [start, end]. +func NewClosedRange(start, end string) RowRange { + return createRowRange(rangeClosed, start, rangeClosed, end) +} + +// PrefixRange returns a RowRange consisting of all keys starting with the prefix. +func PrefixRange(prefix string) RowRange { + end := prefixSuccessor(prefix) + return createRowRange(rangeClosed, prefix, rangeOpen, end) +} + +// InfiniteRange returns the RowRange consisting of all keys at least as +// large as start: [start, ∞). 
+func InfiniteRange(start string) RowRange { + return createRowRange(rangeClosed, start, rangeUnbounded, "") +} + +// InfiniteReverseRange returns the RowRange consisting of all keys less than or +// equal to the end: (∞, end]. +func InfiniteReverseRange(end string) RowRange { + return createRowRange(rangeUnbounded, "", rangeClosed, end) +} + +// createRowRange creates a new RowRange, normalizing start and end +// rangeBoundType to rangeUnbounded if they're empty strings because empty +// strings also represent unbounded keys +func createRowRange(startBound rangeBoundType, start string, endBound rangeBoundType, end string) RowRange { + // normalize start bound type + if start == "" { + startBound = rangeUnbounded + } + // normalize end bound type + if end == "" { + endBound = rangeUnbounded + } return RowRange{ - start: begin, - limit: end, + startBound: startBound, + start: start, + endBound: endBound, + end: end, } } // Unbounded tests whether a RowRange is unbounded. func (r RowRange) Unbounded() bool { - return r.limit == "" + return r.startBound == rangeUnbounded || r.endBound == rangeUnbounded } // Contains says whether the RowRange contains the key. func (r RowRange) Contains(row string) bool { - return r.start <= row && (r.limit == "" || r.limit > row) + switch r.startBound { + case rangeOpen: + if r.start >= row { + return false + } + case rangeClosed: + if r.start > row { + return false + } + case rangeUnbounded: + } + + switch r.endBound { + case rangeOpen: + if r.end <= row { + return false + } + case rangeClosed: + if r.end < row { + return false + } + case rangeUnbounded: + } + + return true } // String provides a printable description of a RowRange. 
func (r RowRange) String() string { - a := strconv.Quote(r.start) - if r.Unbounded() { - return fmt.Sprintf("[%s,∞)", a) + var startStr string + switch r.startBound { + case rangeOpen: + startStr = "(" + strconv.Quote(r.start) + case rangeClosed: + startStr = "[" + strconv.Quote(r.start) + case rangeUnbounded: + startStr = "(∞" } - return fmt.Sprintf("[%s,%q)", a, r.limit) + + var endStr string + switch r.endBound { + case rangeOpen: + endStr = r.end + ")" + case rangeClosed: + endStr = r.end + "]" + case rangeUnbounded: + endStr = "∞)" + } + + return fmt.Sprintf("%s,%s", startStr, endStr) } func (r RowRange) proto() *btpb.RowSet { - rr := &btpb.RowRange{ - StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)}, + rr := &btpb.RowRange{} + + switch r.startBound { + case rangeOpen: + rr.StartKey = &btpb.RowRange_StartKeyOpen{StartKeyOpen: []byte(r.start)} + case rangeClosed: + rr.StartKey = &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)} + case rangeUnbounded: + // leave unbounded } - if !r.Unbounded() { - rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)} + + switch r.endBound { + case rangeOpen: + rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.end)} + case rangeClosed: + rr.EndKey = &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte(r.end)} + case rangeUnbounded: + // leave unbounded } + return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}} } @@ -385,16 +531,45 @@ func (r RowRange) retainRowsAfter(lastRowKey string) RowSet { if lastRowKey == "" || lastRowKey < r.start { return r } - // Set the beginning of the range to the row after the last scanned. - start := lastRowKey + "\x00" - if r.Unbounded() { - return InfiniteRange(start) + + return RowRange{ + // Set the beginning of the range to the row after the last scanned. 
+ startBound: rangeOpen, + start: lastRowKey, + endBound: r.endBound, + end: r.end, + } +} + +func (r RowRange) retainRowsBefore(lastRowKey string) RowSet { + if lastRowKey == "" || (r.endBound != rangeUnbounded && r.end < lastRowKey) { + return r + } + + return RowRange{ + startBound: r.startBound, + start: r.start, + endBound: rangeOpen, + end: lastRowKey, } - return NewRange(start, r.limit) } func (r RowRange) valid() bool { - return r.Unbounded() || r.start < r.limit + // If either end is unbounded, then the range is always valid. + if r.Unbounded() { + return true + } + + // If either end is an open interval, then the start must be strictly less + // than the end and since neither end is unbounded, we don't have to check + // for empty strings. + if r.startBound == rangeOpen || r.endBound == rangeOpen { + return r.start < r.end + } + + // At this point both endpoints must be closed, which makes [a,a] a valid + // interval + return r.start <= r.end } // RowRangeList is a sequence of RowRanges representing the union of the ranges. @@ -424,6 +599,21 @@ func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet { return ranges } +func (r RowRangeList) retainRowsBefore(lastRowKey string) RowSet { + if lastRowKey == "" { + return r + } + // Return a list of any range that has not yet been completely processed + var ranges RowRangeList + for _, rr := range r { + retained := rr.retainRowsBefore(lastRowKey) + if retained.valid() { + ranges = append(ranges, retained.(RowRange)) + } + } + return ranges +} + func (r RowRangeList) valid() bool { for _, rr := range r { if rr.valid() { @@ -438,23 +628,6 @@ func SingleRow(row string) RowSet { return RowList{row} } -// PrefixRange returns a RowRange consisting of all keys starting with the prefix. -func PrefixRange(prefix string) RowRange { - return RowRange{ - start: prefix, - limit: prefixSuccessor(prefix), - } -} - -// InfiniteRange returns the RowRange consisting of all keys at least as -// large as start. 
-func InfiniteRange(start string) RowRange { - return RowRange{ - start: start, - limit: "", - } -} - // prefixSuccessor returns the lexically smallest string greater than the // prefix, if it exists, or "" otherwise. In either case, it is the string // needed for the Limit of a RowRange. @@ -557,7 +730,7 @@ type rowFilter struct{ f Filter } func (rf rowFilter) set(settings *readSettings) { settings.req.Filter = rf.f.proto() } -// LimitRows returns a ReadOption that will limit the number of rows to be read. +// LimitRows returns a ReadOption that will limit the number of rows to be read. func LimitRows(limit int64) ReadOption { return limitRows{limit} } type limitRows struct{ limit int64 } @@ -577,6 +750,25 @@ func (wrs withFullReadStats) set(settings *readSettings) { settings.fullReadStatsFunc = wrs.f } +// ReverseScan returns a ReadOption that will reverse the results of a Scan. +// The rows will be streamed in reverse lexicographic order of the keys. The row key ranges of the RowSet are +// still expected to be oriented the same way as for forward scans, i.e. [a,c] where a <= c. The contents +// of each row are ordered the same as in forward scans. This is particularly useful to get the +// last N records before a key: +// +// table.ReadRows(ctx, NewOpenClosedRange("", "key"), func(row bigtable.Row) bool { +// return true +// }, bigtable.ReverseScan(), bigtable.LimitRows(10)) +func ReverseScan() ReadOption { + return reverseScan{} +} + +type reverseScan struct{} + +func (rs reverseScan) set(settings *readSettings) { + settings.req.Reversed = true +} + // mutationsAreRetryable returns true if all mutations are idempotent // and therefore retryable. A mutation is idempotent iff all cell timestamps // have an explicit timestamp set and do not rely on the timestamp being set on the server. 
diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go index 4fc0d13a3dfc..0bea28e72eef 100644 --- a/bigtable/bigtable_test.go +++ b/bigtable/bigtable_test.go @@ -18,6 +18,7 @@ package bigtable import ( "context" + "reflect" "testing" "time" @@ -42,11 +43,206 @@ func TestPrefix(t *testing.T) { continue } r := PrefixRange(test.prefix) - if test.succ == "" && r.limit != "" { - t.Errorf("PrefixRange(%q) got limit %q", test.prefix, r.limit) + if test.succ == "" && r.end != "" { + t.Errorf("PrefixRange(%q) got end %q", test.prefix, r.end) } - if test.succ != "" && r.limit != test.succ { - t.Errorf("PrefixRange(%q) got limit %q, want %q", test.prefix, r.limit, test.succ) + if test.succ != "" && r.end != test.succ { + t.Errorf("PrefixRange(%q) got end %q, want %q", test.prefix, r.end, test.succ) + } + } +} + +func TestNewClosedOpenRange(t *testing.T) { + start := "b" + limit := "b\x01" + r := NewClosedOpenRange(start, limit) + for _, test := range []struct { + k string + contains bool + }{ + {"a", false}, + {"b", true}, + {"b\x00", true}, + {"b\x01", false}, + } { + if want, got := test.contains, r.Contains(test.k); want != got { + t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want) + } + } + + for _, test := range []struct { + start, limit string + valid bool + }{ + {"a", "a", false}, + {"b", "a", false}, + {"a", "a\x00", true}, + {"a", "b", true}, + } { + r := NewClosedOpenRange(test.start, test.limit) + if want, got := test.valid, r.valid(); want != got { + t.Errorf("%s.valid() = %t, want %t", r.String(), got, want) + } + } +} +func TestNewOpenClosedRange(t *testing.T) { + start := "b" + limit := "b\x01" + r := NewOpenClosedRange(start, limit) + for _, test := range []struct { + k string + contains bool + }{ + {"a", false}, + {"b", false}, + {"b\x00", true}, + {"b\x01", true}, + {"b\x01\x00", false}, + } { + if want, got := test.contains, r.Contains(test.k); want != got { + t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, 
got, want) + } + } + + for _, test := range []struct { + start, limit string + valid bool + }{ + {"a", "a", false}, + {"b", "a", false}, + {"a", "a\x00", true}, + {"a", "b", true}, + } { + r := NewOpenClosedRange(test.start, test.limit) + if want, got := test.valid, r.valid(); want != got { + t.Errorf("%s.valid() = %t, want %t", r.String(), got, want) + } + } +} +func TestNewClosedRange(t *testing.T) { + start := "b" + limit := "b" + + r := NewClosedRange(start, limit) + for _, test := range []struct { + k string + contains bool + }{ + {"a", false}, + {"b", true}, + {"b\x01", false}, + } { + if want, got := test.contains, r.Contains(test.k); want != got { + t.Errorf("NewClosedRange(%q, %q).Contains(%q) = %t, want %t", "a", "a\x01", test.k, got, test.contains) + } + } + + for _, test := range []struct { + start, limit string + valid bool + }{ + {"a", "b", true}, + {"b", "b", true}, + {"b", "b\x00", true}, + {"b\x00", "b", false}, + } { + r := NewClosedRange(test.start, test.limit) + if want, got := test.valid, r.valid(); want != got { + t.Errorf("NewClosedRange(%q, %q).valid() = %t, want %t", test.start, test.limit, got, want) + } + } +} + +func TestNewOpenRange(t *testing.T) { + start := "b" + limit := "b\x01" + + r := NewOpenRange(start, limit) + for _, test := range []struct { + k string + contains bool + }{ + {"a", false}, + {"b", false}, + {"b\x00", true}, + {"b\x01", false}, + } { + if want, got := test.contains, r.Contains(test.k); want != got { + t.Errorf("NewOpenRange(%q, %q).Contains(%q) = %t, want %t", "a", "a\x01", test.k, got, test.contains) + } + } + + for _, test := range []struct { + start, limit string + valid bool + }{ + {"a", "a", false}, + {"a", "b", true}, + {"a", "a\x00", true}, + {"a", "a\x01", true}, + } { + r := NewOpenRange(test.start, test.limit) + if want, got := test.valid, r.valid(); want != got { + t.Errorf("NewOpenRange(%q, %q).valid() = %t, want %t", test.start, test.limit, got, want) + } + } +} + +func TestInfiniteRange(t 
*testing.T) { + r := InfiniteRange("b") + for _, test := range []struct { + k string + contains bool + }{ + {"a", false}, + {"b", true}, + {"b\x00", true}, + {"z", true}, + } { + if want, got := test.contains, r.Contains(test.k); want != got { + t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want) + } + } + + for _, test := range []struct { + start string + valid bool + }{ + {"a", true}, + {"", true}, + } { + r := InfiniteRange(test.start) + if want, got := test.valid, r.valid(); want != got { + t.Errorf("%s.valid() = %t, want %t", r.String(), got, want) + } + } +} + +func TestInfiniteReverseRange(t *testing.T) { + r := InfiniteReverseRange("z") + for _, test := range []struct { + k string + contains bool + }{ + {"a", true}, + {"z", true}, + {"z\x00", false}, + } { + if want, got := test.contains, r.Contains(test.k); want != got { + t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want) + } + } + + for _, test := range []struct { + start string + valid bool + }{ + {"a", true}, + {"", true}, + } { + r := InfiniteReverseRange(test.start) + if want, got := test.valid, r.valid(); want != got { + t.Errorf("%s.valid() = %t, want %t", r.String(), got, want) } } } @@ -176,6 +372,188 @@ func requestCallback(callback func()) func(ctx context.Context, desc *grpc.Strea } } +func TestRowRangeProto(t *testing.T) { + + for _, test := range []struct { + desc string + rr RowRange + proto *btpb.RowSet + }{ + { + desc: "RowRange proto start and end", + rr: NewClosedOpenRange("a", "b"), + proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{ + StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")}, + EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("b")}, + }}}, + }, + { + desc: "RowRange proto start but empty end", + rr: NewClosedOpenRange("a", ""), + proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{ + StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")}, + }}}, + }, + { + desc: "RowRange proto unbound", + rr: 
NewClosedOpenRange("", ""), + proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{}}}, + }, + { + desc: "RowRange proto unbound with no start or end", + rr: InfiniteRange(""), + proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{}}}, + }, + { + desc: "RowRange proto open closed", + rr: NewOpenClosedRange("a", "b"), + proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{ + StartKey: &btpb.RowRange_StartKeyOpen{StartKeyOpen: []byte("a")}, + EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("b")}, + }}}, + }, + { + desc: "RowRange proto open closed and empty start", + rr: NewOpenClosedRange("", "b"), + proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{ + EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("b")}, + }}}, + }, + { + desc: "RowRange proto open closed and empty start", + rr: NewOpenClosedRange("", "b"), + proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{ + EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("b")}, + }}}, + }, + { + desc: "RowRange proto closed open", + rr: NewClosedOpenRange("a", "b"), + proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{ + StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")}, + EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("b")}, + }}}, + }, + } { + t.Run(test.desc, func(t *testing.T) { + got := test.rr.proto() + want := test.proto + if !reflect.DeepEqual(got, want) { + t.Errorf("Bad proto for %s: got %v, want %v", test.rr.String(), got, want) + } + }) + } +} + +func TestRowRangeRetainRowsBefore(t *testing.T) { + for _, test := range []struct { + desc string + rr RowSet + proto *btpb.RowSet + }{ + { + desc: "retain rows before", + rr: NewRange("a", "c").retainRowsBefore("b"), + proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{ + StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")}, + EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("b")}, + }}}, + }, + { + desc: "retain rows before empty key", + rr: NewRange("a", "c").retainRowsBefore(""), + proto: &btpb.RowSet{RowRanges: 
[]*btpb.RowRange{{ + StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")}, + EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("c")}, + }}}, + }, + { + desc: "retain rows before key greater than range end", + rr: NewClosedRange("a", "c").retainRowsBefore("d"), + proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{ + StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")}, + EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("c")}, + }}}, + }, + { + desc: "retain rows before key same as closed end key", + rr: NewClosedRange("a", "c").retainRowsBefore("c"), + proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{ + StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")}, + EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("c")}, + }}}, + }, + { + desc: "retain rows before on unbounded range", + rr: InfiniteRange("").retainRowsBefore("z"), + proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{ + EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("z")}, + }}}, + }, + } { + t.Run(test.desc, func(t *testing.T) { + got := test.rr.proto() + want := test.proto + if !reflect.DeepEqual(got, want) { + t.Errorf("Bad retain rows before proto: got %v, want %v", got, want) + } + }) + } +} + +func TestRowRangeString(t *testing.T) { + + for _, test := range []struct { + desc string + rr RowRange + str string + }{ + { + desc: "RowRange closed open", + rr: NewClosedOpenRange("a", "b"), + str: "[\"a\",b)", + }, + { + desc: "RowRange open open", + rr: NewOpenRange("c", "d"), + str: "(\"c\",d)", + }, + { + desc: "RowRange closed closed", + rr: NewClosedRange("e", "f"), + str: "[\"e\",f]", + }, + { + desc: "RowRange open closed", + rr: NewOpenClosedRange("g", "h"), + str: "(\"g\",h]", + }, + { + desc: "RowRange unbound unbound", + rr: InfiniteRange(""), + str: "(∞,∞)", + }, + { + desc: "RowRange closed unbound", + rr: InfiniteRange("b"), + str: "[\"b\",∞)", + }, + { + desc: "RowRange unbound closed", + rr: InfiniteReverseRange("c"), + str: 
"(∞,c]", + }, + } { + t.Run(test.desc, func(t *testing.T) { + got := test.rr.String() + want := test.str + if !reflect.DeepEqual(got, want) { + t.Errorf("Bad String(): got %v, want %v", got, want) + } + }) + } +} + // TestReadRowsInvalidRowSet verifies that the client doesn't send ReadRows() requests with invalid RowSets. func TestReadRowsInvalidRowSet(t *testing.T) { testEnv, err := NewEmulatedEnv(IntegrationTestConfig{}) @@ -212,19 +590,19 @@ func TestReadRowsInvalidRowSet(t *testing.T) { valid bool }{ { - rr: RowRange{}, + rr: RowRange{startBound: rangeUnbounded, endBound: rangeUnbounded}, valid: true, }, { - rr: RowRange{start: "b"}, + rr: RowRange{startBound: rangeClosed, start: "b", endBound: rangeUnbounded}, valid: true, }, { - rr: RowRange{start: "b", limit: "c"}, + rr: RowRange{startBound: rangeClosed, start: "b", endBound: rangeOpen, end: "c"}, valid: true, }, { - rr: RowRange{start: "b", limit: "a"}, + rr: RowRange{startBound: rangeClosed, start: "b", endBound: rangeOpen, end: "a"}, valid: false, }, { @@ -307,7 +685,7 @@ func TestReadRowsRequestStats(t *testing.T) { statsChannel := make(chan FullReadStats, 1) readStart := time.Now() - if err := table.ReadRows(ctx, RowRange{}, func(r Row) bool { return true }, WithFullReadStats(func(s *FullReadStats) { statsChannel <- *s }), RowFilter(ColumnFilter("q.*"))); err != nil { + if err := table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { return true }, WithFullReadStats(func(s *FullReadStats) { statsChannel <- *s }), RowFilter(ColumnFilter("q.*"))); err != nil { t.Fatalf("NewClient failed: %v", err) } readElapsed := time.Since(readStart) diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 5b49072c647c..17228262ff38 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -208,7 +208,7 @@ func TestIntegration_PartialReadRows(t *testing.T) { // Do a scan and stop part way through. // Verify that the ReadRows callback doesn't keep running. 
stopped := false - err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { + err = table.ReadRows(ctx, RowRange{}, func(r Row) bool { if r.Key() < "h" { return true } @@ -256,6 +256,39 @@ func TestIntegration_ReadRowList(t *testing.T) { t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want) } } +func TestIntegration_ReadRowListReverse(t *testing.T) { + ctx := context.Background() + _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) + if err != nil { + t.Fatal(err) + } + defer cleanup() + + if err := populatePresidentsGraph(table); err != nil { + t.Fatal(err) + } + + // Read a RowList + var elt []string + rowRange := NewOpenClosedRange("gwashington", "wmckinley") + want := "wmckinley-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1" + err = table.ReadRows(ctx, rowRange, func(r Row) bool { + for _, ris := range r { + for _, ri := range ris { + elt = append(elt, formatReadItem(ri)) + } + } + return true + }, ReverseScan()) + + if err != nil { + t.Fatalf("read RowList: %v", err) + } + + if got := strings.Join(elt, ","); got != want { + t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want) + } +} func TestIntegration_DeleteRow(t *testing.T) { ctx := context.Background() @@ -424,7 +457,7 @@ func TestIntegration_ArbitraryTimestamps(t *testing.T) { if !testutil.Equal(r, wantRow) { t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow) } - // Check cell offset / limit + // Check cell offset / end r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3))) if err != nil { t.Fatalf("Reading row: %v", err) @@ -778,7 +811,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { } verifyDirectPathRemoteAddress(testEnv, t) if rc != wantRc { - t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc) + t.Fatalf("Scan with row end returned %d rows, want %d", rc, wantRc) } // Test bulk mutations @@ -975,7 +1008,7 @@ 
func TestIntegration_Read(t *testing.T) { want: "", }, { - desc: "read with ColumnFilter + row limit", + desc: "read with ColumnFilter + row end", rr: RowRange{}, filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson" limit: LimitRows(2), @@ -996,7 +1029,7 @@ func TestIntegration_Read(t *testing.T) { want: "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-", }, { - desc: "read with ColumnFilter + row limit + strip values", + desc: "read with ColumnFilter + row end + strip values", rr: RowRange{}, filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "j§adams" and "tjefferson" limit: LimitRows(2), @@ -1015,7 +1048,7 @@ func TestIntegration_Read(t *testing.T) { want: "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-", }, { - desc: "read with ValueRangeFilter + row limit", + desc: "read with ValueRangeFilter + row end", rr: RowRange{}, filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1" limit: LimitRows(2), @@ -1128,10 +1161,11 @@ func TestIntegration_FullReadStats(t *testing.T) { } for _, test := range []struct { - desc string - rr RowSet - filter Filter // may be nil - limit ReadOption // may be nil + desc string + rr RowSet + filter Filter // may be nil + limit ReadOption // may be nil + reverseScan bool // We do the read and grab all the stats. 
cellsReturnedCount int64 @@ -1251,7 +1285,7 @@ func TestIntegration_FullReadStats(t *testing.T) { rowsReturnedCount: 0, }, { - desc: "read with ColumnFilter + row limit", + desc: "read with ColumnFilter + row end", rr: RowRange{}, filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson" limit: LimitRows(2), @@ -1274,7 +1308,7 @@ func TestIntegration_FullReadStats(t *testing.T) { rowsReturnedCount: 4, }, { - desc: "read with ColumnFilter + row limit + strip values", + desc: "read with ColumnFilter + row end + strip values", rr: RowRange{}, filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "j§adams" and "tjefferson" limit: LimitRows(2), @@ -1296,7 +1330,7 @@ func TestIntegration_FullReadStats(t *testing.T) { rowsReturnedCount: 4, }, { - desc: "read with ValueRangeFilter + row limit", + desc: "read with ValueRangeFilter + row end", rr: RowRange{}, filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1" limit: LimitRows(2), @@ -1358,6 +1392,20 @@ func TestIntegration_FullReadStats(t *testing.T) { cellsReturnedCount: 0, rowsReturnedCount: 0, }, + { + desc: "reverse read all, unfiltered", + rr: RowRange{}, + reverseScan: true, + cellsReturnedCount: 7, + rowsReturnedCount: 4, + }, + { + desc: "reverse read with InfiniteRange, unfiltered", + rr: InfiniteReverseRange("wmckinley"), + reverseScan: true, + cellsReturnedCount: 7, + rowsReturnedCount: 4, + }, } { t.Run(test.desc, func(t *testing.T) { var opts []ReadOption @@ -1367,6 +1415,9 @@ func TestIntegration_FullReadStats(t *testing.T) { if test.limit != nil { opts = append(opts, test.limit) } + if test.reverseScan { + opts = append(opts, ReverseScan()) + } // Define a callback for validating request stats. callbackInvoked := false statsValidator := WithFullReadStats( @@ -1387,7 +1438,7 @@ func TestIntegration_FullReadStats(t *testing.T) { // We use lenient checks for CellsSeenCount and RowsSeenCount. Exact checks would be brittle. 
// Note that the emulator and prod sometimes yield different values: // - Sometimes prod scans fewer cells due to optimizations that allow prod to skip cells. - // - Sometimes prod scans more cells due to to filters that must rescan cells. + // - Sometimes prod scans more cells due to filters that must rescan cells. // Similar issues apply for RowsSeenCount. if got, want := readStats.CellsSeenCount, readStats.CellsReturnedCount; got < want { t.Errorf("CellsSeenCount should be greater than or equal to CellsReturnedCount. got: %d < want: %d", @@ -2119,7 +2170,7 @@ func TestIntegration_AdminEncryptionInfo(t *testing.T) { time.Sleep(time.Second * 10) } if encryptionKeyVersion == "" { - t.Fatalf("Encryption Key not created within alotted time limit") + t.Fatalf("Encryption Key not created within alotted time end") } // Validate Encryption Info under getTable diff --git a/bigtable/reader.go b/bigtable/reader.go index 64aabc91964f..8f0a4c1cd21c 100644 --- a/bigtable/reader.go +++ b/bigtable/reader.go @@ -19,6 +19,7 @@ package bigtable import ( "bytes" "fmt" + "strings" btpb "google.golang.org/genproto/googleapis/bigtable/v2" ) @@ -58,6 +59,7 @@ const ( // chunkReader handles cell chunks from the read rows response and combines // them into full Rows. type chunkReader struct { + reversed bool state rrState curKey []byte curLabels []string @@ -71,7 +73,11 @@ type chunkReader struct { // newChunkReader returns a new chunkReader for handling read rows responses. 
func newChunkReader() *chunkReader { - return &chunkReader{state: newRow} + return &chunkReader{reversed: false, state: newRow} +} + +func newReverseChunkReader() *chunkReader { + return &chunkReader{reversed: true, state: newRow} } // Process takes a cell chunk and returns a new Row if the given chunk @@ -200,9 +206,19 @@ func (cr *chunkReader) validateNewRow(cc *btpb.ReadRowsResponse_CellChunk) error if cc.RowKey == nil || cc.FamilyName == nil || cc.Qualifier == nil { return fmt.Errorf("missing key field for new row %v", cc) } - if cr.lastKey != "" && cr.lastKey >= string(cc.RowKey) { - return fmt.Errorf("out of order row key: %q, %q", cr.lastKey, string(cc.RowKey)) + + if cr.lastKey != "" { + r := strings.Compare(string(cc.RowKey), cr.lastKey) + direction := "increasing" + if cr.reversed { + r *= -1 + direction = "decreasing" + } + if r <= 0 { + return fmt.Errorf("out of order row key, must be strictly %s. new key: %q prev row: %q", direction, cc.RowKey, cr.lastKey) + } } + return nil } diff --git a/bigtable/retry_test.go b/bigtable/retry_test.go index 69e6fda42cc0..3d549aef5a15 100644 --- a/bigtable/retry_test.go +++ b/bigtable/retry_test.go @@ -345,7 +345,7 @@ func writeMutateRowsResponse(ss grpc.ServerStream, codes ...codes.Code) error { func TestRetainRowsAfter(t *testing.T) { prevRowRange := NewRange("a", "z") prevRowKey := "m" - want := NewRange("m\x00", "z") + want := NewOpenRange("m", "z") got := prevRowRange.retainRowsAfter(prevRowKey) if !testutil.Equal(want, got, cmp.AllowUnexported(RowRange{})) { t.Errorf("range retry: got %v, want %v", got, want) @@ -353,7 +353,7 @@ func TestRetainRowsAfter(t *testing.T) { prevRowRangeList := RowRangeList{NewRange("a", "d"), NewRange("e", "g"), NewRange("h", "l")} prevRowKey = "f" - wantRowRangeList := RowRangeList{NewRange("f\x00", "g"), NewRange("h", "l")} + wantRowRangeList := RowRangeList{NewOpenRange("f", "g"), NewRange("h", "l")} got = prevRowRangeList.retainRowsAfter(prevRowKey) if 
!testutil.Equal(wantRowRangeList, got, cmp.AllowUnexported(RowRange{})) { t.Errorf("range list retry: got %v, want %v", got, wantRowRangeList) @@ -406,7 +406,7 @@ func TestRetryReadRows(t *testing.T) { err = status.Errorf(codes.Unavailable, "") case 2: // Retryable request failure - if want, got := "b\x00", string(req.Rows.RowRanges[0].GetStartKeyClosed()); want != got { + if want, got := "b", string(req.Rows.RowRanges[0].GetStartKeyOpen()); want != got { t.Errorf("2 range retries: got %q, want %q", got, want) } err = status.Errorf(codes.Unavailable, "") @@ -418,7 +418,7 @@ func TestRetryReadRows(t *testing.T) { must(ss.SendMsg(&btpb.ReadRowsResponse{LastScannedRowKey: []byte("e")})) err = status.Errorf(codes.Unavailable, "") case 5: - if want, got := "e\x00", string(req.Rows.RowRanges[0].GetStartKeyClosed()); want != got { + if want, got := "e", string(req.Rows.RowRanges[0].GetStartKeyOpen()); want != got { t.Errorf("3 range retries: got %q, want %q", got, want) } must(writeReadRowsResponse(ss, "f", "g")) @@ -439,6 +439,80 @@ func TestRetryReadRows(t *testing.T) { } } +func TestRetryReverseReadRows(t *testing.T) { + ctx := context.Background() + + // Intercept requests and delegate to an interceptor defined by the test case + errCount := 0 + var f func(grpc.ServerStream) error + errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if strings.HasSuffix(info.FullMethod, "ReadRows") { + return f(ss) + } + return handler(ctx, ss) + } + + tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector)) + defer cleanup() + if err != nil { + t.Fatalf("fake server setup: %v", err) + } + + errCount = 0 + // Test overall request failure and retries + f = func(ss grpc.ServerStream) error { + var err error + req := new(btpb.ReadRowsRequest) + must(ss.RecvMsg(req)) + switch errCount { + case 0: + // Retryable request failure + err = status.Errorf(codes.Unavailable, "") + case 1: + // Write two 
rows then error + if want, got := "z", string(req.Rows.RowRanges[0].GetEndKeyClosed()); want != got { + t.Errorf("first retry, no data received yet: got %q, want %q", got, want) + } + must(writeReadRowsResponse(ss, "g", "f")) + err = status.Errorf(codes.Unavailable, "") + case 2: + // Retryable request failure + if want, got := "f", string(req.Rows.RowRanges[0].GetEndKeyOpen()); want != got { + t.Errorf("2 range retries: got %q, want %q", got, want) + } + err = status.Errorf(codes.Unavailable, "") + case 3: + must(ss.SendMsg(&btpb.ReadRowsResponse{LastScannedRowKey: []byte("e")})) + err = status.Errorf(codes.Unavailable, "") + case 4: + if want, got := "e", string(req.Rows.RowRanges[0].GetEndKeyOpen()); want != got { + t.Errorf("3 range retries: got %q, want %q", got, want) + } + // Write two more rows + must(writeReadRowsResponse(ss, "d", "c")) + err = status.Errorf(codes.Unavailable, "") + case 5: + if want, got := "c", string(req.Rows.RowRanges[0].GetEndKeyOpen()); want != got { + t.Errorf("3 range retries: got %q, want %q", got, want) + } + must(writeReadRowsResponse(ss, "b", "a")) + err = nil + } + errCount++ + return err + } + + var got []string + must(tbl.ReadRows(ctx, NewClosedRange("a", "z"), func(r Row) bool { + got = append(got, r.Key()) + return true + }, ReverseScan())) + want := []string{"g", "f", "d", "c", "b", "a"} + if !testutil.Equal(got, want) { + t.Errorf("retry range integration: got %v, want %v", got, want) + } +} + func writeReadRowsResponse(ss grpc.ServerStream, rowKeys ...string) error { var chunks []*btpb.ReadRowsResponse_CellChunk for _, key := range rowKeys {