Skip to content

Commit

Permalink
Source Smartsheets: incremental read and tests (#12077)
Browse files Browse the repository at this point in the history
* #5520 fix scrambled columns bug

* #5520 source smartsheets: add changelog item

* #5520 move pytest to optional setup requirements

* #12003 source smartsheets: implement incremental read + tests

* #12003 source smartsheets: add changelog

* #12003 source smartsheets: fix merge conflict on unit tests

* #12003 source smartsheets: fix startdate in spec

* #12003 source smartsheets: add default start dt to spec

* #12003 source smartsheets: add default start dt to spec

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
davydov-d and octavia-squidington-iii authored Apr 27, 2022
1 parent d612b8a commit 2eb9356
Show file tree
Hide file tree
Showing 17 changed files with 741 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@
- name: Smartsheets
sourceDefinitionId: 374ebc65-6636-4ea0-925c-7d35999a8ffc
dockerRepository: airbyte/source-smartsheets
dockerImageTag: 0.1.9
dockerImageTag: 0.1.10
documentationUrl: https://docs.airbyte.io/integrations/sources/smartsheets
icon: smartsheet.svg
sourceType: api
Expand Down
11 changes: 10 additions & 1 deletion airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7856,7 +7856,7 @@
oauthFlowOutputParameters:
- - "access_token"
- - "refresh_token"
- dockerImage: "airbyte/source-smartsheets:0.1.9"
- dockerImage: "airbyte/source-smartsheets:0.1.10"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/smartsheets"
connectionSpecification:
Expand All @@ -7878,6 +7878,15 @@
title: "Sheet ID"
description: "The spreadsheet ID. Find in the spreadsheet menu: File > Properties"
type: "string"
start_datetime:
title: "Start Datetime"
type: "string"
examples:
- "2000-01-01T13:00:00"
- "2000-01-01T13:00:00-07:00"
description: "ISO 8601, for instance: `YYYY-MM-DDTHH:MM:SS`, `YYYY-MM-DDTHH:MM:SS+HH:MM`"
format: "date-time"
default: "2020-01-01T00:00:00+00:00"
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ COPY $CODE_PATH ./$CODE_PATH
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/source-smartsheets
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
expect_records:
path: "integration_tests/expected_records.txt"
extra_fields: yes
exact_order: yes
extra_records: no
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"aws_s3_sample": {
"modifiedAt": "2222-03-07T11:30:00+00:00"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"gender": { "type": "string" },
"ip_address": { "type": "string" },
"primary_email": { "type": "string" },
"dob": { "type": "string", "format": "date" }
"dob": { "type": "string", "format": "date" },
"modifiedAt": { "type": "string", "format": "date-time" }
}
},
"supported_sync_modes": ["full_refresh"]
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging
from functools import cached_property
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple

import smartsheet


