From f13d6c5972403e26014913e5898970bf71db6269 Mon Sep 17 00:00:00 2001 From: "imane.berchid" Date: Thu, 2 Jun 2022 19:01:05 +0200 Subject: [PATCH] Add first state msg in Read method --- .../connectors/source-dv-360/source_dv_360/source.py | 5 +++-- .../connectors/source-dv-360/source_dv_360/streams.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-dv-360/source_dv_360/source.py b/airbyte-integrations/connectors/source-dv-360/source_dv_360/source.py index c58e179bffd9..27e77b06e7c5 100644 --- a/airbyte-integrations/connectors/source-dv-360/source_dv_360/source.py +++ b/airbyte-integrations/connectors/source-dv-360/source_dv_360/source.py @@ -7,7 +7,7 @@ from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import ( AirbyteCatalog, - AirbyteConnectionStatus, + AirbyteStateMessage, AirbyteMessage, AirbyteRecordMessage, AirbyteStream, @@ -121,9 +121,10 @@ def read(self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCa stream_state= state.get(stream_name, {}) #if stream_state and "state" in dir(stream_instance): stream_instance.state= stream_state + logger.info(f"Syncing {stream_name} stream") logger.info(f"Setting state of {stream_name} stream to {stream_state}") + yield AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=state)) try: - logger.info(f"Syncing {stream_name} stream") config_catalog_fields= configured_stream.stream.json_schema.get('properties').keys() slices= stream_instance.stream_slices( cursor_field= configured_stream.cursor_field, diff --git a/airbyte-integrations/connectors/source-dv-360/source_dv_360/streams.py b/airbyte-integrations/connectors/source-dv-360/source_dv_360/streams.py index 73fc894efb5b..7e9d33cd93ed 100644 --- a/airbyte-integrations/connectors/source-dv-360/source_dv_360/streams.py +++ b/airbyte-integrations/connectors/source-dv-360/source_dv_360/streams.py @@ -258,7 +258,7 @@ def buffer_reader(self, buffer:io.StringIO): class DBMIncrementalStream(DBMStream, ABC): cursor_field = "date" primary_key = None - range_days = 15 + range_days = 30 #range of stream slice def __init__(self, credentials: Credentials, partner_id: str, filters:List[dict], start_date: str, end_date: str=None): super().__init__(credentials, partner_id, filters, start_date, end_date)