Skip to content

Commit

Permalink
🐛 SingerSource: Fix incompatibilities and typing issues (#14148)
Browse files Browse the repository at this point in the history
* Use logging.Logger in SingerSource

* Fix SingerSource ConfigContainer

This fixes typing issues with `ConfigContainer` and makes it compatible
with `split_config`. Fixes #8710.

* Fix SingerSource state and catalog typing issues

* Rename SingerSource method args to match parent classes

* Remove old comment about excluding Singer

Co-authored-by: Alexandre Girard <alexandre@airbyte.io>
  • Loading branch information
cstruct and girarda authored Jul 1, 2022
1 parent e23789b commit be2d1a8
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 103 deletions.
38 changes: 28 additions & 10 deletions airbyte-cdk/python/airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os
import pkgutil
from abc import ABC, abstractmethod
from typing import Any, Mapping, Optional
from typing import Any, Mapping, Optional, Generic, TypeVar, Protocol

import yaml
from airbyte_cdk.models import AirbyteConnectionStatus, ConnectorSpecification
Expand All @@ -33,27 +33,27 @@ def __init__(self, spec_string):
self.spec_string = spec_string


class Connector(ABC):
TConfig = TypeVar("TConfig", bound=Mapping[str, Any])


class BaseConnector(ABC, Generic[TConfig]):
# configure whether the `check_config_against_spec_or_exit()` needs to be called
check_config_against_spec: bool = True

# can be overridden to change an input config
def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
@abstractmethod
def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig:
"""
Persist config in temporary directory to run the Source job
"""
config_path = os.path.join(temp_dir, "config.json")
self.write_config(config, config_path)
return config

@staticmethod
def read_config(config_path: str) -> Mapping[str, Any]:
def read_config(config_path: str) -> TConfig:
with open(config_path, "r") as file:
contents = file.read()
return json.loads(contents)

@staticmethod
def write_config(config: Mapping[str, Any], config_path: str):
def write_config(config: TConfig, config_path: str):
with open(config_path, "w") as fh:
fh.write(json.dumps(config))

Expand Down Expand Up @@ -81,8 +81,26 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
return ConnectorSpecification.parse_obj(spec_obj)

@abstractmethod
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionStatus:
"""
Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
to the Stripe API.
"""


class _WriteConfigProtocol(Protocol):
    """Structural type for any object exposing a static ``write_config`` hook.

    It exists so that a mixin can annotate ``self`` as "something that has
    ``write_config``" without inheriting from the class that implements it.
    """

    @staticmethod
    def write_config(config: Mapping[str, Any], config_path: str):
        """Signature-only stub; conforming classes supply the real behavior."""


class DefaultConnectorMixin:
    """Mixin supplying the default ``configure`` implementation."""

    # can be overridden to change an input config
    def configure(self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
        """Write ``config`` to ``<temp_dir>/config.json`` and return it unchanged.

        :param config: the input configuration to persist
        :param temp_dir: directory in which ``config.json`` is written
        :return: the same ``config`` object that was passed in
        """
        self.write_config(config, os.path.join(temp_dir, "config.json"))
        return config


class Connector(DefaultConnectorMixin, BaseConnector[Mapping[str, Any]], ABC):
    """Connector whose configuration type is a plain ``Mapping[str, Any]``.

    Combines ``BaseConnector`` with the ``configure`` implementation provided
    by ``DefaultConnectorMixin``; it adds no members of its own.
    """
6 changes: 2 additions & 4 deletions airbyte-cdk/python/airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,9 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:

# Remove internal flags from config before validating so
# jsonschema's additionalProperties flag won't fail the validation
config, internal_config = split_config(config)
connector_config, _ = split_config(config)
if self.source.check_config_against_spec or cmd == "check":
check_config_against_spec_or_exit(config, source_spec)
# Put internal flags back to config dict
config.update(internal_config.dict())
check_config_against_spec_or_exit(connector_config, source_spec)

if cmd == "check":
check_result = self.source.check(self.logger, config)
Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/singer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
#

from .singer_helpers import SingerHelper, SyncModeInfo
from .source import SingerSource
from .source import ConfigContainer, SingerSource

__all__ = ["SingerSource", "SyncModeInfo", "SingerHelper"]
__all__ = ["ConfigContainer", "SingerSource", "SyncModeInfo", "SingerHelper"]
94 changes: 31 additions & 63 deletions airbyte-cdk/python/airbyte_cdk/sources/singer/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,34 @@


import os
from dataclasses import dataclass
import logging
from typing import Any, Dict, Iterable, List, Mapping, Type

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status
from airbyte_cdk.sources.source import Source
from airbyte_cdk.sources.source import BaseSource
from airbyte_cdk.sources.utils.catalog_helpers import CatalogHelper

from .singer_helpers import Catalogs, SingerHelper, SyncModeInfo


@dataclass
class ConfigContainer:
config: Mapping[str, Any]
class ConfigContainer(Dict[str, Any]):
config_path: str

def __init__(self, config, config_path):
super().__init__(config)
self.config_path = config_path

class SingerSource(Source):

# can be overridden to change an input config
# according to issue CDK: typing errors #9500, mypy raises error on this line
# 'Return type "ConfigContainer" of "configure" incompatible with return type "Mapping[str, Any]" in supertype "Connector"'
# need to fix, ignored for now
def configure(self, raw_config: Mapping[str, Any], temp_dir: str) -> ConfigContainer: # type: ignore
class SingerSource(BaseSource[ConfigContainer, str, str]):
def configure(self, config: Mapping[str, Any], temp_dir: str) -> ConfigContainer:
"""
Persist config in temporary directory to run the Source job
This can be overridden if extra temporary files need to be persisted in the temp dir
"""
config = self.transform_config(raw_config)
config_path = os.path.join(temp_dir, "config.json")
config = ConfigContainer(self.transform_config(config), config_path)
self.write_config(config, config_path)
return ConfigContainer(config, config_path)
return config

# Can be overridden to change an input config
def transform_config(self, config: Mapping[str, Any]) -> Mapping[str, Any]:
Expand All @@ -44,94 +40,70 @@ def transform_config(self, config: Mapping[str, Any]) -> Mapping[str, Any]:
"""
return config

# Overriding to change an input catalog as path instead
# according to issue CDK: typing errors #9500, mypy raises error on this line
# 'Return type "str" of "read_catalog" incompatible with return type "ConfiguredAirbyteCatalog" in supertype "Source"'
# need to fix, ignored for now
def read_catalog(self, catalog_path: str) -> str: # type: ignore
def read_catalog(self, catalog_path: str) -> str:
"""
Since singer source don't need actual catalog object, we override this to return path only
"""
return catalog_path

# Overriding to change an input state as path instead
# according to issue CDK: typing errors #9500, mypy raises error on this line
# 'Return type "str" of "read_state" incompatible with return type "Dict[str, Any]" in supertype "Source"'
# need to fix, ignored for now
def read_state(self, state_path: str) -> str: # type: ignore
def read_state(self, state_path: str) -> str:
"""
Since singer source don't need actual state object, we override this to return path only
"""
return state_path

def check_config(self, logger: AirbyteLogger, config_path: str, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
def check_config(self, logger: logging.Logger, config_path: str, config: ConfigContainer) -> AirbyteConnectionStatus:
"""
Some Singer sources may perform the check using config_path or config to
test if the input configuration can be used to successfully connect to the integration
"""
raise NotImplementedError

def discover_cmd(self, logger: AirbyteLogger, config_path: str) -> str:
def discover_cmd(self, logger: logging.Logger, config_path: str) -> str:
"""
Returns the command used to run discovery in the singer tap. For example, if the bash command used to invoke the singer tap is `tap-postgres`,
and the config JSON lived in "/path/config.json", this method would return "tap-postgres --config /path/config.json"
"""
raise NotImplementedError

def read_cmd(self, logger: AirbyteLogger, config_path: str, catalog_path: str, state_path: str = None) -> str:
def read_cmd(self, logger: logging.Logger, config_path: str, catalog_path: str, state_path: str = None) -> str:
"""
Returns the command used to read data from the singer tap. For example, if the bash command used to invoke the singer tap is `tap-postgres`,
and the config JSON lived in "/path/config.json", and the catalog was in "/path/catalog.json",
this method would return "tap-postgres --config /path/config.json --catalog /path/catalog.json"
"""
raise NotImplementedError

def _discover_internal(self, logger: AirbyteLogger, config_path: str) -> Catalogs:
def _discover_internal(self, logger: logging.Logger, config_path: str) -> Catalogs:
cmd = self.discover_cmd(logger, config_path)
catalogs = SingerHelper.get_catalogs(
logger, cmd, self.get_sync_mode_overrides(), self.get_primary_key_overrides(), self.get_excluded_streams()
)
return catalogs

# according to issue CDK: typing errors #9500, mypy raises errors on this line
# 'Argument 1 of "check" is incompatible with supertype "Connector"; supertype defines the argument type as "Logger"'
# 'Argument 2 of "check" is incompatible with supertype "Connector"; supertype defines the argument type as "Mapping[str, Any]"'
# need to fix, ignored for now
def check(self, logger: AirbyteLogger, config_container: ConfigContainer) -> AirbyteConnectionStatus: # type: ignore
def check(self, logger: logging.Logger, config: ConfigContainer) -> AirbyteConnectionStatus:
"""
Tests if the input configuration can be used to successfully connect to the integration
"""
return self.check_config(logger, config_container.config_path, config_container.config)
return self.check_config(logger, config.config_path, config)

# according to issue CDK: typing errors #9500, mypy raises errors on this line
# 'Argument 1 of "discover" is incompatible with supertype "Source"; supertype defines the argument type as "Logger"'
# need to fix, ignored for now
def discover(self, logger: AirbyteLogger, config_container) -> AirbyteCatalog: # type: ignore
def discover(self, logger: logging.Logger, config: ConfigContainer) -> AirbyteCatalog:
"""
Implements the parent class discover method.
"""
if isinstance(config_container, ConfigContainer):
return self._discover_internal(logger, config_container.config_path).airbyte_catalog
else:
return self._discover_internal(logger, config_container).airbyte_catalog

# according to issue CDK: typing errors #9500, mypy raises errors on this line
# 'Argument 1 of "read" is incompatible with supertype "Source"; supertype defines the argument type as "Logger"'
# 'Argument 2 of "read" is incompatible with supertype "Source"; supertype defines the argument type as "Mapping[str, Any]"'
# 'Argument 3 of "read" is incompatible with supertype "Source"; supertype defines the argument type as "ConfiguredAirbyteCatalog"'
# 'Argument 4 of "read" is incompatible with supertype "Source"; supertype defines the argument type as "Optional[MutableMapping[str, Any]]"'
# need to fix, ignored for now
def read( # type: ignore
self, logger: AirbyteLogger, config_container: ConfigContainer, catalog_path: str, state_path: str = None
return self._discover_internal(logger, config.config_path).airbyte_catalog

def read(
self, logger: logging.Logger, config: ConfigContainer, catalog_path: str, state_path: str = None
) -> Iterable[AirbyteMessage]:
"""
Implements the parent class read method.
"""
catalogs = self._discover_internal(logger, config_container.config_path)
catalogs = self._discover_internal(logger, config.config_path)
masked_airbyte_catalog = ConfiguredAirbyteCatalog.parse_obj(self.read_config(catalog_path))
selected_singer_catalog_path = SingerHelper.create_singer_catalog_with_selection(masked_airbyte_catalog, catalogs.singer_catalog)

read_cmd = self.read_cmd(logger, config_container.config_path, selected_singer_catalog_path, state_path)
read_cmd = self.read_cmd(logger, config.config_path, selected_singer_catalog_path, state_path)
return SingerHelper.read(logger, read_cmd)

def get_sync_mode_overrides(self) -> Dict[str, SyncModeInfo]:
Expand Down Expand Up @@ -170,7 +142,7 @@ def get_excluded_streams(self) -> List[str]:
class BaseSingerSource(SingerSource):
force_full_refresh = False

def check_config(self, logger: AirbyteLogger, config_path: str, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
def check_config(self, logger: logging.Logger, config_path: str, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
try:
self.try_connect(logger, config)
except self.api_error as err:
Expand All @@ -180,27 +152,23 @@ def check_config(self, logger: AirbyteLogger, config_path: str, config: Mapping[
return AirbyteConnectionStatus(status=Status.FAILED, message=error_msg)
return AirbyteConnectionStatus(status=Status.SUCCEEDED)

def discover_cmd(self, logger: AirbyteLogger, config_path: str) -> str:
def discover_cmd(self, logger: logging.Logger, config_path: str) -> str:
return f"{self.tap_cmd} --config {config_path} --discover"

def read_cmd(self, logger: AirbyteLogger, config_path: str, catalog_path: str, state_path: str = None) -> str:
def read_cmd(self, logger: logging.Logger, config_path: str, catalog_path: str, state_path: str = None) -> str:
state_path = None if self.force_full_refresh else state_path
args = {"--config": config_path, "--catalog": catalog_path, "--state": state_path}
cmd = " ".join([f"{k} {v}" for k, v in args.items() if v is not None])

return f"{self.tap_cmd} {cmd}"

# according to issue CDK: typing errors #9500, mypy raises errors on this line
# 'Argument 1 of "discover" is incompatible with supertype "Source"; supertype defines the argument type as "Logger"'
# 'Argument 2 of "discover" is incompatible with supertype "Source"; supertype defines the argument type as "Mapping[str, Any]"'
# need to fix, ignored for now
def discover(self, logger: AirbyteLogger, config_container: ConfigContainer) -> AirbyteCatalog: # type: ignore
catalog = super().discover(logger, config_container)
def discover(self, logger: logging.Logger, config: ConfigContainer) -> AirbyteCatalog:
catalog = super().discover(logger, config)
if self.force_full_refresh:
return CatalogHelper.coerce_catalog_as_full_refresh(catalog)
return catalog

def try_connect(self, logger: AirbyteLogger, config: Mapping[str, Any]):
def try_connect(self, logger: logging.Logger, config: Mapping[str, Any]):
"""Test provided credentials, raises self.api_error if something goes wrong"""
raise NotImplementedError

Expand Down
46 changes: 30 additions & 16 deletions airbyte-cdk/python/airbyte_cdk/sources/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,51 @@
import logging
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Any, Dict, Iterable, Mapping, MutableMapping
from typing import Any, Dict, Generic, Iterable, Mapping, MutableMapping, TypeVar

from airbyte_cdk.connector import Connector
from airbyte_cdk.connector import BaseConnector, DefaultConnectorMixin, TConfig
from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog

TState = TypeVar("TState")
TCatalog = TypeVar("TCatalog")

class Source(Connector, ABC):
# can be overridden to change an input state
def read_state(self, state_path: str) -> Dict[str, Any]:
if state_path:
state_obj = json.loads(open(state_path, "r").read())
else:
state_obj = {}
state = defaultdict(dict, state_obj)
return state

# can be overridden to change an input catalog
def read_catalog(self, catalog_path: str) -> ConfiguredAirbyteCatalog:
return ConfiguredAirbyteCatalog.parse_obj(self.read_config(catalog_path))
class BaseSource(BaseConnector[TConfig], ABC, Generic[TConfig, TState, TCatalog]):
@abstractmethod
def read_state(self, state_path: str) -> TState:
...

@abstractmethod
def read_catalog(self, catalog_path: str) -> TCatalog:
...

@abstractmethod
def read(
self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
self, logger: logging.Logger, config: TConfig, catalog: TCatalog, state: TState = None
) -> Iterable[AirbyteMessage]:
"""
Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.
"""

@abstractmethod
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
def discover(self, logger: logging.Logger, config: TConfig) -> AirbyteCatalog:
"""
Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a
Postgres database, returns an Airbyte catalog where each postgres table is a stream, and each table column is a field.
"""



class Source(DefaultConnectorMixin, BaseSource[Mapping[str, Any], MutableMapping[str, Any], ConfiguredAirbyteCatalog], ABC):
    """Source whose config and state are plain mappings and whose catalog is a ``ConfiguredAirbyteCatalog``."""

    # can be overridden to change an input state
    def read_state(self, state_path: str) -> Dict[str, Any]:
        """Load the JSON state stored at ``state_path``.

        :param state_path: path of the state file; a falsy value means "no state"
        :return: a ``defaultdict(dict)`` seeded with the file contents (or empty),
            so looking up a missing top-level key yields an empty dict
        """
        if state_path:
            # Use a context manager so the file handle is closed promptly
            # instead of being left for the garbage collector.
            with open(state_path, "r") as state_file:
                state_obj = json.load(state_file)
        else:
            state_obj = {}
        return defaultdict(dict, state_obj)

    # can be overridden to change an input catalog
    def read_catalog(self, catalog_path: str) -> ConfiguredAirbyteCatalog:
        """Parse the contents of ``catalog_path`` (loaded via ``read_config``) into a ``ConfiguredAirbyteCatalog``."""
        return ConfiguredAirbyteCatalog.parse_obj(self.read_config(catalog_path))
3 changes: 1 addition & 2 deletions airbyte-cdk/python/type_check_and_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

# Static Type Checking
echo "Running MyPy to static check and test files."
# Exclude Singer for the time being.
mypy --exclude '/*singer*/' airbyte_cdk/ unit_tests/
mypy airbyte_cdk/ unit_tests/

printf "\n"

Expand Down
Loading

0 comments on commit be2d1a8

Please sign in to comment.