class SmartSheetAPIWrapper:
    """Lazy wrapper around the Smartsheet SDK, scoped to a single spreadsheet.

    Exposes the sheet's schema, name, primary key, and records to the
    connector while hiding SDK specifics. The sheet payload is fetched on
    first access and cached in ``self._data``.
    """

    def __init__(self, config: Mapping[str, Any]):
        self._spreadsheet_id = config["spreadsheet_id"]
        self._access_token = config["access_token"]
        api_client = smartsheet.Smartsheet(self._access_token)
        api_client.errors_as_exceptions(True)
        # each call to `Sheets` makes a new instance, so we save it here to make no more new objects
        self._get_sheet = api_client.Sheets.get_sheet
        self._data = None

    def _fetch_sheet(self, from_dt: Optional[str] = None) -> None:
        """Fetch the sheet; with `from_dt` set, only rows modified since that ISO 8601 datetime."""
        kwargs = {"rows_modified_since": from_dt}
        if not from_dt:
            # no cursor: caller only needs metadata (columns/name), so fetch a single row
            kwargs["page_size"] = 1
        self._data = self._get_sheet(self._spreadsheet_id, **kwargs)

    @staticmethod
    def _column_to_property(column_type: str) -> Dict[str, Any]:
        """Map a Smartsheet column type to a JSON-schema property (default: plain string).

        Fix: the annotation previously used the builtin ``any`` instead of ``typing.Any``.
        """
        type_mapping = {
            "TEXT_NUMBER": {"type": "string"},
            "DATE": {"type": "string", "format": "date"},
            "DATETIME": {"type": "string", "format": "date-time"},
        }
        return type_mapping.get(column_type, {"type": "string"})

    def _construct_record(self, row: smartsheet.models.Row) -> Dict[str, str]:
        """Turn one sheet row into a record dict; every cell value is stringified."""
        values_column_map = {cell.column_id: str(cell.value or "") for cell in row.cells}
        record = {column.title: values_column_map[column.id] for column in self.data.columns}
        record["modifiedAt"] = row.modified_at.isoformat()  # cursor field for incremental sync
        return record

    @property
    def data(self) -> smartsheet.models.Sheet:
        """The cached sheet object, fetched lazily on first access.

        Fix: the return annotation previously said ``Row``; ``Sheets.get_sheet``
        returns a Sheet object.
        """
        if not self._data:
            self._fetch_sheet()
        return self._data

    @property
    def name(self) -> str:
        """Sheet name, used as the stream name."""
        return self.data.name

    @property
    def row_count(self) -> int:
        """Number of rows in the most recently fetched sheet payload."""
        return len(self.data.rows)

    @cached_property
    def primary_key(self) -> Optional[str]:
        """Title of the sheet's primary column, or None when the sheet has none."""
        for column in self.data.columns:
            if column.primary:
                return column.title
        return None  # made explicit; previously fell off the end implicitly

    @cached_property
    def json_schema(self) -> Dict[str, Any]:
        """JSON schema derived from the sheet's columns, plus the cursor field."""
        column_info = {column.title: self._column_to_property(column.type.value) for column in self.data.columns}
        column_info["modifiedAt"] = {"type": "string", "format": "date-time"}  # add cursor field explicitly
        json_schema = {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": column_info,
        }
        return json_schema

    def read_records(self, from_dt: str) -> Iterable[Dict[str, str]]:
        """Yield one record per row modified at or after `from_dt` (ISO 8601)."""
        self._fetch_sheet(from_dt)
        for row in self.data.rows:
            yield self._construct_record(row)

    def check_connection(self, logger: logging.Logger) -> Tuple[bool, Optional[str]]:
        """Try to fetch the sheet; return (True, None) on success or (False, reason) on failure."""
        try:
            _ = self.data
        except smartsheet.exceptions.ApiError as e:
            err = e.error.result
            # Smartsheet reports "not found" with its own code 1006; surface it as HTTP 404
            code = 404 if err.code == 1006 else err.code
            reason = f"{err.name}: {code} - {err.message} | Check your spreadsheet ID."
            logger.error(reason)
            return False, reason
        except Exception as e:
            reason = str(e)
            logger.error(reason)
            return False, reason
        return True, None
Original file line number Diff line number Diff line change
Expand Up @@ -2,120 +2,21 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, List, Mapping, Tuple

import json
from datetime import datetime
from typing import Dict, Generator, List
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream

import smartsheet
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
Status,
Type,
)
from airbyte_cdk.sources import Source
from .sheet import SmartSheetAPIWrapper
from .streams import SmartsheetStream


def get_prop(col_type: str) -> Dict[str, str]:
    """Map a Smartsheet column type to a JSON-schema property.

    Unknown column types fall back to a plain string property.
    Fix: the return annotation previously read ``Dict[str, any]``, using the
    builtin ``any`` function as a type; the values are all strings.
    """
    props = {
        "TEXT_NUMBER": {"type": "string"},
        "DATE": {"type": "string", "format": "date"},
        "DATETIME": {"type": "string", "format": "date-time"},
    }
    return props.get(col_type, {"type": "string"})
