Skip to content

Commit

Permalink
format connector
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosmarxm committed Sep 27, 2022
1 parent 9ccc0c2 commit be65546
Show file tree
Hide file tree
Showing 11 changed files with 1,046 additions and 1,035 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-dv-360/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


Expand Down
12 changes: 3 additions & 9 deletions airbyte-integrations/connectors/source-dv-360/setup.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1","google-api-python-client"
]
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "google-api-python-client"]

TEST_REQUIREMENTS = [
"pytest~=6.1",
"source-acceptance-test",
"pytest-mock"
]
TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "pytest-mock"]

setup(
name="source_dv_360",
Expand Down
1,091 changes: 546 additions & 545 deletions airbyte-integrations/connectors/source-dv-360/source_dv_360/fields.py

Large diffs are not rendered by default.

110 changes: 50 additions & 60 deletions airbyte-integrations/connectors/source-dv-360/source_dv_360/source.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,35 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
from datetime import datetime
from typing import Any, Dict, Generator, List, Mapping, MutableMapping, Tuple
from typing import Any, Generator, List, Mapping, MutableMapping, Tuple

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
Status,
SyncMode,
Type,
)
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, SyncMode, Type
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

from .streams import AudienceComposition, DBMStream, Floodlight, Reach, Standard, UniqueReachAudience
from .streams import AudienceComposition, Floodlight, Reach, Standard, UniqueReachAudience


class SourceDV360(AbstractSource):

def get_credentials(self,config: json) -> Credentials:
def get_credentials(self, config: json) -> Credentials:
"""
Get the credentials from the config file and returns them as a Credentials object
"""
cred_json= config.get('credentials')
return Credentials(
token= cred_json.get('access_token'),
refresh_token= cred_json.get('refresh_token'),
token_uri= cred_json.get('token_uri'),
client_id= cred_json.get('client_id'),
client_secret= cred_json.get('client_secret')
cred_json = config.get("credentials")
creds = Credentials(
token=cred_json.get("access_token"),
refresh_token=cred_json.get("refresh_token"),
token_uri=cred_json.get("token_uri"),
client_id=cred_json.get("client_id"),
client_secret=cred_json.get("client_secret"),
)

return creds

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]:
"""
Expand All @@ -55,38 +44,39 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
:return: AirbyteConnectionStatus indicating a Success or Failure
"""
try:
dbm_service= build('doubleclickbidmanager','v1.1', credentials=self.get_credentials(config))
request= dbm_service.queries().listqueries().execute()
return True, None
dbm_service = build("doubleclickbidmanager", "v1.1", credentials=self.get_credentials(config))
request = dbm_service.queries().listqueries().execute()
if request:
return True, None
except Exception as err:
return False, f"Unable to connect to Google Ads API with the provided credentials - {repr(err)}"


def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
:param config: The user-provided configuration as specified by the source's spec.
Any stream construction related operation should happen here.
:return: A list of the streams in this source connector.
"""
args= dict(
args = dict(
credentials=self.get_credentials(config),
partner_id= config.get('partner_id'),
start_date= config.get('start_date'),
end_date= config.get('end_date'),
filters= config.get('filters')
)
partner_id=config.get("partner_id"),
start_date=config.get("start_date"),
end_date=config.get("end_date"),
filters=config.get("filters"),
)

streams= [
streams = [
Reach(**args),
Standard(**args),
Standard(**args),
AudienceComposition(**args),
Floodlight(**args),
UniqueReachAudience(**args),
]
return streams


def read(self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any]) -> Generator[AirbyteMessage, None, None]:

def read(
self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any]
) -> Generator[AirbyteMessage, None, None]:
"""
Returns a generator of the AirbyteMessages generated by reading the source with the given configuration,
catalog, and state.
Expand All @@ -106,45 +96,45 @@ def read(self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCa
:return: A generator that produces a stream of AirbyteRecordMessage contained in AirbyteMessage object.
"""
stream_instances= {s.name: s for s in self.streams(config)}
stream_instances = {s.name: s for s in self.streams(config)}
for configured_stream in catalog.streams:
stream_name= configured_stream.stream.name
stream_instance= stream_instances.get(stream_name)
stream_name = configured_stream.stream.name
stream_instance = stream_instances.get(stream_name)
if not stream_instance:
raise KeyError(
f"The requested stream {stream_name} was not found in the source."
f" Available streams: {stream_instances.keys()}"
f"The requested stream {stream_name} was not found in the source." f" Available streams: {stream_instances.keys()}"
)
stream_state= state.get(stream_name, {})
#if stream_state and "state" in dir(stream_instance):
stream_instance.state= stream_state
stream_state = state.get(stream_name, {})
# if stream_state and "state" in dir(stream_instance):
stream_instance.state = stream_state
logger.info(f"Syncing {stream_name} stream")
logger.info(f"Setting state of {stream_name} stream to {stream_state}")
yield AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=state))
try:
config_catalog_fields= configured_stream.stream.json_schema.get('properties').keys()
slices= stream_instance.stream_slices(
cursor_field= configured_stream.cursor_field,
sync_mode= SyncMode.incremental,
stream_state= stream_state,
config_catalog_fields = configured_stream.stream.json_schema.get("properties").keys()
slices = stream_instance.stream_slices(
cursor_field=configured_stream.cursor_field,
sync_mode=SyncMode.incremental,
stream_state=stream_state,
)
for _slice in slices:
data= stream_instance.read_records(
data = stream_instance.read_records(
sync_mode=SyncMode.incremental,
catalog_fields= config_catalog_fields,
catalog_fields=config_catalog_fields,
stream_slice=_slice,
stream_state= stream_state,
cursor_field= configured_stream.cursor_field or None,
stream_state=stream_state,
cursor_field=configured_stream.cursor_field or None,
)

#data= stream_instance.read_records(catalog_fields= config_catalog_fields, sync_mode= SyncMode.incremental, stream_slice= _slice)
# data= stream_instance.read_records(catalog_fields= config_catalog_fields, sync_mode= SyncMode.incremental, stream_slice= _slice)
for row in data:
yield AirbyteMessage(
type= Type.RECORD,
record= AirbyteRecordMessage(stream= stream_name, data= row, emitted_at= int(datetime.now().timestamp()) * 1000))
type=Type.RECORD,
record=AirbyteRecordMessage(stream=stream_name, data=row, emitted_at=int(datetime.now().timestamp()) * 1000),
)

yield self._checkpoint_state(stream_instance, stream_state, state)

logger.info(f"Finished syncing {stream_name} stream")
except Exception as e:
logger.error('Failed to read the data: '+ repr(e))
logger.error("Failed to read the data: " + repr(e))
Loading

0 comments on commit be65546

Please sign in to comment.