Terraform BigQuery Table Hive partitioning support (#3335)
* range partitioning for BigQuery is GA

* add hive partitioning options to google_bigquery_table

* improve formatting of bigquery table hive partitioning options

* correct indenting in resource_bigquery_table_test.go

* minor fix to the bigquery table documentation

* align bigquery table test with upstream changes

* gofmt on resource_bigquery_table, resource_bigquery_table_test.go
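
A minimal sketch of the new surface, with resource names and bucket paths that are illustrative and mirror the acceptance test added below; the new block hangs off `external_data_configuration`:

resource "google_bigquery_dataset" "example" {
  dataset_id = "example_dataset"
}

resource "google_bigquery_table" "example" {
  table_id   = "example_table"
  dataset_id = google_bigquery_dataset.example.dataset_id

  external_data_configuration {
    source_format = "CSV"
    autodetect    = true
    source_uris   = ["gs://example-bucket/*"]

    hive_partitioning_options {
      mode              = "AUTO"
      source_uri_prefix = "gs://example-bucket/"
    }
  }
}

With `mode = "AUTO"`, BigQuery infers partition key names and types from the `key=value` path segments between the `source_uri_prefix` and the matched files.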
ffung authored May 29, 2020
1 parent 8888b64 commit 43bf665
Showing 3 changed files with 161 additions and 30 deletions.
resource_bigquery_table.go
@@ -1,13 +1,10 @@
// <% autogen_exception -%>

package google

import (
"encoding/json"
"errors"
"fmt"
"log"
"regexp"

"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/helper/structure"
@@ -165,8 +162,8 @@ func resourceBigQueryTable() *schema.Resource {
// Range: [Optional] Range of a sheet to query from. Only used when non-empty.
// Typical format: sheet_name!top_left_cell_id:bottom_right_cell_id
"range": {
Type: schema.TypeString,
Optional: true,
Type: schema.TypeString,
Optional: true,
AtLeastOneOf: []string{
"external_data_configuration.0.google_sheets_options.0.skip_leading_rows",
"external_data_configuration.0.google_sheets_options.0.range",
@@ -175,8 +172,8 @@ func resourceBigQueryTable() *schema.Resource {
// SkipLeadingRows: [Optional] The number of rows at the top
// of the sheet that BigQuery will skip when reading the data.
"skip_leading_rows": {
Type: schema.TypeInt,
Optional: true,
Type: schema.TypeInt,
Optional: true,
AtLeastOneOf: []string{
"external_data_configuration.0.google_sheets_options.0.skip_leading_rows",
"external_data_configuration.0.google_sheets_options.0.range",
@@ -186,6 +183,31 @@ func resourceBigQueryTable() *schema.Resource {
},
},

// HivePartitioningOptions: [Optional] Options for configuring hive partitioning detection.
"hive_partitioning_options": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
// Mode: [Optional] [Experimental] When set, what mode of hive partitioning to use when reading data.
// Two modes are supported.
// * AUTO: automatically infer partition key name(s) and type(s).
// * STRINGS: automatically infer partition key name(s).
"mode": {
Type: schema.TypeString,
Optional: true,
},
// SourceUriPrefix: [Optional] [Experimental] When hive partition detection is requested, a common prefix for all source uris is required.
// The prefix must end immediately before the partition key encoding begins.
"source_uri_prefix": {
Type: schema.TypeString,
Optional: true,
},
},
},
},

// IgnoreUnknownValues: [Optional] Indicates if BigQuery should
// allow extra values that are not represented in the table schema.
// If true, the extra values are ignored. If false, records with
@@ -309,11 +331,10 @@ func resourceBigQueryTable() *schema.Resource {
},
},

<% unless version == 'ga' -%>
// RangePartitioning: [Optional] If specified, configures range-based
// partitioning for this table.
"range_partitioning": &schema.Schema{
Type: schema.TypeList,
"range_partitioning": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
@@ -327,8 +348,8 @@ func resourceBigQueryTable() *schema.Resource {
},

// Range: [Required] Information required to partition based on ranges.
"range": &schema.Schema{
Type: schema.TypeList,
"range": {
Type: schema.TypeList,
Required: true,
MaxItems: 1,
Elem: &schema.Resource{
@@ -356,11 +377,10 @@ func resourceBigQueryTable() *schema.Resource {
},
},
},
<% end -%>

// Clustering: [Optional] Specifies column names to use for data clustering. Up to four
// top-level columns are allowed, and should be specified in descending priority order.
"clustering": &schema.Schema{
"clustering": {
Type: schema.TypeList,
Optional: true,
ForceNew: true,
@@ -520,7 +540,6 @@ func resourceTable(d *schema.ResourceData, meta interface{}) (*bigquery.Table, e
table.TimePartitioning = expandTimePartitioning(v)
}

<% unless version == 'ga' -%>
if v, ok := d.GetOk("range_partitioning"); ok {
rangePartitioning, err := expandRangePartitioning(v)
if err != nil {
@@ -529,7 +548,6 @@ func resourceTable(d *schema.ResourceData, meta interface{}) (*bigquery.Table, e

table.RangePartitioning = rangePartitioning
}
<% end -%>

if v, ok := d.GetOk("clustering"); ok {
table.Clustering = &bigquery.Clustering{
@@ -620,13 +638,11 @@ func resourceBigQueryTableRead(d *schema.ResourceData, meta interface{}) error {
}
}

<% unless version == 'ga' -%>
if res.RangePartitioning != nil {
if err := d.Set("range_partitioning", flattenRangePartitioning(res.RangePartitioning)); err != nil {
return err
}
}
<% end -%>

if res.Clustering != nil {
d.Set("clustering", res.Clustering.Fields)
@@ -725,6 +741,9 @@ func expandExternalDataConfiguration(cfg interface{}) (*bigquery.ExternalDataCon
if v, ok := raw["google_sheets_options"]; ok {
edc.GoogleSheetsOptions = expandGoogleSheetsOptions(v)
}
if v, ok := raw["hive_partitioning_options"]; ok {
edc.HivePartitioningOptions = expandHivePartitioningOptions(v)
}
if v, ok := raw["ignore_unknown_values"]; ok {
edc.IgnoreUnknownValues = v.(bool)
}
@@ -757,6 +776,10 @@ func flattenExternalDataConfiguration(edc *bigquery.ExternalDataConfiguration) (
result["google_sheets_options"] = flattenGoogleSheetsOptions(edc.GoogleSheetsOptions)
}

if edc.HivePartitioningOptions != nil {
result["hive_partitioning_options"] = flattenHivePartitioningOptions(edc.HivePartitioningOptions)
}

if edc.IgnoreUnknownValues {
result["ignore_unknown_values"] = edc.IgnoreUnknownValues
}
@@ -871,6 +894,39 @@ func flattenGoogleSheetsOptions(opts *bigquery.GoogleSheetsOptions) []map[string
return []map[string]interface{}{result}
}

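// expandHivePartitioningOptions converts the schema's single-element
// hive_partitioning_options list into its API representation, returning
// nil when the block is absent from the configuration.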
func expandHivePartitioningOptions(configured interface{}) *bigquery.HivePartitioningOptions {
if len(configured.([]interface{})) == 0 {
return nil
}

raw := configured.([]interface{})[0].(map[string]interface{})
opts := &bigquery.HivePartitioningOptions{}

if v, ok := raw["mode"]; ok {
opts.Mode = v.(string)
}

if v, ok := raw["source_uri_prefix"]; ok {
opts.SourceUriPrefix = v.(string)
}

return opts
}

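// flattenHivePartitioningOptions converts the API representation back into
// the single-element list form used by the schema, omitting unset fields.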
func flattenHivePartitioningOptions(opts *bigquery.HivePartitioningOptions) []map[string]interface{} {
result := map[string]interface{}{}

if opts.Mode != "" {
result["mode"] = opts.Mode
}

if opts.SourceUriPrefix != "" {
result["source_uri_prefix"] = opts.SourceUriPrefix
}

return []map[string]interface{}{result}
}

func expandSchema(raw interface{}) (*bigquery.TableSchema, error) {
var fields []*bigquery.TableFieldSchema

@@ -913,7 +969,6 @@ func expandTimePartitioning(configured interface{}) *bigquery.TimePartitioning {
return tp
}

<% unless version == 'ga' -%>
func expandRangePartitioning(configured interface{}) (*bigquery.RangePartitioning, error) {
if configured == nil {
return nil, nil
@@ -945,7 +1000,6 @@ func expandRangePartitioning(configured interface{}) (*bigquery.RangePartitionin

return rp, nil
}
<% end -%>

func flattenEncryptionConfiguration(ec *bigquery.EncryptionConfiguration) []map[string]interface{} {
return []map[string]interface{}{{"kms_key_name": ec.KmsKeyName}}
@@ -969,7 +1023,6 @@ func flattenTimePartitioning(tp *bigquery.TimePartitioning) []map[string]interfa
return []map[string]interface{}{result}
}

<% unless version == 'ga' -%>
func flattenRangePartitioning(rp *bigquery.RangePartitioning) []map[string]interface{} {
result := map[string]interface{}{
"field": rp.Field,
Expand All @@ -984,7 +1037,6 @@ func flattenRangePartitioning(rp *bigquery.RangePartitioning) []map[string]inter

return []map[string]interface{}{result}
}
<% end -%>

func expandView(configured interface{}) *bigquery.ViewDefinition {
raw := configured.([]interface{})[0].(map[string]interface{})
resource_bigquery_table_test.go
@@ -1,4 +1,3 @@
<% autogen_exception -%>
package google

import (
@@ -65,7 +64,30 @@ func TestAccBigQueryTable_Kms(t *testing.T) {
})
}

<% unless version == 'ga' -%>
func TestAccBigQueryTable_HivePartitioning(t *testing.T) {
t.Parallel()
bucketName := testBucketName(t)
resourceName := "google_bigquery_table.test"
datasetID := fmt.Sprintf("tf_test_%s", randString(t, 10))
tableID := fmt.Sprintf("tf_test_%s", randString(t, 10))

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckBigQueryTableDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccBigQueryTableHivePartitioning(bucketName, datasetID, tableID),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func TestAccBigQueryTable_RangePartitioning(t *testing.T) {
t.Parallel()
resourceName := "google_bigquery_table.test"
@@ -88,7 +110,6 @@ func TestAccBigQueryTable_RangePartitioning(t *testing.T) {
},
})
}
<% end -%>

func TestAccBigQueryTable_View(t *testing.T) {
t.Parallel()
@@ -357,7 +378,43 @@ EOH
`, datasetID, cryptoKeyName, tableID)
}

<% unless version == 'ga' -%>
func testAccBigQueryTableHivePartitioning(bucketName, datasetID, tableID string) string {
return fmt.Sprintf(`
resource "google_storage_bucket" "test" {
name = "%s"
force_destroy = true
}
resource "google_storage_bucket_object" "test" {
name = "key1=20200330/init.csv"
content = ";"
bucket = google_storage_bucket.test.name
}
resource "google_bigquery_dataset" "test" {
dataset_id = "%s"
}
resource "google_bigquery_table" "test" {
table_id = "%s"
dataset_id = google_bigquery_dataset.test.dataset_id
external_data_configuration {
source_format = "CSV"
autodetect = true
source_uris = ["gs://${google_storage_bucket.test.name}/*"]
hive_partitioning_options {
mode = "AUTO"
source_uri_prefix = "gs://${google_storage_bucket.test.name}/"
}
}
depends_on = ["google_storage_bucket_object.test"]
}
`, bucketName, datasetID, tableID)
}

func testAccBigQueryTableRangePartitioning(datasetID, tableID string) string {
return fmt.Sprintf(`
resource "google_bigquery_dataset" "test" {
@@ -392,7 +449,6 @@ EOH
}
`, datasetID, tableID)
}
<% end -%>

func testAccBigQueryTableWithView(datasetID, tableID string) string {
return fmt.Sprintf(`
@@ -538,8 +594,6 @@ resource "google_bigquery_table" "test" {
`, datasetID, bucketName, objectName, content, tableID, format, quoteChar)
}



func testAccBigQueryTableFromSheet(context map[string]interface{}) string {
return Nprintf(`
resource "google_bigquery_table" "table" {
bigquery_table.html.markdown
@@ -127,7 +127,7 @@ The following arguments are supported:
* `time_partitioning` - (Optional) If specified, configures time-based
partitioning for this table. Structure is documented below.

* `range_partitioning` - (Optional, Beta) If specified, configures range-based
* `range_partitioning` - (Optional) If specified, configures range-based
partitioning for this table. Structure is documented below.

* `clustering` - (Optional) Specifies column names to use for data clustering.
@@ -152,6 +152,11 @@ The `external_data_configuration` block supports:
`source_format` is set to "GOOGLE_SHEETS". Structure is
documented below.

* `hive_partitioning_options` (Optional) - When set, configures hive partitioning
support. Not all storage formats support hive partitioning; requesting hive
partitioning on an unsupported format will lead to an error, as will providing
an invalid specification. Structure is documented below, with an example sketch
following the block reference.

* `ignore_unknown_values` (Optional) - Indicates if BigQuery should
allow extra values that are not represented in the table schema.
If true, the extra values are ignored. If false, records with
@@ -207,6 +212,26 @@ The `google_sheets_options` block supports:
that BigQuery will skip when reading the data. At least one of `range` or
`skip_leading_rows` must be set.

The `hive_partitioning_options` block supports:

* `mode` (Optional) - When set, what mode of hive partitioning to use when
reading data. The following modes are supported.
    * AUTO: automatically infer partition key name(s) and type(s).
    * STRINGS: automatically infer partition key name(s). All types are
      interpreted as strings.
    * CUSTOM: you must encode the partition key schema within the
      `source_uri_prefix` by setting `source_uri_prefix` to
      `gs://bucket/path_to_table/{key1:TYPE1}/{key2:TYPE2}/{key3:TYPE3}`.
  Not all storage formats support hive partitioning. Requesting hive
  partitioning on an unsupported format will lead to an error.
  Currently supported formats are: JSON, CSV, ORC, Avro and Parquet.

* `source_uri_prefix` (Optional) - When hive partition detection is requested,
a common prefix for all source uris is required. The prefix must end immediately
before the partition key encoding begins. For example, consider files following
this data layout:
`gs://bucket/path_to_table/dt=2019-06-01/country=USA/id=7/file.avro`
`gs://bucket/path_to_table/dt=2019-05-31/country=CA/id=3/file.avro`
When hive partitioning is requested with either AUTO or STRINGS detection, the
common prefix can be either of `gs://bucket/path_to_table` or
`gs://bucket/path_to_table/`. Note that when `mode` is set to `CUSTOM`, you must
encode the partition key schema within the `source_uri_prefix` by setting
`source_uri_prefix` to `gs://bucket/path_to_table/{key1:TYPE1}/{key2:TYPE2}/{key3:TYPE3}`.
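
For illustration, a minimal `CUSTOM`-mode sketch, encoding a single `dt` date
key in the prefix (bucket and object paths are placeholders):

    external_data_configuration {
      source_format = "AVRO"
      autodetect    = true
      source_uris   = ["gs://bucket/path_to_table/*"]

      hive_partitioning_options {
        mode              = "CUSTOM"
        source_uri_prefix = "gs://bucket/path_to_table/{dt:DATE}"
      }
    }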

The `time_partitioning` block supports:

* `expiration_ms` - (Optional) Number of milliseconds for which to keep the