class SourceSmartsheets(AbstractSource):
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
sheet = SmartSheetAPIWrapper(config)
return sheet.check_connection(logger)


def construct_record(sheet_columns: List[Dict], row_cells: List[Dict]) -> Dict:
    """Build a record mapping column titles to stringified cell values.

    All values are converted to string because the generated schema declares
    every property as a string; a cell without a "value" key becomes "".
    """
    cell_value_by_column_id = {}
    for cell in row_cells:
        cell_value_by_column_id[cell["columnId"]] = str(cell.get("value", ""))
    return {col["title"]: cell_value_by_column_id[col["id"]] for col in sheet_columns}


def get_json_schema(sheet_columns: List[Dict]) -> Dict:
    """Build a draft-07 JSON schema whose properties mirror the sheet's columns."""
    properties = {}
    for column in sheet_columns:
        properties[column["title"]] = get_prop(column["type"])
    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": properties,
    }


class SourceSmartsheets(Source):
    """Legacy Smartsheets source implementing check/discover/read without CDK streams."""

    # NOTE(review): the `json` annotations below use the json *module* as a type;
    # the values are really dicts parsed from the connector config.

    def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
        """Validate the access token and spreadsheet ID by fetching the sheet once."""
        try:
            access_token = config["access_token"]
            spreadsheet_id = config["spreadsheet_id"]

            smartsheet_client = smartsheet.Smartsheet(access_token)
            smartsheet_client.errors_as_exceptions(True)  # raise instead of returning error objects
            smartsheet_client.Sheets.get_sheet(spreadsheet_id)

            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
        except Exception as e:
            if isinstance(e, smartsheet.exceptions.ApiError):
                err = e.error.result
                # Smartsheet uses its own error code 1006 for "not found"; report as HTTP 404
                code = 404 if err.code == 1006 else err.code
                reason = f"{err.name}: {code} - {err.message} | Check your spreadsheet ID."
            else:
                reason = str(e)
            logger.error(reason)
            return AirbyteConnectionStatus(status=Status.FAILED)

    def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
        """Expose the sheet as a single full-refresh stream named after the sheet."""
        access_token = config["access_token"]
        spreadsheet_id = config["spreadsheet_id"]
        streams = []

        smartsheet_client = smartsheet.Smartsheet(access_token)
        try:
            sheet = smartsheet_client.Sheets.get_sheet(spreadsheet_id)
            sheet = json.loads(str(sheet))  # make it subscriptable
            sheet_json_schema = get_json_schema(sheet["columns"])
            logger.info(f"Running discovery on sheet: {sheet['name']} with {spreadsheet_id}")

            stream = AirbyteStream(name=sheet["name"], json_schema=sheet_json_schema)
            stream.supported_sync_modes = ["full_refresh"]
            streams.append(stream)

        except Exception as e:
            raise Exception(f"Could not run discovery: {str(e)}")

        return AirbyteCatalog(streams=streams)

    def read(
        self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any]
    ) -> Generator[AirbyteMessage, None, None]:
        """Emit every row of the sheet as a RECORD message; `state` is ignored (full refresh only)."""
        access_token = config["access_token"]
        spreadsheet_id = config["spreadsheet_id"]
        smartsheet_client = smartsheet.Smartsheet(access_token)

        for configured_stream in catalog.streams:
            stream = configured_stream.stream
            try:
                sheet = smartsheet_client.Sheets.get_sheet(spreadsheet_id)
                sheet = json.loads(str(sheet))  # make it subscriptable
                logger.info(f"Starting syncing spreadsheet {sheet['name']}")
                logger.info(f"Row count: {sheet['totalRowCount']}")

                for row in sheet["rows"]:
                    try:
                        record = construct_record(sheet["columns"], row["cells"])
                        # NOTE(review): timestamp is truncated to whole seconds *before*
                        # the * 1000 conversion to millis — sub-second precision is dropped
                        yield AirbyteMessage(
                            type=Type.RECORD,
                            record=AirbyteRecordMessage(stream=stream.name, data=record, emitted_at=int(datetime.now().timestamp()) * 1000),
                        )
                    except Exception as e:
                        # skip rows that cannot be encoded instead of failing the whole sync
                        logger.error(f"Unable to encode row into an AirbyteMessage with the following error: {e}")

            except Exception as e:
                logger.error(f"Could not read smartsheet: {stream.name}")
                raise e
        logger.info(f"Finished syncing spreadsheet with ID: {spreadsheet_id}")
