Skip to content

Commit

Permalink
Connectors: Fix AirbyteLogger() for source-google-ads, source-instagr…
Browse files Browse the repository at this point in the history
…am, source-salesforce, source-s3 (#14791)

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr authored Jul 19, 2022
1 parent 52e3755 commit e73f79c
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging

import pendulum
import pytest
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import ConfiguredAirbyteCatalog, Type
from source_google_ads.source import SourceGoogleAds

Expand Down Expand Up @@ -34,11 +35,11 @@ def configured_catalog():

def test_incremental_sync(config, configured_catalog):
today = pendulum.now().date()
start_date = today.subtract(months=1)
start_date = today.subtract(months=2)
config["start_date"] = start_date.to_date_string()

google_ads_client = SourceGoogleAds()
records = list(google_ads_client.read(AirbyteLogger(), config, ConfiguredAirbyteCatalog.parse_obj(configured_catalog)))
records = list(google_ads_client.read(logging.getLogger("airbyte"), config, ConfiguredAirbyteCatalog.parse_obj(configured_catalog)))
latest_state = None
for record in records[::-1]:
if record and record.type == Type.STATE:
Expand All @@ -55,7 +56,7 @@ def test_incremental_sync(config, configured_catalog):
# next sync
records = list(
google_ads_client.read(
AirbyteLogger(),
logging.getLogger("airbyte"),
config,
ConfiguredAirbyteCatalog.parse_obj(configured_catalog),
{"ad_group_ad_report": {config["customer_id"]: {"segments.date": latest_state}}},
Expand All @@ -72,7 +73,7 @@ def test_incremental_sync(config, configured_catalog):
def test_abnormally_large_state(config, configured_catalog):
google_ads_client = SourceGoogleAds()
records = google_ads_client.read(
AirbyteLogger(),
logging.getLogger("airbyte"),
config,
ConfiguredAirbyteCatalog.parse_obj(configured_catalog),
{"ad_group_ad_report": {"segments.date": "2222-06-04"}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
#


import logging
import traceback
from typing import Any, Iterable, List, Mapping, MutableMapping, Tuple

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
Expand Down Expand Up @@ -79,7 +79,7 @@ def is_metrics_in_custom_query(query: str) -> bool:
return True
return False

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]:
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
try:
logger.info("Checking the config")
google_api = GoogleAds(credentials=self.get_credentials(config))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Callable, List, MutableMapping, Tuple

import pendulum
import pytest
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type
from source_instagram.source import SourceInstagram

Expand Down Expand Up @@ -51,7 +51,7 @@ def slice_catalog(catalog: ConfiguredAirbyteCatalog, predicate: Callable[[str],
def _read_records(conf, catalog, state=None) -> Tuple[List[AirbyteMessage], List[AirbyteMessage]]:
records = []
states = []
for message in SourceInstagram().read(AirbyteLogger(), conf, catalog, state=state):
for message in SourceInstagram().read(logging.getLogger("airbyte"), conf, catalog, state=state):
if message.type == Type.RECORD:
records.append(message)
elif message.type == Type.STATE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from datetime import datetime
from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import (
AirbyteMessage,
AuthSpecification,
Expand Down Expand Up @@ -59,7 +59,7 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any
return ok, error_msg

def read(
self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
) -> Iterator[AirbyteMessage]:
for stream in self.streams(config):
state_key = str(stream.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@


import json
import logging
import os
import shutil
import time
Expand Down Expand Up @@ -104,7 +105,7 @@ def teardown_infra(self, cloud_bucket_name: str, credentials: Mapping) -> None:


class TestIntegrationCsvFiles:
logger = AirbyteLogger()
logger = logging.getLogger("airbyte")

@memory_limit(150) # max used memory should be less than 150Mb
def read_source(self, credentials: Dict[str, Any], catalog: Dict[str, Any]) -> int:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

import csv
import io
import logging
import re
from unittest.mock import Mock

import pytest
import requests_mock
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, Type
from conftest import generate_stream
from requests.exceptions import HTTPError
Expand Down Expand Up @@ -220,7 +220,7 @@ def test_download_data_filter_null_bytes(stream_config, stream_api):

def test_check_connection_rate_limit(stream_config):
source = SourceSalesforce()
logger = AirbyteLogger()
logger = logging.getLogger("airbyte")

json_response = [{"errorCode": "REQUEST_LIMIT_EXCEEDED", "message": "TotalRequests Limit exceeded."}]
url = "https://login.salesforce.com/services/oauth2/token"
Expand Down Expand Up @@ -257,7 +257,7 @@ def test_rate_limit_bulk(stream_config, stream_api, configured_catalog, state):
source = SourceSalesforce()
source.streams = Mock()
source.streams.return_value = streams
logger = AirbyteLogger()
logger = logging.getLogger("airbyte")

json_response = [{"errorCode": "REQUEST_LIMIT_EXCEEDED", "message": "TotalRequests Limit exceeded."}]
with requests_mock.Mocker() as m:
Expand Down Expand Up @@ -313,7 +313,7 @@ def test_rate_limit_rest(stream_config, stream_api, configured_catalog, state):
source.streams = Mock()
source.streams.return_value = [stream_1, stream_2]

logger = AirbyteLogger()
logger = logging.getLogger("airbyte")

next_page_url = "/services/data/v52.0/query/012345"
response_1 = {
Expand Down

0 comments on commit e73f79c

Please sign in to comment.