bigquery: Make Query more configurable and add a Run method.
Change-Id: I05384196ef43489b3fd8d960e913bcb8c27bee44
Reviewed-on: https://code-review.googlesource.com/8154
Reviewed-by: Jonathan Amsterdam <jba@google.com>
mcgreevy committed Oct 13, 2016
1 parent 07f82cd commit 5d33b1b
Showing 13 changed files with 715 additions and 291 deletions.
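
For orientation, here is a minimal sketch of the query flow this change moves toward, pieced together from the diffs below: Client.Query and the DefaultProjectID/DefaultDatasetID fields come from the updated integration test, and the new Query.Run plus Job.Read come from the deprecation notice on Query.Read in legacy.go. The import paths, NewClient call, placeholder project/dataset IDs, and iterator loop are assumptions about the surrounding package, not part of this commit.

package main

import (
	"fmt"
	"log"

	"golang.org/x/net/context"

	"cloud.google.com/go/bigquery"
)

func main() {
	ctx := context.Background()
	// "my-project-id" and "my_dataset" are placeholder identifiers.
	client, err := bigquery.NewClient(ctx, "my-project-id")
	if err != nil {
		log.Fatal(err)
	}

	q := client.Query("select name, num from t1")
	q.DefaultProjectID = "my-project-id" // set the same way as in the updated integration test
	q.DefaultDatasetID = "my_dataset"

	job, err := q.Run(ctx) // new in this commit: start the query as a job
	if err != nil {
		log.Fatal(err)
	}
	it, err := job.Read(ctx) // then read results from the job
	if err != nil {
		log.Fatal(err)
	}
	for it.Next(ctx) { // iterator usage assumed from the package of that era
		var row bigquery.ValueList
		if err := it.Get(&row); err != nil {
			log.Fatal(err)
		}
		fmt.Println(row)
	}
	if err := it.Err(); err != nil {
		log.Fatal(err)
	}
}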
33 changes: 0 additions & 33 deletions bigquery/bigquery.go
@@ -85,39 +85,6 @@ func (c *Client) Close() error {
return nil
}

// Query creates a query with string q. You may optionally set
// DefaultProjectID and DefaultDatasetID on the returned query before using it.
func (c *Client) Query(q string) *Query {
return &Query{Q: q, client: c}
}

// Read submits a query for execution and returns the results via an Iterator.
//
// Read uses a temporary table to hold the results of the query job.
//
// For more control over how a query is performed, don't use this method but
// instead pass the Query as a Source to Client.Copy, and call Read on the
// resulting Job.
func (q *Query) Read(ctx context.Context, options ...ReadOption) (*Iterator, error) {
dest := &Table{}
job, err := q.client.Copy(ctx, dest, q, WriteTruncate)
if err != nil {
return nil, err
}
return job.Read(ctx, options...)
}

// executeQuery submits a query for execution and returns the results via an Iterator.
func (c *Client) executeQuery(ctx context.Context, q *Query, options ...ReadOption) (*Iterator, error) {
dest := &Table{}
job, err := c.Copy(ctx, dest, q, WriteTruncate)
if err != nil {
return nil, err
}

return c.Read(ctx, job, options...)
}

// Dataset creates a handle to a BigQuery dataset in the client's project.
func (c *Client) Dataset(id string) *Dataset {
return c.DatasetInProject(c.projectID, id)
7 changes: 1 addition & 6 deletions bigquery/copy.go
@@ -70,12 +70,7 @@ func (c *Copier) Run(ctx context.Context) (*Job, error) {
},
}

if c.JobID != "" {
job.JobReference = &bq.JobReference{
JobId: c.JobID,
ProjectId: c.c.projectID,
}
}
setJobRef(job, c.JobID, c.c.projectID)

conf.DestinationTable = c.Dst.tableRefProto()
for _, t := range c.Srcs {
7 changes: 1 addition & 6 deletions bigquery/extract.go
@@ -61,12 +61,7 @@ func (e *Extractor) Run(ctx context.Context) (*Job, error) {
},
}

if e.JobID != "" {
job.JobReference = &bq.JobReference{
JobId: e.JobID,
ProjectId: e.c.projectID,
}
}
setJobRef(job, e.JobID, e.c.projectID)

e.Dst.customizeExtractDst(job.Configuration.Extract)
e.Src.customizeExtractSrc(job.Configuration.Extract)
9 changes: 4 additions & 5 deletions bigquery/integration_test.go
@@ -136,11 +136,10 @@ func TestIntegration(t *testing.T) {
checkRead(table)

// Query the table.
q := &Query{
Q: "select name, num from t1",
DefaultProjectID: projID,
DefaultDatasetID: ds.id,
}
q := c.Query("select name, num from t1")
q.DefaultProjectID = projID
q.DefaultDatasetID = ds.id

checkRead(q)

// Query the long way.
15 changes: 15 additions & 0 deletions bigquery/job.go
@@ -71,6 +71,21 @@ type JobStatus struct {
Errors []*Error
}

// setJobRef initializes job's JobReference if given a non-empty jobID.
// projectID must be non-empty.
func setJobRef(job *bq.Job, jobID, projectID string) {
if jobID == "" {
return
}
// We don't check whether projectID is empty; the server will return an
// error when it encounters the resulting JobReference.

job.JobReference = &bq.JobReference{
JobId: jobID,
ProjectId: projectID,
}
}

// jobOption is an Option which modifies a bq.Job proto.
// This is used for configuring values that apply to all operations, such as setting a jobReference.
type jobOption interface {
223 changes: 220 additions & 3 deletions bigquery/legacy.go
@@ -59,12 +59,30 @@ func (c *Client) Read(ctx context.Context, src ReadSource, options ...ReadOption
case *Job:
return src.Read(ctx, options...)
case *Query:
// For compatibility, support Query values created by literal, rather
// than Client.Query.
// Query used not to contain a QueryConfig. By moving its
// top-level fields down into a QueryConfig field, we break
// code that uses a Query literal. If users make the minimal
// change to fix this (e.g. moving the "Q" field into a nested
// QueryConfig within the Query), they will end up with a Query
// that has no Client. It's preferable to make Read continue
// to work in this case too, at least until we delete Read
// completely. So we copy QueryConfig into a Query with an
// actual client.
if src.client == nil {
src.client = c
src = &Query{
client: c,
QueryConfig: src.QueryConfig,
Q: src.Q,
DefaultProjectID: src.DefaultProjectID,
DefaultDatasetID: src.DefaultDatasetID,
}
}
return src.Read(ctx, options...)
case *QueryConfig:
// For compatibility, support QueryConfig values created by literal, rather
// than Client.Query.
q := &Query{client: c, QueryConfig: *src}
return q.Read(ctx, options...)
case *Table:
return src.Read(ctx, options...)
}
@@ -194,14 +212,42 @@ func (opt ignoreUnknownValues) customizeLoad(conf *bq.JobConfigurationLoad) {
conf.IgnoreUnknownValues = true
}

// CreateDisposition returns an Option that specifies the TableCreateDisposition to use.
// Deprecated: use the CreateDisposition field in Query, CopyConfig or LoadConfig instead.
func CreateDisposition(disp TableCreateDisposition) Option { return disp }

func (opt TableCreateDisposition) implementsOption() {}

func (opt TableCreateDisposition) customizeCopy(conf *bq.JobConfigurationTableCopy) {
conf.CreateDisposition = string(opt)
}

func (opt TableCreateDisposition) customizeLoad(conf *bq.JobConfigurationLoad) {
conf.CreateDisposition = string(opt)
}

func (opt TableCreateDisposition) customizeQuery(conf *bq.JobConfigurationQuery) {
conf.CreateDisposition = string(opt)
}

// WriteDisposition returns an Option that specifies the TableWriteDisposition to use.
// Deprecated: use the WriteDisposition field in Query, CopyConfig or LoadConfig instead.
func WriteDisposition(disp TableWriteDisposition) Option { return disp }

func (opt TableWriteDisposition) implementsOption() {}

func (opt TableWriteDisposition) customizeCopy(conf *bq.JobConfigurationTableCopy) {
conf.WriteDisposition = string(opt)
}

func (opt TableWriteDisposition) customizeLoad(conf *bq.JobConfigurationLoad) {
conf.WriteDisposition = string(opt)
}

func (opt TableWriteDisposition) customizeQuery(conf *bq.JobConfigurationQuery) {
conf.WriteDisposition = string(opt)
}

type extractOption interface {
customizeExtract(conf *bq.JobConfigurationExtract)
}
@@ -299,6 +345,9 @@ func (c *Client) Copy(ctx context.Context, dst Destination, src Source, options
return c.cp(ctx, dst, src, options)
case *Query:
return c.query(ctx, dst, src, options)
case *QueryConfig:
q := &Query{QueryConfig: *src}
return c.query(ctx, dst, q, options)
}
case *GCSReference:
if src, ok := src.(*Table); ok {
@@ -322,3 +371,171 @@ type Destination interface {
type ReadSource interface {
implementsReadSource()
}

type queryOption interface {
customizeQuery(conf *bq.JobConfigurationQuery)
}

// DisableQueryCache returns an Option that prevents results being fetched from the query cache.
// If this Option is not used, results are fetched from the cache if they are available.
// The query cache is a best-effort cache that is flushed whenever tables in the query are modified.
// Cached results are only available when TableID is unspecified in the query's destination Table.
// For more information, see https://cloud.google.com/bigquery/querying-data#querycaching
//
// Deprecated: use Query.DisableQueryCache instead.
func DisableQueryCache() Option { return disableQueryCache{} }

type disableQueryCache struct{}

func (opt disableQueryCache) implementsOption() {}

func (opt disableQueryCache) customizeQuery(conf *bq.JobConfigurationQuery) {
f := false
conf.UseQueryCache = &f
}

// DisableFlattenedResults returns an Option that prevents results being flattened.
// If this Option is not used, results from nested and repeated fields are flattened.
// DisableFlattenedResults implies AllowLargeResults.
// For more information, see https://cloud.google.com/bigquery/docs/data#nested
// Deprecated: use Query.DisableFlattenedResults instead.
func DisableFlattenedResults() Option { return disableFlattenedResults{} }

type disableFlattenedResults struct{}

func (opt disableFlattenedResults) implementsOption() {}

func (opt disableFlattenedResults) customizeQuery(conf *bq.JobConfigurationQuery) {
f := false
conf.FlattenResults = &f
// DisableFlattenedResults implies AllowLargeResults
allowLargeResults{}.customizeQuery(conf)
}

// AllowLargeResults returns an Option that allows the query to produce arbitrarily large result tables.
// The destination must be a table.
// When using this option, queries will take longer to execute, even if the result set is small.
// For additional limitations, see https://cloud.google.com/bigquery/querying-data#largequeryresults
// Deprecated: use Query.AllowLargeResults instead.
func AllowLargeResults() Option { return allowLargeResults{} }

type allowLargeResults struct{}

func (opt allowLargeResults) implementsOption() {}

func (opt allowLargeResults) customizeQuery(conf *bq.JobConfigurationQuery) {
conf.AllowLargeResults = true
}

// JobPriority returns an Option that causes a query to be scheduled with the specified priority.
// The default priority is InteractivePriority.
// For more information, see https://cloud.google.com/bigquery/querying-data#batchqueries
// Deprecated: use Query.Priority instead.
func JobPriority(priority string) Option { return jobPriority(priority) }

type jobPriority string

func (opt jobPriority) implementsOption() {}

func (opt jobPriority) customizeQuery(conf *bq.JobConfigurationQuery) {
conf.Priority = string(opt)
}

// MaxBillingTier returns an Option that sets the maximum billing tier for a Query.
// Queries that have resource usage beyond this tier will fail (without
// incurring a charge). If this Option is not used, the project default will be used.
// Deprecated: use Query.MaxBillingTier instead.
func MaxBillingTier(tier int) Option { return maxBillingTier(tier) }

type maxBillingTier int

func (opt maxBillingTier) implementsOption() {}

func (opt maxBillingTier) customizeQuery(conf *bq.JobConfigurationQuery) {
tier := int64(opt)
conf.MaximumBillingTier = &tier
}

// MaxBytesBilled returns an Option that limits the number of bytes billed for
// this job. Queries that would exceed this limit will fail (without incurring
// a charge).
// If this Option is not used, or bytes is < 1, the project default will be
// used.
// Deprecated: use Query.MaxBytesBilled instead.
func MaxBytesBilled(bytes int64) Option { return maxBytesBilled(bytes) }

type maxBytesBilled int64

func (opt maxBytesBilled) implementsOption() {}

func (opt maxBytesBilled) customizeQuery(conf *bq.JobConfigurationQuery) {
if opt >= 1 {
conf.MaximumBytesBilled = int64(opt)
}
}

// QueryUseStandardSQL returns an Option that sets the query to use standard SQL.
// The default setting is false (using legacy SQL).
// Deprecated: use Query.UseStandardSQL instead.
func QueryUseStandardSQL() Option { return queryUseStandardSQL{} }

type queryUseStandardSQL struct{}

func (opt queryUseStandardSQL) implementsOption() {}

func (opt queryUseStandardSQL) customizeQuery(conf *bq.JobConfigurationQuery) {
conf.UseLegacySql = false
conf.ForceSendFields = append(conf.ForceSendFields, "UseLegacySql")
}

func (c *Client) query(ctx context.Context, dst *Table, src *Query, options []Option) (*Job, error) {
job, options := initJobProto(c.projectID, options)
payload := &bq.JobConfigurationQuery{}

dst.customizeQueryDst(payload)

// QueryConfig now contains a Dst field. If it is set, it will override dst.
// This should not affect existing client code which does not set QueryConfig.Dst.
src.QueryConfig.customizeQuerySrc(payload)

// For compatibility, allow some legacy fields to be set directly on the query.
// TODO(jba): delete this code when deleting Client.Copy.
if src.Q != "" {
payload.Query = src.Q
}
if src.DefaultProjectID != "" || src.DefaultDatasetID != "" {
payload.DefaultDataset = &bq.DatasetReference{
DatasetId: src.DefaultDatasetID,
ProjectId: src.DefaultProjectID,
}
}
// end of compatibility code.

for _, opt := range options {
o, ok := opt.(queryOption)
if !ok {
return nil, fmt.Errorf("option (%#v) not applicable to dst/src pair: dst: %T ; src: %T", opt, dst, src)
}
o.customizeQuery(payload)
}

job.Configuration = &bq.JobConfiguration{
Query: payload,
}
j, err := c.service.insertJob(ctx, job, c.projectID)
if err != nil {
return nil, err
}
j.isQuery = true
return j, nil
}

// Read submits a query for execution and returns the results via an Iterator.
// Deprecated: Call Read on the Job returned by Query.Run instead.
func (q *Query) Read(ctx context.Context, options ...ReadOption) (*Iterator, error) {
job, err := q.Run(ctx)
if err != nil {
return nil, err
}
return job.Read(ctx, options...)
}
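
To make the deprecation notices in legacy.go concrete, the sketch below contrasts the Option-based style with the field-based configuration those notices point to (Query.DisableQueryCache, Query.UseStandardSQL, Query.MaxBytesBilled). The field names come from the notices; their exact types, the package name, and the surrounding function are assumptions for illustration.

package examplequery // hypothetical package, for illustration only

import (
	"golang.org/x/net/context"

	"cloud.google.com/go/bigquery"
)

func runConfigured(ctx context.Context, client *bigquery.Client) (*bigquery.Job, error) {
	// Deprecated style: behaviour attached via Options passed to Client.Copy, e.g.
	//   client.Copy(ctx, dst, query,
	//       bigquery.DisableQueryCache(), bigquery.QueryUseStandardSQL(), bigquery.MaxBytesBilled(1000000000))

	// Replacement style per the deprecation notices: configure fields on the Query, then Run it.
	q := client.Query("select name, num from t1")
	q.DisableQueryCache = true    // replaces the DisableQueryCache() Option; bool type assumed
	q.UseStandardSQL = true       // replaces QueryUseStandardSQL(); bool type assumed
	q.MaxBytesBilled = 1000000000 // replaces MaxBytesBilled(...); int64 type assumed
	return q.Run(ctx)             // deprecated Query.Read is now Run followed by Job.Read
}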
8 changes: 6 additions & 2 deletions bigquery/load_test.go
@@ -278,14 +278,18 @@ func TestConfiguringLoader(t *testing.T) {
want := defaultLoadJob()
want.Configuration.Load.CreateDisposition = "CREATE_NEVER"
want.Configuration.Load.WriteDisposition = "WRITE_TRUNCATE"
want.JobReference = &bq.JobReference{
JobId: "ajob",
ProjectId: "project-id",
}

loader := dst.LoaderFrom(src)
loader.CreateDisposition = CreateNever
loader.WriteDisposition = WriteTruncate
loader.JobID = "ajob"

if _, err := loader.Run(context.Background()); err != nil {
t.Errorf("err calling Loader.Run: %v", err)
return
t.Fatalf("err calling Loader.Run: %v", err)
}
if !reflect.DeepEqual(s.Job, want) {
t.Errorf("loading: got:\n%v\nwant:\n%v", s.Job, want)