diff --git a/airbyte-cdk/python/airbyte_cdk/connector.py b/airbyte-cdk/python/airbyte_cdk/connector.py index 42373476888c..ffcb70dc4b23 100644 --- a/airbyte-cdk/python/airbyte_cdk/connector.py +++ b/airbyte-cdk/python/airbyte_cdk/connector.py @@ -8,7 +8,7 @@ import os import pkgutil from abc import ABC, abstractmethod -from typing import Any, Mapping, Optional +from typing import Any, Mapping, Optional, Generic, TypeVar, Protocol import yaml from airbyte_cdk.models import AirbyteConnectionStatus, ConnectorSpecification @@ -33,27 +33,27 @@ def __init__(self, spec_string): self.spec_string = spec_string -class Connector(ABC): +TConfig = TypeVar("TConfig", bound=Mapping[str, Any]) + + +class BaseConnector(ABC, Generic[TConfig]): # configure whether the `check_config_against_spec_or_exit()` needs to be called check_config_against_spec: bool = True - # can be overridden to change an input config - def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]: + @abstractmethod + def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig: """ Persist config in temporary directory to run the Source job """ - config_path = os.path.join(temp_dir, "config.json") - self.write_config(config, config_path) - return config @staticmethod - def read_config(config_path: str) -> Mapping[str, Any]: + def read_config(config_path: str) -> TConfig: with open(config_path, "r") as file: contents = file.read() return json.loads(contents) @staticmethod - def write_config(config: Mapping[str, Any], config_path: str): + def write_config(config: TConfig, config_path: str): with open(config_path, "w") as fh: fh.write(json.dumps(config)) @@ -81,8 +81,26 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification: return ConnectorSpecification.parse_obj(spec_obj) @abstractmethod - def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionStatus: """ Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect to the Stripe API. """ + + +class _WriteConfigProtocol(Protocol): + @staticmethod + def write_config(config: Mapping[str, Any], config_path: str): + ... + + +class DefaultConnectorMixin: + # can be overridden to change an input config + def configure(self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]: + config_path = os.path.join(temp_dir, "config.json") + self.write_config(config, config_path) + return config + + +class Connector(DefaultConnectorMixin, BaseConnector[Mapping[str, Any]], ABC): + ... diff --git a/airbyte-cdk/python/airbyte_cdk/entrypoint.py b/airbyte-cdk/python/airbyte_cdk/entrypoint.py index d996c7798e8d..74044291ff01 100644 --- a/airbyte-cdk/python/airbyte_cdk/entrypoint.py +++ b/airbyte-cdk/python/airbyte_cdk/entrypoint.py @@ -84,11 +84,9 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]: # Remove internal flags from config before validating so # jsonschema's additionalProperties flag wont fail the validation - config, internal_config = split_config(config) + connector_config, _ = split_config(config) if self.source.check_config_against_spec or cmd == "check": - check_config_against_spec_or_exit(config, source_spec) - # Put internal flags back to config dict - config.update(internal_config.dict()) + check_config_against_spec_or_exit(connector_config, source_spec) if cmd == "check": check_result = self.source.check(self.logger, config) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/singer/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/singer/__init__.py index b0f243a9be77..6c76280f33c2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/singer/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/singer/__init__.py @@ -3,6 +3,6 @@ # from .singer_helpers import SingerHelper, SyncModeInfo -from .source import SingerSource +from .source import ConfigContainer, SingerSource -__all__ = ["SingerSource", "SyncModeInfo", "SingerHelper"] +__all__ = ["ConfigContainer", "SingerSource", "SyncModeInfo", "SingerHelper"] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/singer/source.py b/airbyte-cdk/python/airbyte_cdk/sources/singer/source.py index c12453611583..ca4cbb420920 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/singer/source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/singer/source.py @@ -4,38 +4,34 @@ import os -from dataclasses import dataclass +import logging from typing import Any, Dict, Iterable, List, Mapping, Type -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status -from airbyte_cdk.sources.source import Source +from airbyte_cdk.sources.source import BaseSource from airbyte_cdk.sources.utils.catalog_helpers import CatalogHelper from .singer_helpers import Catalogs, SingerHelper, SyncModeInfo -@dataclass -class ConfigContainer: - config: Mapping[str, Any] +class ConfigContainer(Dict[str, Any]): config_path: str + def __init__(self, config, config_path): + super().__init__(config) + self.config_path = config_path -class SingerSource(Source): - # can be overridden to change an input config - # according to issue CDK: typing errors #9500, mypy raises error on this line - # 'Return type "ConfigContainer" of "configure" incompatible with return type "Mapping[str, Any]" in supertype "Connector"' - # need to fix, ignored for now - def configure(self, raw_config: Mapping[str, Any], temp_dir: str) -> ConfigContainer: # type: ignore +class SingerSource(BaseSource[ConfigContainer, str, str]): + def configure(self, config: Mapping[str, Any], temp_dir: str) -> ConfigContainer: """ Persist raw_config in temporary directory to run the Source job This can be overridden if extra temporary files need to be persisted in the temp dir """ - config = self.transform_config(raw_config) config_path = os.path.join(temp_dir, "config.json") + config = ConfigContainer(self.transform_config(config), config_path) self.write_config(config, config_path) - return ConfigContainer(config, config_path) + return config # Can be overridden to change an input config def transform_config(self, config: Mapping[str, Any]) -> Mapping[str, Any]: @@ -44,41 +40,33 @@ def transform_config(self, config: Mapping[str, Any]) -> Mapping[str, Any]: """ return config - # Overriding to change an input catalog as path instead - # according to issue CDK: typing errors #9500, mypy raises error on this line - # 'Return type "str" of "read_catalog" incompatible with return type "ConfiguredAirbyteCatalog" in supertype "Source"' - # need to fix, ignored for now - def read_catalog(self, catalog_path: str) -> str: # type: ignore + def read_catalog(self, catalog_path: str) -> str: """ Since singer source don't need actual catalog object, we override this to return path only """ return catalog_path - # Overriding to change an input state as path instead - # according to issue CDK: typing errors #9500, mypy raises error on this line - # 'Return type "str" of "read_state" incompatible with return type "Dict[str, Any]" in supertype "Source"' - # need to fix, ignored for now - def read_state(self, state_path: str) -> str: # type: ignore + def read_state(self, state_path: str) -> str: """ Since singer source don't need actual state object, we override this to return path only """ return state_path - def check_config(self, logger: AirbyteLogger, config_path: str, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + def check_config(self, logger: logging.Logger, config_path: str, config: ConfigContainer) -> AirbyteConnectionStatus: """ Some Singer source may perform check using config_path or config to tests if the input configuration can be used to successfully connect to the integration """ raise NotImplementedError - def discover_cmd(self, logger: AirbyteLogger, config_path: str) -> str: + def discover_cmd(self, logger: logging.Logger, config_path: str) -> str: """ Returns the command used to run discovery in the singer tap. For example, if the bash command used to invoke the singer tap is `tap-postgres`, and the config JSON lived in "/path/config.json", this method would return "tap-postgres --config /path/config.json" """ raise NotImplementedError - def read_cmd(self, logger: AirbyteLogger, config_path: str, catalog_path: str, state_path: str = None) -> str: + def read_cmd(self, logger: logging.Logger, config_path: str, catalog_path: str, state_path: str = None) -> str: """ Returns the command used to read data from the singer tap. For example, if the bash command used to invoke the singer tap is `tap-postgres`, and the config JSON lived in "/path/config.json", and the catalog was in "/path/catalog.json", @@ -86,52 +74,36 @@ def read_cmd(self, logger: AirbyteLogger, config_path: str, catalog_path: str, s """ raise NotImplementedError - def _discover_internal(self, logger: AirbyteLogger, config_path: str) -> Catalogs: + def _discover_internal(self, logger: logging.Logger, config_path: str) -> Catalogs: cmd = self.discover_cmd(logger, config_path) catalogs = SingerHelper.get_catalogs( logger, cmd, self.get_sync_mode_overrides(), self.get_primary_key_overrides(), self.get_excluded_streams() ) return catalogs - # according to issue CDK: typing errors #9500, mypy raises errors on this line - # 'Argument 1 of "check" is incompatible with supertype "Connector"; supertype defines the argument type as "Logger"' - # 'Argument 2 of "check" is incompatible with supertype "Connector"; supertype defines the argument type as "Mapping[str, Any]"' - # need to fix, ignored for now - def check(self, logger: AirbyteLogger, config_container: ConfigContainer) -> AirbyteConnectionStatus: # type: ignore + def check(self, logger: logging.Logger, config: ConfigContainer) -> AirbyteConnectionStatus: """ Tests if the input configuration can be used to successfully connect to the integration """ - return self.check_config(logger, config_container.config_path, config_container.config) + return self.check_config(logger, config.config_path, config) - # according to issue CDK: typing errors #9500, mypy raises errors on this line - # 'Argument 1 of "discover" is incompatible with supertype "Source"; supertype defines the argument type as "Logger"' - # need to fix, ignored for now - def discover(self, logger: AirbyteLogger, config_container) -> AirbyteCatalog: # type: ignore + def discover(self, logger: logging.Logger, config: ConfigContainer) -> AirbyteCatalog: """ Implements the parent class discover method. """ - if isinstance(config_container, ConfigContainer): - return self._discover_internal(logger, config_container.config_path).airbyte_catalog - else: - return self._discover_internal(logger, config_container).airbyte_catalog - - # according to issue CDK: typing errors #9500, mypy raises errors on this line - # 'Argument 1 of "read" is incompatible with supertype "Source"; supertype defines the argument type as "Logger"' - # 'Argument 2 of "read" is incompatible with supertype "Source"; supertype defines the argument type as "Mapping[str, Any]"' - # 'Argument 3 of "read" is incompatible with supertype "Source"; supertype defines the argument type as "ConfiguredAirbyteCatalog"' - # 'Argument 4 of "read" is incompatible with supertype "Source"; supertype defines the argument type as "Optional[MutableMapping[str, Any]]"' - # need to fix, ignored for now - def read( # type: ignore - self, logger: AirbyteLogger, config_container: ConfigContainer, catalog_path: str, state_path: str = None + return self._discover_internal(logger, config.config_path).airbyte_catalog + + def read( + self, logger: logging.Logger, config: ConfigContainer, catalog_path: str, state_path: str = None ) -> Iterable[AirbyteMessage]: """ Implements the parent class read method. """ - catalogs = self._discover_internal(logger, config_container.config_path) + catalogs = self._discover_internal(logger, config.config_path) masked_airbyte_catalog = ConfiguredAirbyteCatalog.parse_obj(self.read_config(catalog_path)) selected_singer_catalog_path = SingerHelper.create_singer_catalog_with_selection(masked_airbyte_catalog, catalogs.singer_catalog) - read_cmd = self.read_cmd(logger, config_container.config_path, selected_singer_catalog_path, state_path) + read_cmd = self.read_cmd(logger, config.config_path, selected_singer_catalog_path, state_path) return SingerHelper.read(logger, read_cmd) def get_sync_mode_overrides(self) -> Dict[str, SyncModeInfo]: @@ -170,7 +142,7 @@ def get_excluded_streams(self) -> List[str]: class BaseSingerSource(SingerSource): force_full_refresh = False - def check_config(self, logger: AirbyteLogger, config_path: str, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + def check_config(self, logger: logging.Logger, config_path: str, config: Mapping[str, Any]) -> AirbyteConnectionStatus: try: self.try_connect(logger, config) except self.api_error as err: @@ -180,27 +152,23 @@ def check_config(self, logger: AirbyteLogger, config_path: str, config: Mapping[ return AirbyteConnectionStatus(status=Status.FAILED, message=error_msg) return AirbyteConnectionStatus(status=Status.SUCCEEDED) - def discover_cmd(self, logger: AirbyteLogger, config_path: str) -> str: + def discover_cmd(self, logger: logging.Logger, config_path: str) -> str: return f"{self.tap_cmd} --config {config_path} --discover" - def read_cmd(self, logger: AirbyteLogger, config_path: str, catalog_path: str, state_path: str = None) -> str: + def read_cmd(self, logger: logging.Logger, config_path: str, catalog_path: str, state_path: str = None) -> str: state_path = None if self.force_full_refresh else state_path args = {"--config": config_path, "--catalog": catalog_path, "--state": state_path} cmd = " ".join([f"{k} {v}" for k, v in args.items() if v is not None]) return f"{self.tap_cmd} {cmd}" - # according to issue CDK: typing errors #9500, mypy raises errors on this line - # 'Argument 1 of "discover" is incompatible with supertype "Source"; supertype defines the argument type as "Logger"' - # 'Argument 2 of "discover" is incompatible with supertype "Source"; supertype defines the argument type as "Mapping[str, Any]"' - # need to fix, ignored for now - def discover(self, logger: AirbyteLogger, config_container: ConfigContainer) -> AirbyteCatalog: # type: ignore - catalog = super().discover(logger, config_container) + def discover(self, logger: logging.Logger, config: ConfigContainer) -> AirbyteCatalog: + catalog = super().discover(logger, config) if self.force_full_refresh: return CatalogHelper.coerce_catalog_as_full_refresh(catalog) return catalog - def try_connect(self, logger: AirbyteLogger, config: Mapping[str, Any]): + def try_connect(self, logger: logging.Logger, config: Mapping[str, Any]): """Test provided credentials, raises self.api_error if something goes wrong""" raise NotImplementedError diff --git a/airbyte-cdk/python/airbyte_cdk/sources/source.py b/airbyte-cdk/python/airbyte_cdk/sources/source.py index 4f934cde0be9..8ad24e342cd8 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/source.py @@ -7,37 +7,51 @@ import logging from abc import ABC, abstractmethod from collections import defaultdict -from typing import Any, Dict, Iterable, Mapping, MutableMapping +from typing import Any, Dict, Generic, Iterable, Mapping, MutableMapping, TypeVar -from airbyte_cdk.connector import Connector +from airbyte_cdk.connector import BaseConnector, DefaultConnectorMixin, TConfig from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog +TState = TypeVar("TState") +TCatalog = TypeVar("TCatalog") -class Source(Connector, ABC): - # can be overridden to change an input state - def read_state(self, state_path: str) -> Dict[str, Any]: - if state_path: - state_obj = json.loads(open(state_path, "r").read()) - else: - state_obj = {} - state = defaultdict(dict, state_obj) - return state - # can be overridden to change an input catalog - def read_catalog(self, catalog_path: str) -> ConfiguredAirbyteCatalog: - return ConfiguredAirbyteCatalog.parse_obj(self.read_config(catalog_path)) +class BaseSource(BaseConnector[TConfig], ABC, Generic[TConfig, TState, TCatalog]): + @abstractmethod + def read_state(self, state_path: str) -> TState: + ... + + @abstractmethod + def read_catalog(self, catalog_path: str) -> TCatalog: + ... @abstractmethod def read( - self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None + self, logger: logging.Logger, config: TConfig, catalog: TCatalog, state: TState = None ) -> Iterable[AirbyteMessage]: """ Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state. """ @abstractmethod - def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: + def discover(self, logger: logging.Logger, config: TConfig) -> AirbyteCatalog: """ Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Postgres database, returns an Airbyte catalog where each postgres table is a stream, and each table column is a field. """ + + + +class Source(DefaultConnectorMixin, BaseSource[Mapping[str, Any], MutableMapping[str, Any], ConfiguredAirbyteCatalog], ABC): + # can be overridden to change an input state + def read_state(self, state_path: str) -> Dict[str, Any]: + if state_path: + state_obj = json.loads(open(state_path, "r").read()) + else: + state_obj = {} + state = defaultdict(dict, state_obj) + return state + + # can be overridden to change an input catalog + def read_catalog(self, catalog_path: str) -> ConfiguredAirbyteCatalog: + return ConfiguredAirbyteCatalog.parse_obj(self.read_config(catalog_path)) diff --git a/airbyte-cdk/python/type_check_and_test.sh b/airbyte-cdk/python/type_check_and_test.sh index 643786b9f529..93fccd7e1e19 100755 --- a/airbyte-cdk/python/type_check_and_test.sh +++ b/airbyte-cdk/python/type_check_and_test.sh @@ -5,8 +5,7 @@ # Static Type Checking echo "Running MyPy to static check and test files." -# Exclude Singer for the time being. -mypy --exclude '/*singer*/' airbyte_cdk/ unit_tests/ +mypy airbyte_cdk/ unit_tests/ printf "\n" diff --git a/airbyte-integrations/connector-templates/source-singer/source_{{snakeCase name}}_singer/source.py.hbs b/airbyte-integrations/connector-templates/source-singer/source_{{snakeCase name}}_singer/source.py.hbs index 4f7b9b6c9aa0..3d075252048f 100644 --- a/airbyte-integrations/connector-templates/source-singer/source_{{snakeCase name}}_singer/source.py.hbs +++ b/airbyte-integrations/connector-templates/source-singer/source_{{snakeCase name}}_singer/source.py.hbs @@ -3,17 +3,16 @@ # -import json +import logging -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import AirbyteConnectionStatus, Status -from airbyte_cdk.sources.singer import SingerSource +from airbyte_cdk.sources.singer import ConfigContainer, SingerSource class Source{{properCase name}}Singer(SingerSource): TAP_CMD = "{{ tap_name }}" - def check_config(self, logger: AirbyteLogger, config_path: str, config: json) -> AirbyteConnectionStatus: + def check_config(self, logger: logging.Logger, config_path: str, config: ConfigContainer) -> AirbyteConnectionStatus: """ Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect to the Stripe API. @@ -33,14 +32,14 @@ class Source{{properCase name}}Singer(SingerSource): except Exception as e: return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {str(e)}") - def discover_cmd(self, logger: AirbyteLogger, config_path: str) -> str: + def discover_cmd(self, logger: logging.Logger, config_path: str) -> str: """ Return the string commands to invoke the tap with the --discover flag and the right configuration options """ # TODO update the command below if needed. Otherwise you're good to go return f"{self.TAP_CMD} -c {config_path} --discover" - def read_cmd(self, logger: AirbyteLogger, config_path: str, catalog_path: str, state_path: str = None) -> str: + def read_cmd(self, logger: logging.Logger, config_path: str, catalog_path: str, state_path: str = None) -> str: """ Return the string commands to invoke the tap with the right configuration options to read data from the source """