From 33c20b817cdc6e34233777467c3865bc4c9ae30e Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Tue, 20 Apr 2021 22:20:21 +0530 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Google=20sheets=20bugfix:=20hand?= =?UTF-8?q?le=20duplicate=20sheet=20headers=20(#2905)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../google_sheets_source.py | 27 +++++++++++++- .../google_sheets_source/helpers.py | 32 ++++++++++++----- .../source-google-sheets/main_dev.py | 2 +- .../sample_files/configured_catalog.json | 25 +++++++++++++ .../sample_files/sample_config.json | 6 ++++ .../unit_tests/test_helpers.py | 36 ++++++++++++++++--- 6 files changed, 112 insertions(+), 16 deletions(-) create mode 100644 airbyte-integrations/connectors/source-google-sheets/sample_files/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-google-sheets/sample_files/sample_config.json diff --git a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py index e34a80bd6272..a6d97a63cf47 100644 --- a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py +++ b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py @@ -63,6 +63,31 @@ def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus: status=Status.FAILED, message=f"Unable to connect with the provided credentials to spreadsheet. Error: {reason}" ) + # Check for duplicate headers + spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False)) + sheet_names = [sheet.properties.title for sheet in spreadsheet_metadata.sheets] + duplicate_headers_in_sheet = {} + for sheet_name in sheet_names: + try: + header_row_data = Helpers.get_first_row(client, spreadsheet_id, sheet_name) + _, duplicate_headers = Helpers.get_valid_headers_and_duplicates(header_row_data) + if duplicate_headers: + duplicate_headers_in_sheet[sheet_name] = duplicate_headers + except Exception as err: + logger.error(str(err)) + return AirbyteConnectionStatus( + status=Status.FAILED, + message=f"Unable to read the schema of sheet {sheet_name}. Error: {str(err)}" + ) + if duplicate_headers_in_sheet: + duplicate_headers_error_message = ", ".join([f"[sheet:{sheet_name}, headers:{duplicate_sheet_headers}]" + for sheet_name,duplicate_sheet_headers in duplicate_headers_in_sheet.items()]) + return AirbyteConnectionStatus( + status=Status.FAILED, + message="The following duplicate headers were found in the following sheets. Please fix them to continue: " + + duplicate_headers_error_message + ) + return AirbyteConnectionStatus(status=Status.SUCCEEDED) def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog: @@ -76,7 +101,7 @@ def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog: for sheet_name in sheet_names: try: header_row_data = Helpers.get_first_row(client, spreadsheet_id, sheet_name) - stream = Helpers.headers_to_airbyte_stream(sheet_name, header_row_data) + stream = Helpers.headers_to_airbyte_stream(logger, sheet_name, header_row_data) streams.append(stream) except Exception as err: logger.error(str(err)) diff --git a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py index cf0aeaa5e8cc..dbfd7c449bd7 100644 --- a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py +++ b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py @@ -26,6 +26,7 @@ from datetime import datetime from typing import Dict, FrozenSet, Iterable, List +from base_python import AirbyteLogger from airbyte_protocol import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog from google.oauth2 import service_account from googleapiclient import discovery @@ -51,31 +52,44 @@ def get_authenticated_google_credentials(credentials: Dict[str, str], scopes: Li return service_account.Credentials.from_service_account_info(credentials, scopes=scopes) @staticmethod - def headers_to_airbyte_stream(sheet_name: str, header_row_values: List[str]) -> AirbyteStream: + def headers_to_airbyte_stream(logger: AirbyteLogger, sheet_name: str, header_row_values: List[str]) -> AirbyteStream: """ Parses sheet headers from the provided row. This method assumes that data is contiguous i.e: every cell contains a value and the first cell which does not contain a value denotes the end of the headers. For example, if the first row contains "One | Two | | Three" then this method will parse the headers as ["One", "Two"]. This assumption is made for simplicity and can be modified later. """ + fields, duplicate_fields = Helpers.get_valid_headers_and_duplicates(header_row_values) + if duplicate_fields: + logger.warn(f"Duplicate headers found in {sheet_name}. Ignoring them :{duplicate_fields}") + + sheet_json_schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + # For simplicity, the type of every cell is a string + "properties": {field: {"type": "string"} for field in fields}, + } + + return AirbyteStream(name=sheet_name, json_schema=sheet_json_schema) + + @staticmethod + def get_valid_headers_and_duplicates(header_row_values: List[str]) -> (List[str], List[str]): fields = [] + duplicate_fields = set() for cell_value in header_row_values: if cell_value: if cell_value in fields: - raise Exception(f"Duplicate header {cell_value} found in {sheet_name}. Please ensure all headers are unique") + duplicate_fields.add(cell_value) else: fields.append(cell_value) else: break - sheet_json_schema = { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - # For simplicity, the type of every cell is a string - "properties": {field: {"type": "string"} for field in fields}, - } + # Removing all duplicate fields + if duplicate_fields: + fields = [field for field in fields if field not in duplicate_fields] - return AirbyteStream(name=sheet_name, json_schema=sheet_json_schema) + return fields, list(duplicate_fields) @staticmethod def get_formatted_row_values(row_data: RowData) -> List[str]: diff --git a/airbyte-integrations/connectors/source-google-sheets/main_dev.py b/airbyte-integrations/connectors/source-google-sheets/main_dev.py index 63e59cc05120..91b1679806f1 100644 --- a/airbyte-integrations/connectors/source-google-sheets/main_dev.py +++ b/airbyte-integrations/connectors/source-google-sheets/main_dev.py @@ -26,7 +26,7 @@ from base_python.entrypoint import launch -from .google_sheets_source import GoogleSheetsSource +from google_sheets_source import GoogleSheetsSource if __name__ == "__main__": source = GoogleSheetsSource() diff --git a/airbyte-integrations/connectors/source-google-sheets/sample_files/configured_catalog.json b/airbyte-integrations/connectors/source-google-sheets/sample_files/configured_catalog.json new file mode 100644 index 000000000000..e7d36b0d37d3 --- /dev/null +++ b/airbyte-integrations/connectors/source-google-sheets/sample_files/configured_catalog.json @@ -0,0 +1,25 @@ +{ + "streams": [ + { + "stream" : { + "name": "Sheet1", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "ID": { + "type": "string" + }, + "Name": { + "type": "string" + } + } + }, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": false + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-google-sheets/sample_files/sample_config.json b/airbyte-integrations/connectors/source-google-sheets/sample_files/sample_config.json new file mode 100644 index 000000000000..0587ca9f7fca --- /dev/null +++ b/airbyte-integrations/connectors/source-google-sheets/sample_files/sample_config.json @@ -0,0 +1,6 @@ +{ + "spreadsheet_id" : "randomid", + "credentials_json" : "{\"type\": \"service_account\",\"project_id\": \"airbyte-310409\",\"private_key_id\": \"xyz\",\"private_key\": \"-----BEGIN PRIVATE KEY-----\\n ... -----END PRIVATE KEY-----\\n\",\"client_email\": \"airbyte@airbyte-123456.iam.gserviceaccount.com\",\"client_id\": \"121512124\",\"auth_uri\": \"https://accounts.google.com/o/oauth2/auth\",\"token_uri\": \"https://oauth2.googleapis.com/token\",\"auth_provider_x509_cert_url\": \"https://www.googleapis.com/oauth2/v1/certs\",\"client_x509_cert_url\": \"https://www.googleapis.com/robot/v1/metadata/x509/airbyte%40airbyte-123456.iam.gserviceaccount.com\"}" +} + + diff --git a/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py b/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py index 9443590bb39f..e0fe98fc0ce6 100644 --- a/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py +++ b/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py @@ -25,12 +25,14 @@ import unittest from unittest.mock import Mock, patch +from base_python import AirbyteLogger from airbyte_protocol import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, SyncMode from airbyte_protocol.models.airbyte_protocol import DestinationSyncMode from google_sheets_source.client import GoogleSheetsClient from google_sheets_source.helpers import Helpers from google_sheets_source.models import CellData, GridData, RowData, Sheet, SheetProperties, Spreadsheet +logger = AirbyteLogger() class TestHelpers(unittest.TestCase): def test_headers_to_airbyte_stream(self): @@ -47,14 +49,38 @@ def test_headers_to_airbyte_stream(self): }, ) - actual_stream = Helpers.headers_to_airbyte_stream(sheet_name, header_values) + actual_stream = Helpers.headers_to_airbyte_stream(logger, sheet_name, header_values) self.assertEqual(expected_stream, actual_stream) - def test_duplicate_headers_to_ab_stream_fails(self): + def test_duplicate_headers_retrived(self): + header_values = ["h1", "h1", "h3"] + + expected_valid_header_values = ["h3"] + expected_duplicate_header_values = ["h1"] + + actual_header_values, actual_duplicate_header_values = Helpers.get_valid_headers_and_duplicates(header_values) + + self.assertEqual(expected_duplicate_header_values, actual_duplicate_header_values) + self.assertEqual(expected_valid_header_values, actual_header_values) + + def test_duplicate_headers_to_ab_stream_ignores_duplicates(self): sheet_name = "sheet1" header_values = ["h1", "h1", "h3"] - with self.assertRaises(BaseException): - Helpers.headers_to_airbyte_stream(sheet_name, header_values) + + # h1 is ignored because it is duplicate + expected_stream_header_values = ["h3"] + expected_stream = AirbyteStream( + name=sheet_name, + json_schema={ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + # For simplicity, the type of every cell is a string + "properties": {header: {"type": "string"} for header in expected_stream_header_values}, + }, + ) + + actual_stream = Helpers.headers_to_airbyte_stream(logger, sheet_name, header_values) + self.assertEqual(expected_stream, actual_stream) def test_headers_to_airbyte_stream_blank_values_terminate_row(self): sheet_name = "sheet1" @@ -69,7 +95,7 @@ def test_headers_to_airbyte_stream_blank_values_terminate_row(self): "properties": {"h1": {"type": "string"}}, }, ) - actual_stream = Helpers.headers_to_airbyte_stream(sheet_name, header_values) + actual_stream = Helpers.headers_to_airbyte_stream(logger, sheet_name, header_values) self.assertEqual(expected_stream, actual_stream)