From e73f79c69262d3e899373ebbcc56e39f4953a137 Mon Sep 17 00:00:00 2001 From: Serhii Chvaliuk Date: Tue, 19 Jul 2022 11:45:07 +0300 Subject: [PATCH] Connectors: Fix AirbyteLogger() for source-google-ads, source-instagram, source-salesforce, source-s3 (#14791) Signed-off-by: Sergey Chvalyuk --- .../integration_tests/test_incremental.py | 11 ++++++----- .../source-google-ads/source_google_ads/source.py | 4 ++-- .../integration_tests/test_streams.py | 4 ++-- .../source-instagram/source_instagram/source.py | 4 ++-- .../source-s3/integration_tests/integration_test.py | 3 ++- .../source-salesforce/unit_tests/api_test.py | 8 ++++---- 6 files changed, 18 insertions(+), 16 deletions(-) diff --git a/airbyte-integrations/connectors/source-google-ads/integration_tests/test_incremental.py b/airbyte-integrations/connectors/source-google-ads/integration_tests/test_incremental.py index a057b04d189a..3299e60478fd 100644 --- a/airbyte-integrations/connectors/source-google-ads/integration_tests/test_incremental.py +++ b/airbyte-integrations/connectors/source-google-ads/integration_tests/test_incremental.py @@ -2,9 +2,10 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import logging + import pendulum import pytest -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import ConfiguredAirbyteCatalog, Type from source_google_ads.source import SourceGoogleAds @@ -34,11 +35,11 @@ def configured_catalog(): def test_incremental_sync(config, configured_catalog): today = pendulum.now().date() - start_date = today.subtract(months=1) + start_date = today.subtract(months=2) config["start_date"] = start_date.to_date_string() google_ads_client = SourceGoogleAds() - records = list(google_ads_client.read(AirbyteLogger(), config, ConfiguredAirbyteCatalog.parse_obj(configured_catalog))) + records = list(google_ads_client.read(logging.getLogger("airbyte"), config, ConfiguredAirbyteCatalog.parse_obj(configured_catalog))) latest_state = None for record in records[::-1]: if record and record.type == Type.STATE: @@ -55,7 +56,7 @@ def test_incremental_sync(config, configured_catalog): # next sync records = list( google_ads_client.read( - AirbyteLogger(), + logging.getLogger("airbyte"), config, ConfiguredAirbyteCatalog.parse_obj(configured_catalog), {"ad_group_ad_report": {config["customer_id"]: {"segments.date": latest_state}}}, @@ -72,7 +73,7 @@ def test_incremental_sync(config, configured_catalog): def test_abnormally_large_state(config, configured_catalog): google_ads_client = SourceGoogleAds() records = google_ads_client.read( - AirbyteLogger(), + logging.getLogger("airbyte"), config, ConfiguredAirbyteCatalog.parse_obj(configured_catalog), {"ad_group_ad_report": {"segments.date": "2222-06-04"}}, diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py index 2b4cb261ed57..28e576715916 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py @@ -3,10 +3,10 @@ # +import logging import traceback from typing import Any, Iterable, List, Mapping, MutableMapping, Tuple -from airbyte_cdk import AirbyteLogger from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream @@ -79,7 +79,7 @@ def is_metrics_in_custom_query(query: str) -> bool: return True return False - def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]: + def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]: try: logger.info("Checking the config") google_api = GoogleAds(credentials=self.get_credentials(config)) diff --git a/airbyte-integrations/connectors/source-instagram/integration_tests/test_streams.py b/airbyte-integrations/connectors/source-instagram/integration_tests/test_streams.py index 4ed9ec29b3d3..1671771c62a2 100644 --- a/airbyte-integrations/connectors/source-instagram/integration_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-instagram/integration_tests/test_streams.py @@ -2,11 +2,11 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import logging from typing import Any, Callable, List, MutableMapping, Tuple import pendulum import pytest -from airbyte_cdk import AirbyteLogger from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type from source_instagram.source import SourceInstagram @@ -51,7 +51,7 @@ def slice_catalog(catalog: ConfiguredAirbyteCatalog, predicate: Callable[[str], def _read_records(conf, catalog, state=None) -> Tuple[List[AirbyteMessage], List[AirbyteMessage]]: records = [] states = [] - for message in SourceInstagram().read(AirbyteLogger(), conf, catalog, state=state): + for message in SourceInstagram().read(logging.getLogger("airbyte"), conf, catalog, state=state): if message.type == Type.RECORD: records.append(message) elif message.type == Type.STATE: diff --git a/airbyte-integrations/connectors/source-instagram/source_instagram/source.py b/airbyte-integrations/connectors/source-instagram/source_instagram/source.py index 0bcbd89f086d..94ddddec7e56 100644 --- a/airbyte-integrations/connectors/source-instagram/source_instagram/source.py +++ b/airbyte-integrations/connectors/source-instagram/source_instagram/source.py @@ -2,10 +2,10 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import logging from datetime import datetime from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple -from airbyte_cdk import AirbyteLogger from airbyte_cdk.models import ( AirbyteMessage, AuthSpecification, @@ -59,7 +59,7 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any return ok, error_msg def read( - self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None + self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None ) -> Iterator[AirbyteMessage]: for stream in self.streams(config): state_key = str(stream.name) diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py index f20176ece3d3..4fd769e179d4 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py @@ -4,6 +4,7 @@ import json +import logging import os import shutil import time @@ -104,7 +105,7 @@ def teardown_infra(self, cloud_bucket_name: str, credentials: Mapping) -> None: class TestIntegrationCsvFiles: - logger = AirbyteLogger() + logger = logging.getLogger("airbyte") @memory_limit(150) # max used memory should be less than 150Mb def read_source(self, credentials: Dict[str, Any], catalog: Dict[str, Any]) -> int: diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index 961de6aae754..a868da2dd1ab 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -5,12 +5,12 @@ import csv import io +import logging import re from unittest.mock import Mock import pytest import requests_mock -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, Type from conftest import generate_stream from requests.exceptions import HTTPError @@ -220,7 +220,7 @@ def test_download_data_filter_null_bytes(stream_config, stream_api): def test_check_connection_rate_limit(stream_config): source = SourceSalesforce() - logger = AirbyteLogger() + logger = logging.getLogger("airbyte") json_response = [{"errorCode": "REQUEST_LIMIT_EXCEEDED", "message": "TotalRequests Limit exceeded."}] url = "https://login.salesforce.com/services/oauth2/token" @@ -257,7 +257,7 @@ def test_rate_limit_bulk(stream_config, stream_api, configured_catalog, state): source = SourceSalesforce() source.streams = Mock() source.streams.return_value = streams - logger = AirbyteLogger() + logger = logging.getLogger("airbyte") json_response = [{"errorCode": "REQUEST_LIMIT_EXCEEDED", "message": "TotalRequests Limit exceeded."}] with requests_mock.Mocker() as m: @@ -313,7 +313,7 @@ def test_rate_limit_rest(stream_config, stream_api, configured_catalog, state): source.streams = Mock() source.streams.return_value = [stream_1, stream_2] - logger = AirbyteLogger() + logger = logging.getLogger("airbyte") next_page_url = "/services/data/v52.0/query/012345" response_1 = {