Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAT: cursor_paths should support custom nested and absolute paths #4552

Merged
merged 11 commits into from
Jul 5, 2021
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# Changelog

## 0.1.5
## 0.1.8
Fix cursor_path to support nested and absolute paths: https://github.com/airbytehq/airbyte/pull/4552

## 0.1.7
Add: `test_spec` additionally checks that the Dockerfile defines `ENV AIRBYTE_ENTRYPOINT` and that its value equals the space-joined `ENTRYPOINT`

## 0.1.6
Add a test that PKs are present and not None if `source_defined_primary_key` is defined: https://github.com/airbytehq/airbyte/pull/4140

## 0.1.5
Expand Down
6 changes: 3 additions & 3 deletions airbyte-integrations/bases/source-acceptance-test/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
FROM python:3.7-slim

RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*
ENV CODE_PATH="source_acceptance_test"

WORKDIR /airbyte/source_acceptance_test
COPY $CODE_PATH ./$CODE_PATH
COPY source_acceptance_test ./source_acceptance_test
COPY setup.py ./
COPY pytest.ini ./
RUN pip install .

LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"]
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,14 @@ def records_with_state(records, state, stream_mapping, state_cursor_paths) -> It
stream_name = record.record.stream
stream = stream_mapping[stream_name]
helper = JsonSchemaHelper(schema=stream.stream.json_schema)
record_value = helper.get_cursor_value(record=record.record.data, cursor_path=stream.cursor_field)
state_value = helper.get_state_value(state=state[stream_name], cursor_path=state_cursor_paths[stream_name])
cursor_field = helper.field(stream.cursor_field)
record_value = cursor_field.parse(record=record.record.data)
try:
# first attempt to parse the state value assuming the state object is namespaced on stream names
state_value = cursor_field.parse(record=state[stream_name], path=state_cursor_paths[stream_name])
keu marked this conversation as resolved.
Show resolved Hide resolved
except KeyError:
# try second time as an absolute path in state file (i.e. bookmarks -> stream_name -> column -> value)
state_value = cursor_field.parse(record=state, path=state_cursor_paths[stream_name])
yield record_value, state_value


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,54 +24,62 @@


from functools import reduce
from typing import Any, List, Set
from typing import Any, List, Mapping, Optional, Set

import pendulum


class CatalogField:
    """Field class to represent cursor/pk fields.

    Holds the JSON-schema fragment describing one field together with the key
    path leading to it, and extracts/casts that field's value from a nested
    mapping such as a record or a state object.
    """

    # Formats/types whose serialized values are parsed with pendulum.
    _DATE_FORMATS = frozenset({"datetime", "date-time", "date"})

    def __init__(self, schema: Mapping[str, Any], path: List[str]):
        """
        :param schema: JSON-schema fragment of the field (read for "format"/"type")
        :param path: list of keys leading to the field inside a record
        """
        self.schema = schema
        self.path = path
        self.formats = self._detect_formats()

    def _detect_formats(self) -> Set[str]:
        """Extract set of formats/types for this field.

        "format" takes precedence over "type"; an empty set is returned when the
        schema declares neither.
        """
        # Look the keys up lazily: using schema["type"] as the `.get` default
        # would raise KeyError (and discard a declared "format") whenever
        # "type" is missing.
        if "format" in self.schema:
            declared = self.schema["format"]
        elif "type" in self.schema:
            declared = self.schema["type"]
        else:
            return set()
        if not isinstance(declared, list):
            declared = [declared]
        return set(declared)

    def _parse_value(self, value: Any) -> Any:
        """Do actual parsing of the serialized value.

        :raises ValueError: if a date-like field holds None and "null" is not
            among the field's formats
        """
        if not self.formats.intersection(self._DATE_FORMATS):
            return value
        if value is None:
            if "null" not in self.formats:
                raise ValueError(f"Invalid field format. Value: {value}. Format: {self.formats}")
            # Nullable date field: there is nothing to parse, keep None as is
            # instead of handing it to pendulum.parse.
            return None
        return pendulum.parse(value)

    def parse(self, record: Mapping[str, Any], path: Optional[List[str]] = None) -> Any:
        """Extract field value from the record and cast it to native type.

        :param record: nested mapping to read the value from
        :param path: optional list of keys overriding the field's own path
        :raises KeyError: if a key of the path is missing from the mapping
        """
        path = path or self.path
        value = reduce(lambda data, key: data[key], path, record)
        return self._parse_value(value)


