Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Smartsheets: incremental read and tests #12077

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ COPY $CODE_PATH ./$CODE_PATH
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/source-smartsheets
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
expect_records:
path: "integration_tests/expected_records.txt"
extra_fields: yes
exact_order: yes
extra_records: no
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"aws_s3_sample": {
"modifiedAt": "2222-03-07T11:30:00+00:00"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"gender": { "type": "string" },
"ip_address": { "type": "string" },
"primary_email": { "type": "string" },
"dob": { "type": "string", "format": "date" }
"dob": { "type": "string", "format": "date" },
"modifiedAt": { "type": "string", "format": "date-time" }
}
},
"supported_sync_modes": ["full_refresh"]
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging
from functools import cached_property
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple

import smartsheet


class SmartSheetAPIWrapper:
    """Wrapper around the Smartsheet SDK client bound to one configured sheet.

    The wrapper keeps the most recently fetched sheet object in ``self._data``
    and derives the stream name, primary key, JSON schema and records from it.
    """

    def __init__(self, config: Mapping[str, Any]):
        """Create the SDK client from ``config``.

        :param config: mapping that must contain ``spreadsheet_id`` and ``access_token``.
        :raises KeyError: if either key is missing from ``config``.
        """
        self._spreadsheet_id = config["spreadsheet_id"]
        self._access_token = config["access_token"]
        api_client = smartsheet.Smartsheet(self._access_token)
        api_client.errors_as_exceptions(True)
        # each call to `Sheets` makes a new instance, so we save it here to make no more new objects
        self._get_sheet = api_client.Sheets.get_sheet
        self._data = None

    def _fetch_sheet(self, from_dt: Optional[str] = None) -> None:
        """Request the sheet and store the response in ``self._data``.

        :param from_dt: value forwarded as ``rows_modified_since``; when it is
            empty the request is additionally limited with ``page_size=1``.
        """
        kwargs = {"rows_modified_since": from_dt}
        if not from_dt:
            kwargs["page_size"] = 1
        self._data = self._get_sheet(self._spreadsheet_id, **kwargs)

    @staticmethod
    def _column_to_property(column_type: str) -> Dict[str, Any]:
        """Return the JSON-schema property for a column type name.

        Column types without an explicit mapping fall back to a plain string.
        """
        type_mapping = {
            "TEXT_NUMBER": {"type": "string"},
            "DATE": {"type": "string", "format": "date"},
            "DATETIME": {"type": "string", "format": "date-time"},
        }
        return type_mapping.get(column_type, {"type": "string"})

    def _construct_record(self, row: "smartsheet.models.Row") -> Dict[str, str]:
        """Build a ``{column title: string value}`` record for ``row``.

        Every value is converted to ``str``. Only a missing value (``None``) or a
        missing cell becomes an empty string, so falsy-but-real values such as
        ``0`` or ``False`` are preserved as ``"0"`` / ``"False"``.
        The row's ``modified_at`` is added under the ``modifiedAt`` key.
        """
        values_column_map = {
            cell.column_id: "" if cell.value is None else str(cell.value)
            for cell in row.cells
        }
        record = {
            column.title: values_column_map.get(column.id, "")
            for column in self.data.columns
        }
        record["modifiedAt"] = row.modified_at.isoformat()
        return record

    @property
    def data(self) -> "smartsheet.models.Sheet":
        """The last fetched sheet object; fetched on first access if nothing is stored yet."""
        if self._data is None:
            self._fetch_sheet()
        return self._data

    @property
    def name(self) -> str:
        """Name of the sheet as reported by the fetched sheet object."""
        return self.data.name

    @property
    def row_count(self) -> int:
        """Number of rows contained in the currently stored sheet object."""
        return len(self.data.rows)

    @cached_property
    def primary_key(self) -> Optional[str]:
        """Title of the first column flagged as primary, or ``None`` if there is none.

        The value is computed once per wrapper instance and then cached.
        """
        for column in self.data.columns:
            if column.primary:
                return column.title
        return None

    @cached_property
    def json_schema(self) -> Dict[str, Any]:
        """JSON schema built from the sheet columns plus the ``modifiedAt`` cursor field.

        The value is computed once per wrapper instance and then cached.
        """
        column_info = {
            column.title: self._column_to_property(column.type.value)
            for column in self.data.columns
        }
        column_info["modifiedAt"] = {"type": "string", "format": "date-time"}  # add cursor field explicitly
        json_schema = {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": column_info,
        }
        return json_schema

    def read_records(self, from_dt: str) -> Iterable[Dict[str, str]]:
        """Re-fetch the sheet with ``from_dt`` and yield one record per returned row.

        :param from_dt: value forwarded to the API as ``rows_modified_since``.
        """
        self._fetch_sheet(from_dt)
        for row in self.data.rows:
            yield self._construct_record(row)

    def check_connection(self, logger: logging.Logger) -> Tuple[bool, Optional[str]]:
        """Try to fetch the sheet and report whether that worked.

        :param logger: logger that receives the failure reason, if any.
        :return: ``(True, None)`` on success, otherwise ``(False, reason)``.
        """
        try:
            _ = self.data
        except smartsheet.exceptions.ApiError as e:
            err = e.error.result
            # API error code 1006 is reported to the user as 404
            code = 404 if err.code == 1006 else err.code
            reason = f"{err.name}: {code} - {err.message} | Check your spreadsheet ID."
            logger.error(reason)
            return False, reason
        except Exception as e:
            # connection-check boundary: any failure is reported, never raised
            reason = str(e)
            logger.error(reason)
            return False, reason
        return True, None
