Skip to content

Commit

Permalink
🎉 Google sheets bugfix: handle duplicate sheet headers (#2905)
Browse files Browse the repository at this point in the history
  • Loading branch information
makalaaneesh authored Apr 20, 2021
1 parent 8f8be40 commit 33c20b8
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,31 @@ def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
status=Status.FAILED, message=f"Unable to connect with the provided credentials to spreadsheet. Error: {reason}"
)

# Check for duplicate headers
spreadsheet_metadata = Spreadsheet.parse_obj(client.get(spreadsheetId=spreadsheet_id, includeGridData=False))
sheet_names = [sheet.properties.title for sheet in spreadsheet_metadata.sheets]
duplicate_headers_in_sheet = {}
for sheet_name in sheet_names:
try:
header_row_data = Helpers.get_first_row(client, spreadsheet_id, sheet_name)
_, duplicate_headers = Helpers.get_valid_headers_and_duplicates(header_row_data)
if duplicate_headers:
duplicate_headers_in_sheet[sheet_name] = duplicate_headers
except Exception as err:
logger.error(str(err))
return AirbyteConnectionStatus(
status=Status.FAILED,
message=f"Unable to read the schema of sheet {sheet_name}. Error: {str(err)}"
)
if duplicate_headers_in_sheet:
duplicate_headers_error_message = ", ".join([f"[sheet:{sheet_name}, headers:{duplicate_sheet_headers}]"
for sheet_name,duplicate_sheet_headers in duplicate_headers_in_sheet.items()])
return AirbyteConnectionStatus(
status=Status.FAILED,
message="The following duplicate headers were found in the following sheets. Please fix them to continue: "
+ duplicate_headers_error_message
)

return AirbyteConnectionStatus(status=Status.SUCCEEDED)

def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
Expand All @@ -76,7 +101,7 @@ def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
for sheet_name in sheet_names:
try:
header_row_data = Helpers.get_first_row(client, spreadsheet_id, sheet_name)
stream = Helpers.headers_to_airbyte_stream(sheet_name, header_row_data)
stream = Helpers.headers_to_airbyte_stream(logger, sheet_name, header_row_data)
streams.append(stream)
except Exception as err:
logger.error(str(err))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from datetime import datetime
from typing import Dict, FrozenSet, Iterable, List