class JsonSchemaHelper:
    """Helper for looking up properties in a JSON schema and reading typed values."""

    def __init__(self, schema):
        self._schema = schema

    def get_ref(self, path: str):
        """Resolve a "/"-separated reference string against the schema.

        The first segment (the part before the first "/") is skipped; every
        following segment is used as a key while descending from the schema root.
        """
        node = self._schema
        for segment in path.split("/")[1:]:
            node = node[segment]
        return node

    def get_property(self, path: List[str]) -> Mapping[str, Any]:
        """Return the schema node of the property addressed by a list of keys.

        :raises KeyError: if a segment is not declared under "properties"
        """
        node = self._schema
        for segment in path:
            if "$ref" in node:
                node = self.get_ref(node["$ref"])
            node = node["properties"][segment]
        # The addressed property itself may be a reference; resolve it so that
        # callers see its actual "type"/"format".
        if "$ref" in node:
            node = self.get_ref(node["$ref"])
        return node

    def get_format_for_key_path(self, path: List[str]) -> Set[str]:
        """Return the set of formats/types declared for the property at ``path``.

        "format" takes precedence over "type"; an empty set is returned when the
        property is unknown or declares neither.
        """
        try:
            field = self.get_property(path)
        except KeyError:
            return set()
        # Lazy lookup: evaluating field["type"] as the `.get` default would
        # discard a declared "format" whenever "type" is absent.
        if "format" in field:
            format_ = field["format"]
        elif "type" in field:
            format_ = field["type"]
        else:
            return set()
        if not isinstance(format_, list):
            format_ = [format_]
        return set(format_)

    def get_cursor_value(self, record, cursor_path):
        """Read the value at ``cursor_path`` from ``record`` and parse it by schema format."""
        type_ = self.get_format_for_key_path(path=cursor_path)
        value = reduce(lambda data, key: data[key], cursor_path, record)
        return self.parse_value(value, type_)

    @staticmethod
    def parse_value(value: Any, format_: Set[str]):
        """Parse date-like values with pendulum; return any other value unchanged.

        :raises ValueError: if a date-like value is None and "null" is not in ``format_``
        """
        if not format_.intersection({"datetime", "date-time", "date"}):
            return value
        if value is None:
            if "null" not in format_:
                raise ValueError(f"Invalid field format. Value: {value}. Format: {format_}")
            # Nullable date: nothing to parse.
            return None
        return pendulum.parse(value)

    def get_state_value(self, state, cursor_path):
        """Read the state value stored under the last key of ``cursor_path``.

        Only the final path segment is used for the lookup in ``state``; use
        ``field(...).parse(...)`` when the full path has to be followed.
        """
        format_ = self.get_format_for_key_path(path=cursor_path)
        value = state[cursor_path[-1]]
        return self.parse_value(value, format_)

    def field(self, path: List[str]) -> CatalogField:
        """Build a CatalogField for the property addressed by ``path``."""
        return CatalogField(schema=self.get_property(path), path=path)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

import pendulum
import pytest
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStream,
ConfiguredAirbyteStream,
DestinationSyncMode,
SyncMode,
Type,
)
from source_acceptance_test.tests.test_incremental import records_with_state


@pytest.fixture(name="simple_state")
def simple_state_fixture():
    """State keyed by stream name, with the cursor values stored flat underneath."""
    stream_state = dict(
        id=11,
        ts_created="2014-01-01T22:03:11",
        ts_updated="2015-01-01T22:03:11",
    )
    return {"my_stream": stream_state}


@pytest.fixture(name="nested_state")
def nested_state_fixture(simple_state):
    """State where the stream's values sit one level deeper, under "some_account_id"."""
    per_stream = simple_state["my_stream"]
    return {"my_stream": {"some_account_id": per_stream}}