Original file line number Diff line number Diff line change
Expand Up @@ -2,120 +2,21 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, List, Mapping, Tuple

import json
from datetime import datetime
from typing import Dict, Generator, List
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream

import smartsheet
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
Status,
Type,
)
from airbyte_cdk.sources import Source
from .sheet import SmartSheetAPIWrapper
from .streams import SmartsheetStream


def get_prop(col_type: str) -> Dict[str, any]:
    """Translate a Smartsheet column type name into a JSON-schema property.

    Column types that have no dedicated entry are described as plain strings.
    """
    known_types = {
        "TEXT_NUMBER": {"type": "string"},
        "DATE": {"type": "string", "format": "date"},
        "DATETIME": {"type": "string", "format": "date-time"},
    }
    try:
        return known_types[col_type]
    except KeyError:
        # unmapped column type: fall back to a plain string property
        return {"type": "string"}
class SourceSmartsheets(AbstractSource):
    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
        """Delegate the connection check to a wrapper built from ``config``.

        Returns whatever ``SmartSheetAPIWrapper.check_connection`` returns.
        """
        return SmartSheetAPIWrapper(config).check_connection(logger)


def construct_record(sheet_columns: List[Dict], row_cells: List[Dict]) -> Dict:
    """Build a ``{column title: cell text}`` record for one row.

    Cells are matched to columns through ``columnId`` / ``id``; a cell without
    a ``value`` key contributes an empty string.
    """
    # convert all data to string as it is only expected format in schema
    text_by_column_id = {}
    for cell in row_cells:
        text_by_column_id[cell["columnId"]] = str(cell.get("value", ""))

    record = {}
    for column in sheet_columns:
        record[column["title"]] = text_by_column_id[column["id"]]
    return record


def get_json_schema(sheet_columns: List[Dict]) -> Dict:
    """Assemble a draft-07 JSON schema whose properties mirror the sheet columns.

    Each column contributes one property keyed by its ``title`` and described
    by ``get_prop`` applied to its ``type``.
    """
    properties = {}
    for column in sheet_columns:
        properties[column["title"]] = get_prop(column["type"])

    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": properties,
    }


