Skip to content

Commit

Permalink
bigquery: add RowIterator, to replace Iterator
Browse files Browse the repository at this point in the history
This CL introduces RowIterator, which follows the standard iterator
pattern. It also contains the replacements for Job.Read and Table.Read
that will return it, but makes no breaking changes.

The main surface change is that ReadOptions will go away:

- RecordsPerRequest becomes PageInfo().MaxSize.

- There never was a PageToken read option. Now PageInfo().Token can be
  set as usual.

- It no longer makes sense to have a single StartIndex option, when the
  other two are fields. So StartIndex is a field on RowIterator.

The implementation is a bit clumsy, out of a desire to leave the
existing XXXConf types and service methods mostly unchanged. After the
old iterator is removed, we should be able to simplify.

Change-Id: Ief1fa93dd93ce4cd8006c8c9cd14ec5f3559e8b0
Reviewed-on: https://code-review.googlesource.com/7612
Reviewed-by: Michael McGreevy <mcgreevy@golang.org>
  • Loading branch information
jba committed Oct 13, 2016
1 parent 5d33b1b commit b8c1696
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 1 deletion.
98 changes: 98 additions & 0 deletions bigquery/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"fmt"

"golang.org/x/net/context"
"google.golang.org/api/iterator"
)

// A pageFetcher returns a page of rows, starting from the row specified by token.
type pageFetcher interface {
fetch(ctx context.Context, s service, token string) (*readDataResult, error)
setPaging(*pagingConf)
}

// Iterator provides access to the result of a BigQuery lookup.
Expand Down Expand Up @@ -184,3 +186,99 @@ func (it *Iterator) Schema() (Schema, error) {

return it.schema, nil
}

////////////////////////////////////////////////////////////////
// New iterator implementation: will replace the old

// TODO(jba) replace Job.Read with Job.readRows.
func (j *Job) readRows(ctx context.Context) (*RowIterator, error) {
conf := &readQueryConf{}
if err := j.customizeReadQuery(conf); err != nil {
return nil, err
}
return newRowIterator(ctx, j.service, conf), nil
}

// TODO(jba) replace Table.Read with Table.readRows.
// Note: no error return.
func (t *Table) readRows(ctx context.Context) *RowIterator {
conf := &readTableConf{}
t.customizeReadSrc(conf)
return newRowIterator(ctx, t.c.service, conf)
}

func newRowIterator(ctx context.Context, s service, pf pageFetcher) *RowIterator {
it := &RowIterator{
ctx: ctx,
service: s,
pf: pf,
schemaErr: errors.New("called without preceding successful call to Next"),
}
it.pageInfo, it.nextFunc = iterator.NewPageInfo(
it.fetch,
func() int { return len(it.rows) },
func() interface{} { r := it.rows; it.rows = nil; return r })
return it
}

// A RowIterator provides access to the result of a BigQuery lookup.
type RowIterator struct {
ctx context.Context
service service
pf pageFetcher
pageInfo *iterator.PageInfo
nextFunc func() error

// StartIndex can be set before the first call to Next. If PageInfo().PageToken
// is also set, StartIndex is ignored.
StartIndex uint64

rows [][]Value

schema Schema // populated on first call to fech
schemaErr error
}

// Next loads the next row into dst. Its return value is iterator.Done if there
// are no more results. Once Next returns iterator.Done, all subsequent calls
// will return iterator.Done.
func (it *RowIterator) Next(dst ValueLoader) error {
if err := it.nextFunc(); err != nil {
return err
}
row := it.rows[0]
it.rows = it.rows[1:]
return dst.Load(row)
}

// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
func (it *RowIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }

// Schema returns the schema of the result rows.
func (it *RowIterator) Schema() (Schema, error) {
if it.schemaErr != nil {
return nil, fmt.Errorf("Schema %v", it.schemaErr)
}
return it.schema, nil
}