from base_python import AirbyteLogger
from airbyte_protocol import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog
from google.oauth2 import service_account
from googleapiclient import discovery
Expand All @@ -51,31 +52,44 @@ def get_authenticated_google_credentials(credentials: Dict[str, str], scopes: Li
return service_account.Credentials.from_service_account_info(credentials, scopes=scopes)

@staticmethod
def headers_to_airbyte_stream(sheet_name: str, header_row_values: List[str]) -> AirbyteStream:
def headers_to_airbyte_stream(logger: AirbyteLogger, sheet_name: str, header_row_values: List[str]) -> AirbyteStream:
    """
    Builds the Airbyte stream for a sheet from the values of its header row.

    The usable headers are whatever Helpers.get_valid_headers_and_duplicates returns for the row.
    If that helper reports duplicate headers, a warning naming the sheet and the duplicates is
    logged and the stream is still built from the remaining headers.

    :param logger: logger used to emit the duplicate-header warning
    :param sheet_name: name of the sheet; becomes the name of the stream
    :param header_row_values: cell values of the sheet's first row, in column order
    :return: an AirbyteStream whose JSON schema has one string property per valid header
    """
    valid_headers, repeated_headers = Helpers.get_valid_headers_and_duplicates(header_row_values)
    if repeated_headers:
        logger.warn(f"Duplicate headers found in {sheet_name}. Ignoring them :{repeated_headers}")

    # For simplicity, the type of every cell is a string
    properties = {header: {"type": "string"} for header in valid_headers}

    return AirbyteStream(
        name=sheet_name,
        json_schema={
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": properties,
        },
    )

@staticmethod
def get_valid_headers_and_duplicates(header_row_values: List[str]) -> (List[str], List[str]):
    """
    Splits the contiguous header cells of a row into unique headers and duplicated headers.

    Cells are read left to right and reading stops at the first empty cell (or at the end of the
    row). A header that occurs more than once within that range is treated as a duplicate: it is
    removed entirely from the valid headers and reported once in the duplicates list.

    :param header_row_values: cell values of the header row, in column order
    :return: a tuple (valid_headers, duplicate_headers); both lists are ordered by the first
        appearance of each header in the row, so the result is deterministic
    """
    occurrences = {}
    for cell_value in header_row_values:
        if not cell_value:
            # The first blank cell marks the end of the contiguous header range
            break
        occurrences[cell_value] = occurrences.get(cell_value, 0) + 1

    # dicts keep insertion order, so both lists follow the column order of the sheet
    fields = [header for header, count in occurrences.items() if count == 1]
    duplicate_fields = [header for header, count in occurrences.items() if count > 1]

    return fields, duplicate_fields

@staticmethod
def get_formatted_row_values(row_data: RowData) -> List[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from base_python.entrypoint import launch

from .google_sheets_source import GoogleSheetsSource
from google_sheets_source import GoogleSheetsSource

if __name__ == "__main__":
source = GoogleSheetsSource()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"streams": [
{
"stream" : {
"name": "Sheet1",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"ID": {
"type": "string"
},
"Name": {
"type": "string"
}
}
},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"spreadsheet_id" : "randomid",
"credentials_json" : "{\"type\": \"service_account\",\"project_id\": \"airbyte-310409\",\"private_key_id\": \"xyz\",\"private_key\": \"-----BEGIN PRIVATE KEY-----\\n ... -----END PRIVATE KEY-----\\n\",\"client_email\": \"airbyte@airbyte-123456.iam.gserviceaccount.com\",\"client_id\": \"121512124\",\"auth_uri\": \"https://accounts.google.com/o/oauth2/auth\",\"token_uri\": \"https://oauth2.googleapis.com/token\",\"auth_provider_x509_cert_url\": \"https://www.googleapis.com/oauth2/v1/certs\",\"client_x509_cert_url\": \"https://www.googleapis.com/robot/v1/metadata/x509/airbyte%40airbyte-123456.iam.gserviceaccount.com\"}"
}


Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import unittest
from unittest.mock import Mock, patch

from base_python import AirbyteLogger
from airbyte_protocol import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, SyncMode
from airbyte_protocol.models.airbyte_protocol import DestinationSyncMode
from google_sheets_source.client import GoogleSheetsClient
from google_sheets_source.helpers import Helpers
from google_sheets_source.models import CellData, GridData, RowData, Sheet, SheetProperties, Spreadsheet

logger = AirbyteLogger()

class TestHelpers(unittest.TestCase):
def test_headers_to_airbyte_stream(self):
Expand All @@ -47,14 +49,38 @@ def test_headers_to_airbyte_stream(self):
},
)

actual_stream = Helpers.headers_to_airbyte_stream(sheet_name, header_values)
actual_stream = Helpers.headers_to_airbyte_stream(logger, sheet_name, header_values)
self.assertEqual(expected_stream, actual_stream)

def test_duplicate_headers_to_ab_stream_fails(self):
def test_duplicate_headers_retrived(self):
    """A repeated header is reported as a duplicate and dropped from the valid headers."""
    valid_headers, duplicate_headers = Helpers.get_valid_headers_and_duplicates(["h1", "h1", "h3"])

    self.assertEqual(["h1"], duplicate_headers)
    self.assertEqual(["h3"], valid_headers)

def test_duplicate_headers_to_ab_stream_ignores_duplicates(self):
    """A header that appears more than once is left out of the generated stream schema."""
    sheet_name = "sheet1"

    # h1 is ignored because it is duplicate, so only h3 is expected in the schema
    # For simplicity, the type of every cell is a string
    expected_properties = {"h3": {"type": "string"}}
    expected_stream = AirbyteStream(
        name=sheet_name,
        json_schema={
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": expected_properties,
        },
    )

    actual_stream = Helpers.headers_to_airbyte_stream(logger, sheet_name, ["h1", "h1", "h3"])

    self.assertEqual(expected_stream, actual_stream)

def test_headers_to_airbyte_stream_blank_values_terminate_row(self):
sheet_name = "sheet1"
Expand All @@ -69,7 +95,7 @@ def test_headers_to_airbyte_stream_blank_values_terminate_row(self):
"properties": {"h1": {"type": "string"}},
},
)
actual_stream = Helpers.headers_to_airbyte_stream(sheet_name, header_values)
actual_stream = Helpers.headers_to_airbyte_stream(logger, sheet_name, header_values)

self.assertEqual(expected_stream, actual_stream)

Expand Down

0 comments on commit 33c20b8

Please sign in to comment.