@pytest.fixture(name="singer_state")
def singer_state_fixture(simple_state):
    """State where the per-stream mapping is wrapped in a top-level "bookmarks" key."""
    return dict(bookmarks=simple_state)


@pytest.fixture(name="stream_schema")
def stream_schema_fixture():
    """JSON schema with an integer id, a datetime field and a nested date field."""
    nested_properties = {"ts_updated": {"type": "string", "format": "date"}}
    properties = {
        "id": {"type": "integer"},
        "ts_created": {"type": "string", "format": "datetime"},
        "nested": {"type": "object", "properties": nested_properties},
    }
    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": properties,
    }


@pytest.fixture(name="stream_mapping")
def stream_mapping_fixture(stream_schema):
    """Mapping from stream name to a configured stream built on the test schema."""
    stream = AirbyteStream(name="my_stream", json_schema=stream_schema)
    configured = ConfiguredAirbyteStream(
        stream=stream,
        sync_mode=SyncMode.full_refresh,
        destination_sync_mode=DestinationSyncMode.append,
    )
    return {"my_stream": configured}


@pytest.fixture(name="records")
def records_fixture():
    """A single RECORD message for "my_stream" carrying flat and nested cursor values."""
    payload = {"id": 1, "ts_created": "2015-11-01T22:03:11", "nested": {"ts_updated": "2015-05-01"}}
    record = AirbyteRecordMessage(stream="my_stream", data=payload, emitted_at=0)
    return [AirbyteMessage(type=Type.RECORD, record=record)]


def test_simple_path(records, stream_mapping, simple_state):
    """A one-key cursor path is looked up in the state stored under the stream name."""
    stream_mapping["my_stream"].cursor_field = ["id"]
    cursor_paths = {"my_stream": ["id"]}

    pairs = records_with_state(
        records=records,
        state=simple_state,
        stream_mapping=stream_mapping,
        state_cursor_paths=cursor_paths,
    )
    from_record, from_state = next(pairs)

    assert from_record == 1, "record value must be correctly found"
    assert from_state == 11, "state value must be correctly found"


def test_nested_path(records, stream_mapping, nested_state):
    """Record and state may each keep the cursor value under their own nested path."""
    stream_mapping["my_stream"].cursor_field = ["nested", "ts_updated"]
    cursor_paths = {"my_stream": ["some_account_id", "ts_updated"]}

    pairs = records_with_state(
        records=records,
        state=nested_state,
        stream_mapping=stream_mapping,
        state_cursor_paths=cursor_paths,
    )
    from_record, from_state = next(pairs)

    expected_record = pendulum.datetime(2015, 5, 1)
    expected_state = pendulum.datetime(2015, 1, 1, 22, 3, 11)
    assert from_record == expected_record, "record value must be correctly found"
    assert from_state == expected_state, "state value must be correctly found"


def test_nested_path_unknown(records, stream_mapping, simple_state):
    """A state cursor path starting with a key absent from the state raises KeyError."""
    stream_mapping["my_stream"].cursor_field = ["ts_created"]
    cursor_paths = {"my_stream": ["unknown", "ts_created"]}

    pairs = records_with_state(
        records=records,
        state=simple_state,
        stream_mapping=stream_mapping,
        state_cursor_paths=cursor_paths,
    )
    # The error is expected when the first pair is pulled from the iterator.
    with pytest.raises(KeyError):
        next(pairs)


def test_absolute_path(records, stream_mapping, singer_state):
    """A cursor path given from the root of the state object (bookmarks -> stream -> field) is found."""
    stream_mapping["my_stream"].cursor_field = ["ts_created"]
    cursor_paths = {"my_stream": ["bookmarks", "my_stream", "ts_created"]}

    pairs = records_with_state(
        records=records,
        state=singer_state,
        stream_mapping=stream_mapping,
        state_cursor_paths=cursor_paths,
    )
    from_record, from_state = next(pairs)

    expected_record = pendulum.datetime(2015, 11, 1, 22, 3, 11)
    expected_state = pendulum.datetime(2014, 1, 1, 22, 3, 11)
    assert from_record == expected_record, "record value must be correctly found"
    assert from_state == expected_state, "state value must be correctly found"