Skip to content

Commit

Permalink
CDK: Fix typing errors (#9037)
Browse files Browse the repository at this point in the history
* fix typing, drop AirbyteLogger

* format

* bump the version

* use logger instead of fixture logger

Co-authored-by: Eugene Kulak <kulak.eugene@gmail.com>
Co-authored-by: auganbay <auganenu@gmail.com>
  • Loading branch information
3 people authored Jan 14, 2022
1 parent 41f89d1 commit f83eca5
Show file tree
Hide file tree
Showing 26 changed files with 126 additions and 144 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.47
Fix typing errors.

## 0.1.45
Integrate Sentry for performance and errors tracking.

Expand Down
6 changes: 3 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@


import json
import logging
import os
import pkgutil
from abc import ABC, abstractmethod
from typing import Any, Mapping, Optional

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteConnectionStatus, ConnectorSpecification


Expand Down Expand Up @@ -48,7 +48,7 @@ def write_config(config: Mapping[str, Any], config_path: str):
with open(config_path, "w") as fh:
fh.write(json.dumps(config))

def spec(self, logger: AirbyteLogger) -> ConnectorSpecification:
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
"""
Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
required to run this integration.
Expand All @@ -59,7 +59,7 @@ def spec(self, logger: AirbyteLogger) -> ConnectorSpecification:
return ConnectorSpecification.parse_obj(json.loads(raw_spec))

@abstractmethod
def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""
Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
to the Stripe API.
Expand Down
15 changes: 8 additions & 7 deletions airbyte-cdk/python/airbyte_cdk/destinations/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@

import argparse
import io
import logging
import sys
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.connector import Connector
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit
from pydantic import ValidationError

logger = logging.getLogger("airbyte")


class Destination(Connector, ABC):
logger = AirbyteLogger()
VALID_CMDS = {"spec", "check", "write"}

@abstractmethod
Expand All @@ -26,7 +27,7 @@ def write(
"""Implement to define how the connector writes data to the destination"""

def _run_check(self, config: Mapping[str, Any]) -> AirbyteMessage:
check_result = self.check(self.logger, config)
check_result = self.check(logger, config)
return AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result)

def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[AirbyteMessage]:
Expand All @@ -35,16 +36,16 @@ def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[Airbyt
try:
yield AirbyteMessage.parse_raw(line)
except ValidationError:
self.logger.info(f"ignoring input which can't be deserialized as Airbyte Message: {line}")
logger.info(f"ignoring input which can't be deserialized as Airbyte Message: {line}")

def _run_write(
self, config: Mapping[str, Any], configured_catalog_path: str, input_stream: io.TextIOWrapper
) -> Iterable[AirbyteMessage]:
catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path)
input_messages = self._parse_input_stream(input_stream)
self.logger.info("Begin writing to the destination...")
logger.info("Begin writing to the destination...")
yield from self.write(config=config, configured_catalog=catalog, input_messages=input_messages)
self.logger.info("Writing complete.")
logger.info("Writing complete.")

def parse_args(self, args: List[str]) -> argparse.Namespace:
"""
Expand Down Expand Up @@ -86,7 +87,7 @@ def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
if cmd not in self.VALID_CMDS:
raise Exception(f"Unrecognized command: {cmd}")

spec = self.spec(self.logger)
spec = self.spec(logger)
if cmd == "spec":
yield AirbyteMessage(type=Type.SPEC, spec=spec)
return
Expand Down
53 changes: 14 additions & 39 deletions airbyte-cdk/python/airbyte_cdk/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging.config
import sys
import traceback
from typing import List
from typing import List, Tuple

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage

Expand Down Expand Up @@ -49,7 +49,6 @@ def hook_fn(exception_type, exception_value, traceback_):

def init_logger(name: str = None):
"""Initial set up of logger"""
logging.setLoggerClass(AirbyteNativeLogger)
logging.addLevelName(TRACE_LEVEL_NUM, "TRACE")
logger = logging.getLogger(name)
logger.setLevel(TRACE_LEVEL_NUM)
Expand All @@ -61,7 +60,7 @@ def init_logger(name: str = None):
class AirbyteLogFormatter(logging.Formatter):
"""Output log records using AirbyteMessage"""

_secrets = []
_secrets: List[str] = []

@classmethod
def update_secrets(cls, secrets: List[str]):
Expand All @@ -88,46 +87,22 @@ def format(self, record: logging.LogRecord) -> str:
return log_message.json(exclude_unset=True)


class AirbyteNativeLogger(logging.Logger):
"""Using native logger with implementing all AirbyteLogger features"""
def log_by_prefix(msg: str, default_level: str) -> Tuple[int, str]:
"""Custom method, which takes log level from first word of message"""
valid_log_types = ["FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"]
split_line = msg.split()
first_word = next(iter(split_line), None)
if first_word in valid_log_types:
log_level = logging.getLevelName(first_word)
rendered_message = " ".join(split_line[1:])
else:
log_level = logging.getLevelName(default_level)
rendered_message = msg

def __init__(self, name):
super().__init__(name)
self.valid_log_types = ["FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"]

def log_by_prefix(self, msg, default_level):
"""Custom method, which takes log level from first word of message"""
split_line = msg.split()
first_word = next(iter(split_line), None)
if first_word in self.valid_log_types:
log_level = logging.getLevelName(first_word)
rendered_message = " ".join(split_line[1:])
else:
default_level = default_level if default_level in self.valid_log_types else "INFO"
log_level = logging.getLevelName(default_level)
rendered_message = msg
self.log(log_level, rendered_message)

def trace(self, msg, *args, **kwargs):
self._log(TRACE_LEVEL_NUM, msg, args, **kwargs)
return log_level, rendered_message


class AirbyteLogger:
def __init__(self):
self.valid_log_types = ["FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"]

def log_by_prefix(self, message, default_level):
"""Custom method, which takes log level from first word of message"""
split_line = message.split()
first_word = next(iter(split_line), None)
if first_word in self.valid_log_types:
log_level = first_word
rendered_message = " ".join(split_line[1:])
else:
log_level = default_level
rendered_message = message
self.log(log_level, rendered_message)

def log(self, level, message):
log_record = AirbyteLogMessage(level=level, message=message)
log_message = AirbyteMessage(type="LOG", log=log_record)
Expand Down
24 changes: 12 additions & 12 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,18 @@


import copy
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Status,
Expand All @@ -38,8 +37,9 @@ class AbstractSource(Source, ABC):
"""

@abstractmethod
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
"""
:param logger: source logger
:param config: The user-provided configuration as specified by the source's spec.
This usually contains information required to check connection e.g. tokens, secrets and keys etc.
:return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
Expand All @@ -57,19 +57,19 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""

# Stream name to instance map for applying output object transformation
_stream_to_instance_map: Dict[str, AirbyteStream] = {}
_stream_to_instance_map: Dict[str, Stream] = {}

@property
def name(self) -> str:
"""Source name"""
return self.__class__.__name__

def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog:
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
"""Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
return AirbyteCatalog(streams=streams)

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
try:
check_succeeded, error = self.check_connection(logger, config)
Expand All @@ -81,7 +81,7 @@ def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConn
return AirbyteConnectionStatus(status=Status.SUCCEEDED)

def read(
self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
) -> Iterator[AirbyteMessage]:
"""Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
connector_state = copy.deepcopy(state or {})
Expand Down Expand Up @@ -118,7 +118,7 @@ def read(

def _read_stream(
self,
logger: AirbyteLogger,
logger: logging.Logger,
stream_instance: Stream,
configured_stream: ConfiguredAirbyteStream,
connector_state: MutableMapping[str, Any],
Expand Down Expand Up @@ -160,7 +160,7 @@ def _limit_reached(internal_config: InternalConfig, records_counter: int) -> boo

def _read_incremental(
self,
logger: AirbyteLogger,
logger: logging.Logger,
stream_instance: Stream,
configured_stream: ConfiguredAirbyteStream,
connector_state: MutableMapping[str, Any],
Expand Down Expand Up @@ -222,15 +222,15 @@ def _checkpoint_state(self, stream_name, stream_state, connector_state, logger):
return AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=connector_state))

@lru_cache(maxsize=None)
def _get_stream_transformer_and_schema(self, stream_name: str) -> Tuple[TypeTransformer, dict]:
def _get_stream_transformer_and_schema(self, stream_name: str) -> Tuple[TypeTransformer, Mapping[str, Any]]:
"""
Lookup stream's transform object and jsonschema based on stream name.
This function would be called a lot so using caching to save on costly
get_json_schema operation.
:param stream_name name of stream from catalog.
:return tuple with stream transformer object and discover json schema.
"""
stream_instance = self._stream_to_instance_map.get(stream_name)
stream_instance = self._stream_to_instance_map[stream_name]
return stream_instance.transformer, stream_instance.get_json_schema()

def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]):
Expand All @@ -240,6 +240,6 @@ def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]):
# need it to normalize values against json schema. By default no action
# taken unless configured. See
# docs/connector-development/cdk-python/schemas.md for details.
transformer.transform(data, schema)
transformer.transform(data, schema) # type: ignore
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
return AirbyteMessage(type=MessageType.RECORD, record=message)
4 changes: 2 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ class BaseConfig(BaseModel):
"""

@classmethod
def schema(cls, **kwargs) -> Dict[str, Any]:
def schema(cls, *args, **kwargs) -> Dict[str, Any]:
"""We're overriding the schema classmethod to enable some post-processing"""
schema = super().schema(**kwargs)
schema = super().schema(*args, **kwargs)
rename_key(schema, old_key="anyOf", new_key="oneOf") # UI supports only oneOf
expand_refs(schema)
schema.pop("description", None) # description added from the docstring
Expand Down
10 changes: 5 additions & 5 deletions airbyte-cdk/python/airbyte_cdk/sources/deprecated/base_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@


