Remove Avro Schema description copying from bigquery_avro #2269

Merged: 8 commits, Nov 23, 2017

luigi/contrib/bigquery_avro.py (87 changes: 4 additions & 83 deletions)
@@ -19,11 +19,8 @@
 class BigQueryLoadAvro(BigQueryLoadTask):
     """A helper for loading specifically Avro data into BigQuery from GCS.
 
-    Additional goodies - takes field documentation from the input data and propagates it
-    to BigQuery table description and field descriptions. Supports the following Avro schema
-    types: Primitives, Enums, Records, Arrays, Unions, and Maps. For Map schemas nested maps
-    and unions are not supported. For Union Schemas only nested Primitive and Record Schemas
-    are currently supported.
+    Copies the table-level description from the Avro schema doc; BigQuery internally will copy
+    field-level descriptions to the table.
 
     Suitable for use via subclassing: override requires() to return Task(s) that output
     to GCS Targets; their paths are expected to be URIs of .avro files or URI prefixes
@@ -41,7 +38,7 @@ def source_uris(self):
         return [self._avro_uri(x) for x in flatten(self.input())]
 
     def _get_input_schema(self):
-        '''Arbitrarily picks an object in input and reads the Avro schema from it.'''
+        """Arbitrarily picks an object in input and reads the Avro schema from it."""
        assert avro, 'avro module required'
 
         input_target = flatten(self.input())[0]
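(For context: a minimal sketch of how a writer's schema can be read out of an .avro container file, which is roughly what _get_input_schema does; the luigi target plumbing is elided, read_avro_schema is a hypothetical standalone helper, and in some avro package versions the attribute is writer_schema rather than writers_schema:)

    import avro.datafile
    import avro.io

    def read_avro_schema(path):
        """Return the writer's schema embedded in an Avro container file."""
        with open(path, 'rb') as f:
            with avro.datafile.DataFileReader(f, avro.io.DatumReader()) as reader:
                # The reader parses the schema from the file header; the schema's
                # .doc attribute is what gets copied into the BigQuery description.
                return reader.datum_reader.writers_schema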
@@ -81,84 +78,8 @@ def _set_output_doc(self, avro_schema):
         bq_client = self.output().client.client
         table = self.output().table
 
-        current_bq_schema = bq_client.tables().get(projectId=table.project_id,
-                                                   datasetId=table.dataset_id,
-                                                   tableId=table.table_id).execute()
-
-        def get_fields_with_description(bq_fields, avro_fields):
-            new_fields = []
-            for field in bq_fields:
-                avro_field = avro_fields[field[u'name']]
-                field_type = type(avro_field.type)
-
-                # Primitive Support
-                if field_type is avro.schema.PrimitiveSchema:
-                    field[u'description'] = avro_field.doc
-
-                # Enum Support
-                if field_type is avro.schema.EnumSchema:
-                    field[u'description'] = avro_field.type.doc
-
-                # Record Support
-                if field_type is avro.schema.RecordSchema:
-                    field[u'description'] = avro_field.type.doc
-                    field[u'fields'] = get_fields_with_description(field[u'fields'], avro_field.type.fields_dict)
-
-                # Array Support
-                if field_type is avro.schema.ArraySchema:
-                    field[u'description'] = avro_field.type.items.doc
-                    field[u'fields'] = get_fields_with_description(field[u'fields'], avro_field.type.items.fields_dict)
-
-                # Union Support
-                if field_type is avro.schema.UnionSchema:
-                    for schema in avro_field.type.schemas:
-                        if type(schema) is avro.schema.PrimitiveSchema:
-                            field[u'description'] = avro_field.doc
-
-                        if type(schema) is avro.schema.RecordSchema:
-                            field[u'description'] = schema.doc
-                            field[u'fields'] = get_fields_with_description(field[u'fields'], schema.fields_dict)
-
-                    # Support for Enums, Arrays, Maps, and Unions inside of a union is not yet implemented
-
-                # Map Support
-                if field_type is avro.schema.MapSchema:
-                    field[u'description'] = avro_field.doc
-
-                    # The BigQuery Avro loader creates artificial key and value attributes in the
-                    # BigQuery schema; ignore the key and operate directly on the value.
-                    # https://cloud.google.com/bigquery/data-formats#avro_format
-                    bq_map_value_field = field[u'fields'][-1]
-                    avro_map_value = avro_field.type.values
-                    value_field_type = type(avro_map_value)
-
-                    # Primitive Support: unfortunately the map element doesn't directly have a doc
-                    # attribute, so there is no way to get documentation for a primitive-typed value.
-
-                    if value_field_type is avro.schema.EnumSchema:
-                        bq_map_value_field[u'description'] = avro_map_value.doc
-
-                    if value_field_type is avro.schema.RecordSchema:
-                        # Set the value's description using the record type's doc
-                        bq_map_value_field[u'description'] = avro_map_value.doc
-
-                        # Jump into the map value directly and recurse on its fields
-                        bq_map_value_field[u'fields'] = get_fields_with_description(bq_map_value_field[u'fields'], avro_map_value.fields_dict)
-
-                    if value_field_type is avro.schema.ArraySchema:
-                        bq_map_value_field[u'description'] = avro_map_value.items.doc
-                        bq_map_value_field[u'fields'] = get_fields_with_description(bq_map_value_field[u'fields'], avro_map_value.items.fields_dict)
-
-                    # Support for unions and maps nested inside of a map is not yet implemented
-
-                new_fields.append(field)
-            return new_fields
-
-        field_descriptions = get_fields_with_description(current_bq_schema['schema']['fields'], avro_schema.fields_dict)
-
         patch = {
             'description': avro_schema.doc,
-            'schema': {'fields': field_descriptions, },
         }
 
         bq_client.tables().patch(projectId=table.project_id,
@@ -174,4 +95,4 @@ def run(self):
         try:
             self._set_output_doc(self._get_input_schema())
         except Exception as e:
-            logger.warning('Could not propagate Avro doc to BigQuery table field descriptions: %r', e)
+            logger.warning('Could not propagate Avro doc to BigQuery table description: %r', e)
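(After this change _set_output_doc reduces to patching only the table-level description. The surviving body is elided by the diff view; a minimal sketch of what remains, assuming the patch call keeps the keyword arguments shown in the context lines and the standard googleapiclient body= keyword:)

    def _set_output_doc(self, avro_schema):
        bq_client = self.output().client.client
        table = self.output().table

        # Only the table-level description is patched now; field-level
        # descriptions are left to BigQuery's own Avro loader.
        patch = {
            'description': avro_schema.doc,
        }

        bq_client.tables().patch(projectId=table.project_id,
                                 datasetId=table.dataset_id,
                                 tableId=table.table_id,
                                 body=patch).execute()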
test/contrib/bigquery_gcloud_test.py (2 changes: 1 addition & 1 deletion)

@@ -533,7 +533,7 @@ def requires(_):
                 return BigQueryLoadAvroTestInput()
 
             def output(_):
-                return bigquery.BigQueryTarget(PROJECT_ID, DATASET_ID, self.table_id)
+                return bigquery.BigQueryTarget(PROJECT_ID, DATASET_ID, self.table_id, location=EU_LOCATION)
 
         task = BigQueryLoadAvroTestTask()
         self.assertFalse(task.complete())
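(For readers wiring this up, a minimal subclassing sketch matching the docstring's contract: override requires() to yield GCS targets of .avro files and output() to name the destination table. All class names and URIs below are hypothetical, not part of this PR:)

    import luigi
    from luigi.contrib import bigquery, bigquery_avro, gcs

    class MyAvroFiles(luigi.ExternalTask):
        """Hypothetical upstream task: .avro data already sitting in GCS."""

        def output(self):
            return gcs.GCSTarget('gs://my-bucket/exports/data.avro')  # hypothetical URI

    class LoadMyAvro(bigquery_avro.BigQueryLoadAvro):
        """Loads the upstream .avro files into a BigQuery table."""

        def requires(self):
            return MyAvroFiles()

        def output(self):
            return bigquery.BigQueryTarget('my-project', 'my_dataset', 'my_table')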