def streams(self, config: Mapping[str, Any]) -> List["Stream"]:
sheet = SmartSheetAPIWrapper(config)
return [SmartsheetStream(sheet, config)]
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@
"title": "Sheet ID",
"description": "The spreadsheet ID. Find in the spreadsheet menu: File > Properties",
"type": "string"
},
"start_datetime": {
"title": "Start Datetime",
"type": "string",
"examples": ["2000-01-01T13:00:00", "2000-01-01T13:00:00-07:00"],
"description": "ISO 8601, for instance: `YYYY-MM-DDTHH:MM:SS`, `YYYY-MM-DDTHH:MM:SS+HH:MM`",
"format": "date-time",
"default": "2020-01-01T00:00:00+00:00"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import datetime
from typing import Any, Dict, Iterable, List, Mapping

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from source_smartsheets.sheet import SmartSheetAPIWrapper


class SmartsheetStream(Stream):
    """Incremental stream over a single Smartsheet sheet.

    Uses each row's `modifiedAt` timestamp as the cursor; state holds the
    newest timestamp seen so far (ISO 8601, second precision).
    """

    cursor_field = "modifiedAt"

    def __init__(self, smartsheet: SmartSheetAPIWrapper, config: Mapping[str, Any]):
        super().__init__()  # fix: base-class initialization was previously skipped
        self.smartsheet = smartsheet
        self._state = {}
        self._config = config
        # fall back to the spec's default when start_datetime is absent or empty
        self._start_datetime = self._config.get("start_datetime") or "2020-01-01T00:00:00+00:00"

    @property
    def primary_key(self) -> str:
        """Title of the sheet's primary column."""
        return self.smartsheet.primary_key

    def get_json_schema(self) -> Dict[str, Any]:
        """JSON schema derived from the sheet's columns (includes the cursor field)."""
        return self.smartsheet.json_schema

    @property
    def name(self) -> str:
        """Stream name — the sheet's name."""
        return self.smartsheet.name

    @property
    def state(self) -> Mapping[str, Any]:
        # lazily seed the cursor with the configured start datetime
        if not self._state:
            self._state = {self.cursor_field: self._start_datetime}
        return self._state

    @state.setter
    def state(self, value: Mapping[str, Any]):
        self._state = value

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """Yield records modified since the current cursor, advancing state per record.

        NOTE(review): state is advanced *before* each record is yielded, and the API
        is not known to return rows ordered by `modifiedAt` — a checkpoint taken
        mid-stream may therefore skip not-yet-emitted rows on resume; confirm ordering.
        """
        # fix: was a nested `def iso_dt` re-created on every call; a module-level
        # function alias is equivalent and cheaper
        iso_dt = datetime.datetime.fromisoformat
        for record in self.smartsheet.read_records(self.state[self.cursor_field]):
            current_cursor_value = iso_dt(self.state[self.cursor_field])
            latest_cursor_value = iso_dt(record[self.cursor_field])
            new_cursor_value = max(latest_cursor_value, current_cursor_value)
            # second precision keeps the cursor format stable across round-trips
            self.state = {self.cursor_field: new_cursor_value.isoformat("T", "seconds")}
            yield record
Loading

0 comments on commit 2eb9356

Please sign in to comment.