Skip to content

Commit

Permalink
SimpleRetriever yield request and response as log messages (#18644)
Browse files Browse the repository at this point in the history
* method yielding airbytemessage

* move to Stream

* update abstract source

* reset

* missing file

* Yield request and response as log messages

* only emit request and responses if the debug flag is on

* add test docker image

* script to run acceptance tests with local cdk

* Update conftest to use a different image

* extract to method

* dont use a different image tag

* Always install local cdk

* break the cdk

* get path from current working directory

* or

* ignore unit test

* debug log

* Revert "AMI change: ami-0f23be2f917510c26 -> ami-005924fb76f7477ce (#18689)"

This reverts commit bf06dec.

* build from the top

* Update source-acceptance-test

* fix

* copy setup

* some work on the gradle plugin

* reset to master

* delete unused file

* delete unused file

* reset to master

* optional argument

* delete dead code

* use latest cdk with sendgrid

* fix sendgrid dockerfile

* break the cdk

* use local file

* Revert "break the cdk"

This reverts commit 600c195.

* dont raise an exception

* reset to master

* unit tests

* missing test

* more unit tests

* remove deprecated comment

* newline

* reset to master

* remove files

* reset

* Update abstract source

* remove method from stream

* convert to airbytemessage

* unittests

* Update

* unit test

* remove debug logs

* Revert "remove debug logs"

This reverts commit a1a139e.

* Revert "Revert "remove debug logs""

This reverts commit b1d62cd.

* Revert "reset to master"

This reverts commit 3fa6a00.

* fix

* slightly better test

* typing

* extract method

* Revert "Revert "reset to master""

This reverts commit 5dac7c2.

* reset to master

* reset to master

* Revert "reset to master"

This reverts commit 3fa6a00.

* Comment

* operate on the message

* Revert "Revert "reset to master""

This reverts commit 5833c84.

* comment

* test

* Revert "test"

This reverts commit 2f91b80.

* test

* Revert "test"

This reverts commit 62d95eb.

* test

* Revert "test"

This reverts commit 27150ba.

* format

* format

* symlink

* Update setup

* update path

* reset to master

* update

* Add local files

* greenhouse

* format

* symlink

* try reordering

* better error message

* better log message

* reset to master

* Revert "merge for qa"

This reverts commit ad7128f, reversing
changes made to 7196c22.

* reset to master

* reset to master

* reset to master

* format

* gradlew format

* right type hints

* reset to master

* reset to master

* gradlew format

* a bunch of small fixes

* Update output format

* fixes from feedback

* fixme comment

* streams cannot return AirbyteRecordMessage

* fix

* format

* only return logs when running on debug mode

* move branching

* update typing

* remove dead code

* fix simpleretriever name

* i think this is better

* log response.text

* debug flag

* comment

* pass config

* comments

* run SATs

* fix most of the unit tests

* fix unit test

* reset to master

* runFromPath

* Revert "runFromPath"

This reverts commit 85979a8.

* Revert "run SATs"

This reverts commit a8a8a2d.

* no need to convert to dict

* fix test
  • Loading branch information
girarda authored and akashkulk committed Nov 17, 2022
1 parent 550858f commit a9c0f98
Show file tree
Hide file tree
Showing 13 changed files with 178 additions and 94 deletions.
18 changes: 12 additions & 6 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.source import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.streams.http.http import HttpStream
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
Expand Down Expand Up @@ -240,9 +241,7 @@ def _read_incremental(
)
record_counter = 0
for message_counter, record_data_or_message in enumerate(records, start=1):
message = stream_data_to_airbyte_message(
stream_name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema()
)
message = self._get_message(record_data_or_message, stream_instance)
yield message
if message.type == MessageType.RECORD:
record = message.record
Expand Down Expand Up @@ -287,9 +286,7 @@ def _read_full_refresh(
cursor_field=configured_stream.cursor_field,
)
for record_data_or_message in record_data_or_messages:
message = stream_data_to_airbyte_message(
stream_instance.name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema()
)
message = self._get_message(record_data_or_message, stream_instance)
yield message
if message.type == MessageType.RECORD:
total_records_counter += 1
Expand All @@ -315,3 +312,12 @@ def _apply_log_level_to_stream_logger(logger: logging.Logger, stream_instance: S
"""
if hasattr(logger, "level"):
stream_instance.logger.setLevel(logger.level)

def _get_message(self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream):
"""
Converts the input to an AirbyteMessage if it is a StreamData. Returns the input as is if it is already an AirbyteMessage
"""
if isinstance(record_data_or_message, AirbyteMessage):
return record_data_or_message
else:
return stream_data_to_airbyte_message(stream.name, record_data_or_message, stream.transformer, stream.get_json_schema())
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@
import typing
from dataclasses import dataclass, fields
from enum import Enum, EnumMeta
from typing import Any, List, Mapping, Union

from airbyte_cdk.models import ConnectorSpecification
from typing import Any, Iterator, List, Mapping, MutableMapping, Union

from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
ConnectorSpecification,
)
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
Expand All @@ -35,12 +41,14 @@ class ManifestDeclarativeSource(DeclarativeSource):

VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}

def __init__(self, source_config: ConnectionDefinition):
def __init__(self, source_config: ConnectionDefinition, debug: bool = False):
"""
:param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector
:param debug(bool): True if debug mode is enabled
"""
self.logger = logging.getLogger(f"airbyte.{self.name}")
self._source_config = source_config
self._debug = debug
self._factory = DeclarativeComponentFactory()

self._validate_source()
Expand Down Expand Up @@ -73,6 +81,7 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
in the project root.
"""
self._configure_logger_level(logger)
self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})

