diff --git a/luigi/contrib/bigquery_avro.py b/luigi/contrib/bigquery_avro.py
index 975d7d9b04..043c8c3a34 100644
--- a/luigi/contrib/bigquery_avro.py
+++ b/luigi/contrib/bigquery_avro.py
@@ -19,11 +19,8 @@ class BigQueryLoadAvro(BigQueryLoadTask):
     """A helper for loading specifically Avro data into BigQuery from GCS.
 
-    Additional goodies - takes field documentation from the input data and propagates it
-    to BigQuery table description and field descriptions. Supports the following Avro schema
-    types: Primitives, Enums, Records, Arrays, Unions, and Maps. For Map schemas nested maps
-    and unions are not supported. For Union Schemas only nested Primitive and Record Schemas
-    are currently supported.
+    Copies the table-level description from the Avro schema doc; BigQuery itself will copy
+    field-level descriptions to the table.
 
     Suitable for use via subclassing: override requires() to return Task(s) that output
     to GCS Targets; their paths are expected to be URIs of .avro files or URI prefixes
     (GCS "directories" containing .avro files).
@@ -41,7 +38,7 @@ def source_uris(self):
         return [self._avro_uri(x) for x in flatten(self.input())]
 
     def _get_input_schema(self):
-        '''Arbitrarily picks an object in input and reads the Avro schema from it.'''
+        """Arbitrarily picks an object in input and reads the Avro schema from it."""
         assert avro, 'avro module required'
         input_target = flatten(self.input())[0]
@@ -81,84 +78,8 @@ def _set_output_doc(self, avro_schema):
         bq_client = self.output().client.client
         table = self.output().table
 
-        current_bq_schema = bq_client.tables().get(projectId=table.project_id,
-                                                   datasetId=table.dataset_id,
-                                                   tableId=table.table_id).execute()
-
-        def get_fields_with_description(bq_fields, avro_fields):
-            new_fields = []
-            for field in bq_fields:
-                avro_field = avro_fields[field[u'name']]
-                field_type = type(avro_field.type)
-
-                # Primitive Support
-                if field_type is avro.schema.PrimitiveSchema:
-                    field[u'description'] = avro_field.doc
-
-                # Enum Support
-                if field_type is avro.schema.EnumSchema:
-                    field[u'description'] = avro_field.type.doc
-
-                # Record Support
-                if field_type is avro.schema.RecordSchema:
-                    field[u'description'] = avro_field.type.doc
-                    field[u'fields'] = get_fields_with_description(field[u'fields'], avro_field.type.fields_dict)
-
-                # Array Support
-                if field_type is avro.schema.ArraySchema:
-                    field[u'description'] = avro_field.type.items.doc
-                    field[u'fields'] = get_fields_with_description(field[u'fields'], avro_field.type.items.fields_dict)
-
-                # Union Support
-                if type(avro_field.type) is avro.schema.UnionSchema:
-                    for schema in avro_field.type.schemas:
-                        if type(schema) is avro.schema.PrimitiveSchema:
-                            field[u'description'] = avro_field.doc
-
-                        if type(schema) is avro.schema.RecordSchema:
-                            field[u'description'] = schema.doc
-                            field[u'fields'] = get_fields_with_description(field[u'fields'], schema.fields_dict)
-
-                    # Support for Enums, Arrays, Maps, and Unions inside of a union is not yet implemented
-
-                # Map Support
-                if field_type is avro.schema.MapSchema:
-                    field[u'description'] = avro_field.doc
-
-                    # Big Query Avro loader creates artificial key and value attributes in the Big Query schema
-                    # ignoring the key and operating directly on the value
-                    # https://cloud.google.com/bigquery/data-formats#avro_format
-                    bq_map_value_field = field[u'fields'][-1]
-                    avro_map_value = avro_field.type.values
-                    value_field_type = type(avro_map_value)
-
-                    # Primitive Support: Unfortunately the map element doesn't directly have a doc attribute
-                    # so there is no way to get documentation on the primitive types for the value attribute
-
-                    if value_field_type is avro.schema.EnumSchema:
-                        bq_map_value_field[u'description'] = avro_map_value.type.doc
-
-                    if value_field_type is avro.schema.RecordSchema:
-                        # Set values description using type's doc
-                        bq_map_value_field[u'description'] = avro_map_value.doc
-
-                        # This is jumping into the map value directly and working with that
-                        bq_map_value_field[u'fields'] = get_fields_with_description(bq_map_value_field[u'fields'], avro_map_value.fields_dict)
-
-                    if value_field_type is avro.schema.ArraySchema:
-                        bq_map_value_field[u'description'] = avro_map_value.items.doc
-                        bq_map_value_field[u'fields'] = get_fields_with_description(bq_map_value_field[u'fields'], avro_map_value.items.fields_dict)
-
-                    # Support for unions and maps nested inside of a map is not yet implemented
-
-                new_fields.append(field)
-            return new_fields
-
-        field_descriptions = get_fields_with_description(current_bq_schema['schema']['fields'], avro_schema.fields_dict)
-
         patch = {
             'description': avro_schema.doc,
-            'schema': {'fields': field_descriptions, },
         }
 
         bq_client.tables().patch(projectId=table.project_id,
@@ -174,4 +95,4 @@ def run(self):
         try:
             self._set_output_doc(self._get_input_schema())
         except Exception as e:
-            logger.warning('Could not propagate Avro doc to BigQuery table field descriptions: %r', e)
+            logger.warning('Could not propagate Avro doc to BigQuery table description: %r', e)
diff --git a/test/contrib/bigquery_gcloud_test.py b/test/contrib/bigquery_gcloud_test.py
index a5be9d4cf4..75bc9bb8b8 100644
--- a/test/contrib/bigquery_gcloud_test.py
+++ b/test/contrib/bigquery_gcloud_test.py
@@ -533,7 +533,7 @@ def requires(_):
                 return BigQueryLoadAvroTestInput()
 
             def output(_):
-                return bigquery.BigQueryTarget(PROJECT_ID, DATASET_ID, self.table_id)
+                return bigquery.BigQueryTarget(PROJECT_ID, DATASET_ID, self.table_id, location=EU_LOCATION)
 
         task = BigQueryLoadAvroTestTask()
         self.assertFalse(task.complete())
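A minimal usage sketch of the loader after this change (the task, bucket, project, and dataset names below are hypothetical, not part of the diff): subclass BigQueryLoadAvro as its docstring suggests, have requires() return a task whose output is a GCS Target holding .avro files or a URI prefix, and have output() return the destination BigQueryTarget. After the load succeeds, run() patches only the table-level description from the Avro schema doc.

    import luigi
    from luigi.contrib import bigquery, gcs
    from luigi.contrib.bigquery_avro import BigQueryLoadAvro


    class ExtractEvents(luigi.ExternalTask):
        """Stands in for an upstream job that already wrote Avro files to GCS."""

        def output(self):
            # A URI prefix (a GCS "directory" containing .avro files) is also accepted.
            return gcs.GCSTarget('gs://example-bucket/events/')


    class LoadEvents(BigQueryLoadAvro):
        def requires(self):
            return ExtractEvents()

        def output(self):
            # The destination table; its description is set from the Avro schema doc.
            return bigquery.BigQueryTarget('example-project', 'example_dataset', 'events')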