import copy
import logging
from datetime import datetime
from typing import Any, Iterable, Mapping, MutableMapping, Type

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
Expand Down Expand Up @@ -39,13 +39,13 @@ def _get_client(self, config: Mapping):
"""Construct client"""
return self.client_class(**config)

def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog:
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
"""Discover streams"""
client = self._get_client(config)

return AirbyteCatalog(streams=[stream for stream in client.streams])

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""Check connection"""
client = self._get_client(config)
alive, error = client.health_check()
Expand All @@ -55,7 +55,7 @@ def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConn
return AirbyteConnectionStatus(status=Status.SUCCEEDED)

def read(
self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
) -> Iterable[AirbyteMessage]:
state = state or {}
client = self._get_client(config)
Expand All @@ -73,7 +73,7 @@ def read(
logger.info(f"Finished syncing {self.name}")

def _read_stream(
self, logger: AirbyteLogger, client: BaseClient, configured_stream: ConfiguredAirbyteStream, state: MutableMapping[str, Any]
self, logger: logging.Logger, client: BaseClient, configured_stream: ConfiguredAirbyteStream, state: MutableMapping[str, Any]
):
stream_name = configured_stream.stream.name
use_incremental = configured_stream.sync_mode == SyncMode.incremental and client.stream_has_state(stream_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from io import TextIOWrapper
from typing import Any, DefaultDict, Dict, Iterator, List, Mapping, Optional, Tuple

from airbyte_cdk.logger import log_by_prefix
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteMessage,
Expand Down Expand Up @@ -138,7 +139,7 @@ def _read_singer_catalog(logger, shell_command: str) -> Mapping[str, Any]:
shell_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True
)
for line in completed_process.stderr.splitlines():
logger.log_by_prefix(line, "ERROR")
logger.log(*log_by_prefix(line, "ERROR"))

return json.loads(completed_process.stdout)

Expand Down Expand Up @@ -169,9 +170,9 @@ def read(logger, shell_command, is_message=(lambda x: True)) -> Iterator[Airbyte
if message_data is not None:
yield message_data
else:
logger.log_by_prefix(line, "INFO")
logger.log(*log_by_prefix(line, "INFO"))
else:
logger.log_by_prefix(line, "ERROR")
logger.log(*log_by_prefix(line, "ERROR"))

@staticmethod
def _read_lines(process: subprocess.Popen) -> Iterator[Tuple[str, TextIOWrapper]]:
Expand Down
Loading

0 comments on commit f83eca5

Please sign in to comment.