Skip to content

Commit

Permalink
Add first state msg in Read method
Browse files Browse the repository at this point in the history
  • Loading branch information
iberchid authored and marcosmarxm committed Sep 27, 2022
1 parent 619bdb9 commit f13d6c5
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteStateMessage,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStream,
Expand Down Expand Up @@ -121,9 +121,10 @@ def read(self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCa
stream_state= state.get(stream_name, {})
#if stream_state and "state" in dir(stream_instance):
stream_instance.state= stream_state
logger.info(f"Syncing {stream_name} stream")
logger.info(f"Setting state of {stream_name} stream to {stream_state}")
yield AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=state))
try:
logger.info(f"Syncing {stream_name} stream")
config_catalog_fields= configured_stream.stream.json_schema.get('properties').keys()
slices= stream_instance.stream_slices(
cursor_field= configured_stream.cursor_field,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def buffer_reader(self, buffer:io.StringIO):
class DBMIncrementalStream(DBMStream, ABC):
cursor_field = "date"
primary_key = None
range_days = 15
range_days = 30 #range of stream slice

def __init__(self, credentials: Credentials, partner_id: str, filters:List[dict], start_date: str, end_date: str=None):
super().__init__(credentials, partner_id, filters, start_date, end_date)
Expand Down

0 comments on commit f13d6c5

Please sign in to comment.