class SourceSmartsheets(Source):
    """Full-refresh Airbyte source that reads a single Smartsheet sheet."""

    def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
        """Verify that the configured sheet can be fetched with the configured token.

        :param logger: logger that receives the failure reason, if any.
        :param config: connector config with ``access_token`` and ``spreadsheet_id``.
        :return: a SUCCEEDED status, or a FAILED status carrying the failure reason.
        """
        try:
            access_token = config["access_token"]
            spreadsheet_id = config["spreadsheet_id"]

            smartsheet_client = smartsheet.Smartsheet(access_token)
            smartsheet_client.errors_as_exceptions(True)
            smartsheet_client.Sheets.get_sheet(spreadsheet_id)
        except smartsheet.exceptions.ApiError as e:
            err = e.error.result
            # API error code 1006 is reported to the user as 404
            code = 404 if err.code == 1006 else err.code
            reason = f"{err.name}: {code} - {err.message} | Check your spreadsheet ID."
        except Exception as e:
            # connection-check boundary: any failure is reported, never raised
            reason = str(e)
        else:
            return AirbyteConnectionStatus(status=Status.SUCCEEDED)

        logger.error(reason)
        # include the reason in the status so it is not only visible in the logs
        return AirbyteConnectionStatus(status=Status.FAILED, message=reason)

    def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
        """Build a catalog with one full-refresh stream describing the configured sheet.

        :raises Exception: wrapping (and chained to) any error hit while fetching
            or parsing the sheet.
        """
        access_token = config["access_token"]
        spreadsheet_id = config["spreadsheet_id"]
        streams = []

        smartsheet_client = smartsheet.Smartsheet(access_token)
        # same setting as in `check`: surface API failures as exceptions
        smartsheet_client.errors_as_exceptions(True)
        try:
            sheet = smartsheet_client.Sheets.get_sheet(spreadsheet_id)
            sheet = json.loads(str(sheet))  # make it subscriptable
            sheet_json_schema = get_json_schema(sheet["columns"])
            logger.info(f"Running discovery on sheet: {sheet['name']} with {spreadsheet_id}")

            stream = AirbyteStream(name=sheet["name"], json_schema=sheet_json_schema)
            stream.supported_sync_modes = ["full_refresh"]
            streams.append(stream)

        except Exception as e:
            raise Exception(f"Could not run discovery: {str(e)}") from e

        return AirbyteCatalog(streams=streams)

    def read(
        self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any]
    ) -> Generator[AirbyteMessage, None, None]:
        """Yield one RECORD message per sheet row for every configured stream.

        A row that cannot be converted is logged and skipped; a failure to fetch
        or parse the sheet itself is logged and re-raised. ``state`` is not used.
        """
        access_token = config["access_token"]
        spreadsheet_id = config["spreadsheet_id"]
        smartsheet_client = smartsheet.Smartsheet(access_token)
        # same setting as in `check`: surface API failures as exceptions
        smartsheet_client.errors_as_exceptions(True)

        for configured_stream in catalog.streams:
            stream = configured_stream.stream
            try:
                sheet = smartsheet_client.Sheets.get_sheet(spreadsheet_id)
                sheet = json.loads(str(sheet))  # make it subscriptable
                logger.info(f"Starting syncing spreadsheet {sheet['name']}")
                logger.info(f"Row count: {sheet['totalRowCount']}")

                for row in sheet["rows"]:
                    try:
                        record = construct_record(sheet["columns"], row["cells"])
                        yield AirbyteMessage(
                            type=Type.RECORD,
                            record=AirbyteRecordMessage(
                                stream=stream.name,
                                data=record,
                                # multiply before truncating so the millisecond part is kept
                                emitted_at=int(datetime.now().timestamp() * 1000),
                            ),
                        )
                    except Exception as e:
                        # best effort: a bad row must not abort the whole sync
                        logger.error(f"Unable to encode row into an AirbyteMessage with the following error: {e}")

            except Exception:
                logger.error(f"Could not read smartsheet: {stream.name}")
                raise
        logger.info(f"Finished syncing spreadsheet with ID: {spreadsheet_id}")