func (it *RowIterator) fetch(pageSize int, pageToken string) (string, error) {
pc := &pagingConf{}
if pageSize > 0 {
pc.recordsPerRequest = int64(pageSize)
pc.setRecordsPerRequest = true
}
if pageToken == "" {
pc.startIndex = it.StartIndex
}
it.pf.setPaging(pc)
res, err := it.pf.fetch(it.ctx, it.service, pageToken)
if err != nil {
it.schemaErr = err
return "", err
}
it.rows = append(it.rows, res.rows...)
it.schema = res.schema
it.schemaErr = nil
return res.pageToken, nil
}
44 changes: 43 additions & 1 deletion bigquery/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

"golang.org/x/net/context"
"google.golang.org/api/iterator"
)

type fetchResponse struct {
Expand All @@ -43,6 +44,8 @@ func (pf *pageFetcherStub) fetch(ctx context.Context, s service, token string) (
return call.result, call.err
}

func (pf *pageFetcherStub) setPaging(pc *pagingConf) {}

func TestIterator(t *testing.T) {
fetchFailure := errors.New("fetch failure")

Expand Down Expand Up @@ -359,6 +362,26 @@ func TestIterator(t *testing.T) {
t.Errorf("%s: iterator.Schema:\ngot: %v\nwant: %v", tc.desc, schema, tc.wantSchema)
}
}

for _, tc := range testCases {
if tc.alreadyConsumed != 0 {
continue
}
pf := &pageFetcherStub{
fetchResponses: tc.fetchResponses,
}
it := newRowIterator(context.Background(), nil, pf)
values, schema, err := consumeRowIterator(it)
if err != tc.wantErr {
t.Fatalf("%s: got %v, want %v", tc.desc, err, tc.wantErr)
}
if (len(values) != 0 || len(tc.want) != 0) && !reflect.DeepEqual(values, tc.want) {
t.Errorf("%s: values:\ngot: %v\nwant:%v", tc.desc, values, tc.want)
}
if (len(schema) != 0 || len(tc.wantSchema) != 0) && !reflect.DeepEqual(schema, tc.wantSchema) {
t.Errorf("%s: iterator.Schema:\ngot: %v\nwant: %v", tc.desc, schema, tc.wantSchema)
}
}
}

// consumeIterator reads the schema and all values from an iterator and returns them.
Expand All @@ -376,10 +399,29 @@ func consumeIterator(it *Iterator) ([]ValueList, Schema, error) {
return nil, Schema{}, fmt.Errorf("err calling Schema: %v", err)
}
}

return got, schema, nil
}

// consumeRowIterator reads the schema and all values from a RowIterator and returns them.
func consumeRowIterator(it *RowIterator) ([]ValueList, Schema, error) {
var got []ValueList
var schema Schema
for {
var vals ValueList
err := it.Next(&vals)
if err == iterator.Done {
return got, schema, nil
}
if err != nil {
return got, schema, err
}
got = append(got, vals)
if schema, err = it.Schema(); err != nil {
return nil, Schema{}, err
}
}
}

func TestGetBeforeNext(t *testing.T) {
// TODO: once mashalling/unmarshalling of iterators is implemented, do a similar test for unmarshalled iterators.
pf := &pageFetcherStub{
Expand Down
4 changes: 4 additions & 0 deletions bigquery/read_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func (conf *readTableConf) fetch(ctx context.Context, s service, token string) (
return s.readTabledata(ctx, conf, token)
}

func (conf *readTableConf) setPaging(pc *pagingConf) { conf.paging = *pc }

// Read fetches the contents of the table.
func (t *Table) Read(_ context.Context, options ...ReadOption) (*Iterator, error) {
conf := &readTableConf{}
Expand All @@ -55,6 +57,8 @@ func (conf *readQueryConf) fetch(ctx context.Context, s service, token string) (
return s.readQuery(ctx, conf, token)
}

func (conf *readQueryConf) setPaging(pc *pagingConf) { conf.paging = *pc }

// Read fetches the results of a query job.
func (j *Job) Read(_ context.Context, options ...ReadOption) (*Iterator, error) {
conf := &readQueryConf{}
Expand Down

0 comments on commit b8c1696

Please sign in to comment.