spec = self._source_config.get("spec")
Expand All @@ -84,6 +93,27 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
else:
return super().spec(logger)

def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
self._configure_logger_level(logger)
return super().check(logger, config)

def read(
self,
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
) -> Iterator[AirbyteMessage]:
self._configure_logger_level(logger)
yield from super().read(logger, config, catalog, state)

def _configure_logger_level(self, logger: logging.Logger):
"""
Set the log level to logging.DEBUG if debug mode is enabled
"""
if self._debug:
logger.setLevel(logging.DEBUG)

def _validate_source(self):
full_config = {}
if "version" in self._source_config:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from typing import Iterable, List, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
from airbyte_cdk.sources.streams.core import StreamData
from dataclasses_jsonschema import JsonSchemaMixin


Expand All @@ -24,7 +25,7 @@ def read_records(
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[StreamSlice] = None,
stream_state: Optional[StreamState] = None,
) -> Iterable[Record]:
) -> Iterable[StreamData]:
"""
Fetch a stream's records from an HTTP API source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,28 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
import logging
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.exceptions import ReadException
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from dataclasses_jsonschema import JsonSchemaMixin


Expand Down Expand Up @@ -46,9 +52,10 @@ class SimpleRetriever(Retriever, HttpStream, JsonSchemaMixin):

requester: Requester
record_selector: HttpSelector
config: Config
options: InitVar[Mapping[str, Any]]
name: str
_name: str = field(init=False, repr=False, default="")
_name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
primary_key: Optional[Union[str, List[str], List[List[str]]]]
_primary_key: str = field(init=False, repr=False, default="")
paginator: Optional[Paginator] = None
Expand All @@ -59,13 +66,15 @@ def __post_init__(self, options: Mapping[str, Any]):
HttpStream.__init__(self, self.requester.get_authenticator())
self._last_response = None
self._last_records = None
self._options = options
self.name = InterpolatedString(self._name, options=options)

@property
def name(self) -> str:
"""
:return: Stream name
"""
return self._name
return self._name.eval(self.config)

@name.setter
def name(self, value: str) -> None:
Expand Down Expand Up @@ -347,17 +356,23 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[StreamSlice] = None,
stream_state: Optional[StreamState] = None,
) -> Iterable[Mapping[str, Any]]:
) -> Iterable[StreamData]:
# Warning: use self.state instead of the stream_state passed as argument!
stream_slice = stream_slice or {} # None-check
self.paginator.reset()
records_generator = HttpStream.read_records(self, sync_mode, cursor_field, stream_slice, self.state)
for r in records_generator:
self.stream_slicer.update_cursor(stream_slice, last_record=r)
yield r
records_generator = self._read_pages(
lambda req, res, state, _slice: self.parse_records_and_emit_request_and_responses(
req, res, stream_slice=_slice, stream_state=state
),
stream_slice,
stream_state,
)
for record in records_generator:
self.stream_slicer.update_cursor(stream_slice, last_record=record)
yield record
else:
last_record = self._last_records[-1] if self._last_records else None
self.stream_slicer.update_cursor(stream_slice, last_record=last_record)
Expand Down Expand Up @@ -385,3 +400,26 @@ def state(self) -> MutableMapping[str, Any]:
def state(self, value: StreamState):
"""State setter, accept state serialized by state getter."""
self.stream_slicer.update_cursor(value)

def parse_records_and_emit_request_and_responses(self, request, response, stream_slice, stream_state) -> Iterable[StreamData]:
# Only emit requests and responses when running in debug mode
if self.logger.isEnabledFor(logging.DEBUG):
yield self._create_trace_message_from_request(request)
yield self._create_trace_message_from_response(response)
# Not great to need to call _read_pages which is a private method
# A better approach would be to extract the HTTP client from the HttpStream and call it directly from the HttpRequester
yield from self._read_pages(
lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, stream_state
)

def _create_trace_message_from_request(self, request: requests.PreparedRequest):
# FIXME: this should return some sort of trace message
request_dict = {"url": request.url, "headers": dict(request.headers), "body": request.body}
log_message = filter_secrets(f"request:{json.dumps(request_dict)}")
return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message))

def _create_trace_message_from_response(self, response: requests.Response):
# FIXME: this should return some sort of trace message
response_dict = {"body": response.text, "headers": dict(response.headers), "status_code": response.status_code}
log_message = filter_secrets(f"response:{json.dumps(response_dict)}")
return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message))
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dataclasses import InitVar, dataclass
from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.models import AirbyteMessage, SyncMode, Type
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
Expand Down Expand Up @@ -138,6 +138,12 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Itera
for parent_record in parent_stream.read_records(
sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None
):
# Skip non-records (eg AirbyteLogMessage)
if isinstance(parent_record, AirbyteMessage):
if parent_record.type == Type.RECORD:
parent_record = parent_record.record.data
else:
continue
empty_parent_slice = False
stream_state_value = parent_record.get(parent_field)
yield {stream_state_field: stream_state_value, "parent_slice": parent_slice}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
class YamlDeclarativeSource(ManifestDeclarativeSource):
"""Declarative source defined by a yaml file"""

def __init__(self, path_to_yaml):
def __init__(self, path_to_yaml, debug: bool = False):
"""
:param path_to_yaml: Path to the yaml file describing the source
"""
self._path_to_yaml = path_to_yaml
source_config = self._read_and_parse_yaml_file(path_to_yaml)
super().__init__(source_config)
super().__init__(source_config, debug)

def _read_and_parse_yaml_file(self, path_to_yaml_file) -> ConnectionDefinition:
package = self.__class__.__module__.split(".")[0]
Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.models import AirbyteLogMessage, AirbyteRecordMessage, AirbyteStream, AirbyteTraceMessage, SyncMode
from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode

# list of all possible HTTP methods which can be used for sending of request bodies
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
Expand All @@ -22,7 +22,7 @@
# AirbyteRecordMessage: An AirbyteRecordMessage
# AirbyteLogMessage: A log message
# AirbyteTraceMessage: A trace message
StreamData = Union[Mapping[str, Any], AirbyteRecordMessage, AirbyteLogMessage, AirbyteTraceMessage]
StreamData = Union[Mapping[str, Any], AirbyteLogMessage, AirbyteTraceMessage]


def package_name_from_class(cls: object) -> str:
Expand Down
49 changes: 33 additions & 16 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
import os
from abc import ABC, abstractmethod
from contextlib import suppress
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
from urllib.parse import urljoin

import requests
import requests_cache
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.streams.core import Stream, StreamData
from requests.auth import AuthBase
from requests_cache.session import CachedSession

Expand Down Expand Up @@ -408,24 +408,25 @@ def read_records(
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
) -> Iterable[StreamData]:
yield from self._read_pages(
lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, stream_state
)

def _read_pages(
self,
records_generator_fn: Callable[
[requests.PreparedRequest, requests.Response, Mapping[str, Any], Mapping[str, Any]], Iterable[StreamData]
],
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[StreamData]:
stream_state = stream_state or {}
pagination_complete = False

next_page_token = None
while not pagination_complete:
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
request = self._create_prepared_request(
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
headers=dict(request_headers, **self.authenticator.get_auth_header()),
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

response = self._send_request(request, request_kwargs)
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token)
yield from records_generator_fn(request, response, stream_state, stream_slice)

next_page_token = self.next_page_token(response)
if not next_page_token:
Expand All @@ -434,6 +435,22 @@ def read_records(
# Always return an empty generator just in case no records were ever yielded
yield from []

def _fetch_next_page(
self, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Tuple[requests.PreparedRequest, requests.Response]:
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
request = self._create_prepared_request(
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
headers=dict(request_headers, **self.authenticator.get_auth_header()),
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

response = self._send_request(request, request_kwargs)
return request, response


class HttpSubStream(HttpStream, ABC):
def __init__(self, parent: HttpStream, **kwargs):
Expand Down
Loading

0 comments on commit a9c0f98

Please sign in to comment.