def streams(self, config: Mapping[str, Any]) -> List["Stream"]:
    """Return the single stream of this source, backed by a wrapper built from ``config``."""
    api_wrapper = SmartSheetAPIWrapper(config)
    available_streams = [SmartsheetStream(api_wrapper, config)]
    return available_streams
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,19 @@
"title": "Sheet ID",
"description": "The spreadsheet ID. Find in the spreadsheet menu: File > Properties",
"type": "string"
},
"start_datetime": {
"title": "Start Date",
"type": ["null", "string"],
davydov-d marked this conversation as resolved.
Show resolved Hide resolved
"examples": [
"2000-01-01",
"2000-01-01 13:00",
"2000-01-01 13:00:00",
"2000-01-01T13:00+00:00",
"2000-01-01T13:00:00-07:00"
],
"description": "ISO 8601, for instance: `YYYY-MM-DD`, `YYYY-MM-DD HH:MM:SS+HH:MM`",
"format": "date-time"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import datetime
from typing import Any, Dict, Iterable, List, Mapping

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from source_smartsheets.sheet import SmartSheetAPIWrapper


class SmartsheetStream(Stream):
    """Stream over one Smartsheet sheet that tracks ``modifiedAt`` as its cursor."""

    cursor_field = "modifiedAt"

    def __init__(self, smartsheet: SmartSheetAPIWrapper, config: Mapping[str, Any]):
        """
        :param smartsheet: API wrapper that provides name, schema, primary key and records.
        :param config: connector config; ``start_datetime`` is optional.
        """
        self.smartsheet = smartsheet
        self._state = {}
        self._config = config
        # initial cursor value used when no start date is configured
        self._start_datetime = self._config.get("start_datetime") or "1970-01-01T00:00:00+00:00"

    @property
    def primary_key(self) -> str:
        """Primary key as reported by the API wrapper."""
        return self.smartsheet.primary_key

    def get_json_schema(self) -> Dict[str, Any]:
        """JSON schema as reported by the API wrapper."""
        return self.smartsheet.json_schema

    @property
    def name(self) -> str:
        """Stream name as reported by the API wrapper."""
        return self.smartsheet.name

    @property
    def state(self) -> Mapping[str, Any]:
        """Current cursor state; initialised from the start datetime while it is still empty."""
        if not self._state:
            self._state = {self.cursor_field: self._start_datetime}
        return self._state

    @state.setter
    def state(self, value: Mapping[str, Any]):
        self._state = value

    @staticmethod
    def _parse_cursor(src: str) -> datetime.datetime:
        """Parse an ISO 8601 string into an offset-aware datetime.

        Values without a UTC offset (e.g. ``"2000-01-01"``) are interpreted as
        UTC. Without this, comparing a naive start date with an offset-aware
        record timestamp raises ``TypeError``.
        """
        parsed = datetime.datetime.fromisoformat(src)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=datetime.timezone.utc)
        return parsed

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """Yield records from the wrapper and advance the cursor state as they pass through.

        The current cursor value from ``self.state`` is handed to the wrapper.
        For every record the state is set to the later of the stored cursor and
        the record's ``modifiedAt``, formatted with second precision. The
        arguments are part of the stream interface and are not read here.
        """
        for record in self.smartsheet.read_records(self.state[self.cursor_field]):
            current_cursor_value = self._parse_cursor(self.state[self.cursor_field])
            latest_cursor_value = self._parse_cursor(record[self.cursor_field])
            new_cursor_value = max(latest_cursor_value, current_cursor_value)
            self.state = {self.cursor_field: new_cursor_value.isoformat("T", "seconds")}
            yield record
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import json
from pathlib import Path
from unittest.mock import Mock

import pytest
from smartsheet.models import Sheet

HERE = Path(__file__).parent.absolute()


@pytest.fixture
def response_mock():
    """Parsed content of ``response.json`` located next to this module."""
    fixture_path = HERE / "response.json"
    with open(fixture_path) as json_file:
        return json.load(json_file)


@pytest.fixture
def config():
    """Connector config with placeholder sheet id and access token."""
    return dict(spreadsheet_id="id", access_token="token")


@pytest.fixture
def get_sheet_mocker(mocker, response_mock):
    """Factory fixture that replaces ``_get_sheet`` on a given API wrapper.

    The returned callable builds a ``Sheet`` from the canned response, patches
    the wrapper with either the supplied ``data`` or a ``Mock`` returning that
    sheet, and hands back ``(mock, sheet)``.
    """

    def _mocker(api_wrapper, data=None):
        sheet = Sheet(props=response_mock, base_obj=api_wrapper)
        default_mock = Mock(return_value=sheet)
        # a caller-supplied replacement wins over the default mock
        replacement = data if data else default_mock
        mocker.patch.object(api_wrapper, "_get_sheet", replacement)
        return default_mock, sheet

    return _mocker
Loading