Skip to content

Commit

Permalink
feat(bigquery): export HivePartitioningOptions in load job configurations (#3877)
Browse files Browse the repository at this point in the history
  • Loading branch information
shollyman authored Apr 2, 2021
1 parent bc3bf1c commit 7c759be
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 19 deletions.
34 changes: 15 additions & 19 deletions bigquery/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,17 @@ type ExternalDataConfig struct {

func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
q := bq.ExternalDataConfiguration{
SourceFormat: string(e.SourceFormat),
SourceUris: e.SourceURIs,
Autodetect: e.AutoDetect,
Compression: string(e.Compression),
IgnoreUnknownValues: e.IgnoreUnknownValues,
MaxBadRecords: e.MaxBadRecords,
SourceFormat: string(e.SourceFormat),
SourceUris: e.SourceURIs,
Autodetect: e.AutoDetect,
Compression: string(e.Compression),
IgnoreUnknownValues: e.IgnoreUnknownValues,
MaxBadRecords: e.MaxBadRecords,
HivePartitioningOptions: e.HivePartitioningOptions.toBQ(),
}
if e.Schema != nil {
q.Schema = e.Schema.toBQ()
}
if e.HivePartitioningOptions != nil {
q.HivePartitioningOptions = e.HivePartitioningOptions.toBQ()
}
if e.Options != nil {
e.Options.populateExternalDataConfig(&q)
}
Expand All @@ -121,13 +119,14 @@ func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {

func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfig, error) {
e := &ExternalDataConfig{
SourceFormat: DataFormat(q.SourceFormat),
SourceURIs: q.SourceUris,
AutoDetect: q.Autodetect,
Compression: Compression(q.Compression),
IgnoreUnknownValues: q.IgnoreUnknownValues,
MaxBadRecords: q.MaxBadRecords,
Schema: bqToSchema(q.Schema),
SourceFormat: DataFormat(q.SourceFormat),
SourceURIs: q.SourceUris,
AutoDetect: q.Autodetect,
Compression: Compression(q.Compression),
IgnoreUnknownValues: q.IgnoreUnknownValues,
MaxBadRecords: q.MaxBadRecords,
Schema: bqToSchema(q.Schema),
HivePartitioningOptions: bqToHivePartitioningOptions(q.HivePartitioningOptions),
}
switch {
case q.CsvOptions != nil:
Expand All @@ -141,9 +140,6 @@ func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfi
return nil, err
}
}
if q.HivePartitioningOptions != nil {
e.HivePartitioningOptions = bqToHivePartitioningOptions(q.HivePartitioningOptions)
}
return e, nil
}

Expand Down
6 changes: 6 additions & 0 deletions bigquery/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ type LoadConfig struct {
// For ingestion from datastore backups, ProjectionFields governs which fields
// are projected from the backup. The default behavior projects all fields.
ProjectionFields []string

// HivePartitioningOptions allows use of Hive partitioning based on the
// layout of objects in Cloud Storage.
HivePartitioningOptions *HivePartitioningOptions
}

func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) {
Expand All @@ -81,6 +85,7 @@ func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) {
SchemaUpdateOptions: l.SchemaUpdateOptions,
UseAvroLogicalTypes: l.UseAvroLogicalTypes,
ProjectionFields: l.ProjectionFields,
HivePartitioningOptions: l.HivePartitioningOptions.toBQ(),
},
}
media := l.Src.populateLoadConfig(config.Load)
Expand All @@ -100,6 +105,7 @@ func bqToLoadConfig(q *bq.JobConfiguration, c *Client) *LoadConfig {
SchemaUpdateOptions: q.Load.SchemaUpdateOptions,
UseAvroLogicalTypes: q.Load.UseAvroLogicalTypes,
ProjectionFields: q.Load.ProjectionFields,
HivePartitioningOptions: bqToHivePartitioningOptions(q.Load.HivePartitioningOptions),
}
var fc *FileConfig
if len(q.Load.SourceUris) == 0 {
Expand Down
25 changes: 25 additions & 0 deletions bigquery/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,31 @@ func TestLoad(t *testing.T) {
return j
}(),
},
{
dst: c.Dataset("dataset-id").Table("table-id"),
src: func() *GCSReference {
g := NewGCSReference("uri")
g.SourceFormat = Parquet
return g
}(),
config: LoadConfig{
HivePartitioningOptions: &HivePartitioningOptions{
Mode: CustomHivePartitioningMode,
SourceURIPrefix: "source_uri",
RequirePartitionFilter: true,
},
},
want: func() *bq.Job {
j := defaultLoadJob()
j.Configuration.Load.SourceFormat = "PARQUET"
j.Configuration.Load.HivePartitioningOptions = &bq.HivePartitioningOptions{
Mode: "CUSTOM",
RequirePartitionFilter: true,
SourceUriPrefix: "source_uri",
}
return j
}(),
},
}

for i, tc := range testCases {
Expand Down

0 comments on commit 7c759be

Please sign in to comment.