
Commit

Merge branch 'master' into ddavydov/#531-source-s3-fix-jsonl-nested-structures

davydov-d committed Sep 19, 2022
2 parents f682947 + 73ba7b6 commit 018b1e1
Showing 262 changed files with 6,152 additions and 42,392 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 0.40.6
+current_version = 0.40.7
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
2 changes: 1 addition & 1 deletion .env
@@ -10,7 +10,7 @@


### SHARED ###
-VERSION=0.40.6
+VERSION=0.40.7

# When using the airbyte-db via default docker image
CONFIG_ROOT=/data
6 changes: 3 additions & 3 deletions .github/pull_request_template.md
@@ -29,7 +29,7 @@ Expand the relevant checklist and delete the others.
- [ ] `docs/integrations/<source or destination>/<name>.md` including changelog. See changelog [example](https://docs.airbyte.io/integrations/sources/stripe#changelog)
- [ ] `docs/integrations/README.md`
- [ ] `airbyte-integrations/builds.md`
-- [ ] PR name follows [PR naming conventions](https://docs.airbyte.io/contributing-to-airbyte/updating-documentation#issues-and-pull-requests)
+- [ ] PR name follows [PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/issues-and-pull-requests)

### Airbyter

@@ -57,7 +57,7 @@ If this is a community PR, the Airbyte engineer reviewing this PR is responsible
- [ ] Connector's `README.md`
- [ ] Connector's `bootstrap.md`. See [description and examples](https://docs.google.com/document/d/1ypdgmwmEHWv-TrO4_YOQ7pAJGVrMp5BOkEVh831N260/edit?usp=sharing)
- [ ] Changelog updated in `docs/integrations/<source or destination>/<name>.md` including changelog. See changelog [example](https://docs.airbyte.io/integrations/sources/stripe#changelog)
-- [ ] PR name follows [PR naming conventions](https://docs.airbyte.io/contributing-to-airbyte/updating-documentation#issues-and-pull-requests)
+- [ ] PR name follows [PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/issues-and-pull-requests)

### Airbyter

@@ -74,7 +74,7 @@ If this is a community PR, the Airbyte engineer reviewing this PR is responsible
<details><summary><strong>Connector Generator</strong></summary>

- [ ] Issue acceptance criteria met
-- [ ] PR name follows [PR naming conventions](https://docs.airbyte.io/contributing-to-airbyte/updating-documentation#issues-and-pull-requests)
+- [ ] PR name follows [PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/issues-and-pull-requests)
- [ ] If adding a new generator, add it to the [list of scaffold modules being tested](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connector-templates/generator/build.gradle#L41)
- [ ] The generator test modules (all connectors with `-scaffold` in their name) have been updated with the latest scaffold by running `./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates` then checking in your changes
- [ ] Documentation which references the generator is updated as needed
2 changes: 1 addition & 1 deletion .github/workflows/gradle.yml
@@ -590,7 +590,7 @@ jobs:
    needs: start-kube-acceptance-test-runner # required to start the main job when the runner is ready
    runs-on: ${{ needs.start-kube-acceptance-test-runner.outputs.label }} # run the job on the newly created runner
    environment: more-secrets
-    timeout-minutes: 90
+    timeout-minutes: 40
    steps:
      - name: Checkout Airbyte
        uses: actions/checkout@v2
2 changes: 1 addition & 1 deletion airbyte-bootloader/Dockerfile
@@ -2,7 +2,7 @@ ARG JDK_VERSION=19-slim-bullseye
ARG JDK_IMAGE=openjdk:${JDK_VERSION}
FROM ${JDK_IMAGE}

-ARG VERSION=0.40.6
+ARG VERSION=0.40.7

ENV APPLICATION airbyte-bootloader
ENV VERSION ${VERSION}
7 changes: 7 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,6 +1,13 @@
# Changelog

+## 0.1.88
+- Bugfix: Evaluate `response.text` only in debug mode
+
+## 0.1.87
+- During incremental syncs allow for streams to emit state messages in the per-stream format
+
## 0.1.86
- TypeTransformer now converts simple types to array of simple types
- TypeTransformer make warning message more informative

## 0.1.85
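The 0.1.87 entry above is the substance of this commit: streams can now checkpoint state individually instead of through one shared global blob. As a rough illustration of what that means on the wire (model names follow the Airbyte protocol of this era; the exact shapes are not shown in this diff, so treat them as assumptions):

```python
# Hedged sketch: legacy vs. per-stream STATE messages. Model names
# (AirbyteStateMessage, AirbyteStateType, AirbyteStreamState, StreamDescriptor)
# follow the Airbyte protocol; the "users" stream and its cursor are hypothetical.
from airbyte_cdk.models import (
    AirbyteStateMessage,
    AirbyteStateType,
    AirbyteStreamState,
    StreamDescriptor,
)

# Legacy format: one blob keyed by stream name, shared by every stream.
legacy = AirbyteStateMessage(data={"users": {"updated_at": "2022-09-19"}})

# Per-stream format: each stream emits its own message with a descriptor,
# so an interrupted sync can resume streams independently.
per_stream = AirbyteStateMessage(
    type=AirbyteStateType.STREAM,
    stream=AirbyteStreamState(
        stream_descriptor=StreamDescriptor(name="users"),
        stream_state={"updated_at": "2022-09-19"},
    ),
)
```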
39 changes: 20 additions & 19 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -93,14 +93,12 @@ def read(
        state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
    ) -> Iterator[AirbyteMessage]:
        """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-protocol."""
-        state_manager = ConnectorStateManager(state=state)
-        connector_state = state_manager.get_legacy_state()
-
        logger.info(f"Starting syncing {self.name}")
        config, internal_config = split_config(config)
        # TODO assert all streams exist in the connector
        # get the streams once in case the connector needs to make any queries to generate them
        stream_instances = {s.name: s for s in self.streams(config)}
+        state_manager = ConnectorStateManager(stream_instance_map=stream_instances, state=state)
        self._stream_to_instance_map = stream_instances
        with create_timer(self.name) as timer:
            for configured_stream in catalog.streams:
@@ -116,7 +114,7 @@ def read(
                        logger=logger,
                        stream_instance=stream_instance,
                        configured_stream=configured_stream,
-                        connector_state=connector_state,
+                        state_manager=state_manager,
                        internal_config=internal_config,
                    )
                except AirbyteTracedException as e:
@@ -135,15 +133,15 @@ def read(
logger.info(f"Finished syncing {self.name}")

@property
def per_stream_state_enabled(self):
return False # While CDK per-stream is in active development we should keep this off
def per_stream_state_enabled(self) -> bool:
return True

def _read_stream(
self,
logger: logging.Logger,
stream_instance: Stream,
configured_stream: ConfiguredAirbyteStream,
connector_state: MutableMapping[str, Any],
state_manager: ConnectorStateManager,
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
self._apply_log_level_to_stream_logger(logger, stream_instance)
@@ -172,7 +170,7 @@ def _read_stream(
                logger,
                stream_instance,
                configured_stream,
-                connector_state,
+                state_manager,
                internal_config,
            )
        else:
@@ -206,20 +204,21 @@ def _read_incremental(
        logger: logging.Logger,
        stream_instance: Stream,
        configured_stream: ConfiguredAirbyteStream,
-        connector_state: MutableMapping[str, Any],
+        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterator[AirbyteMessage]:
        """Read stream using incremental algorithm
        :param logger:
        :param stream_instance:
        :param configured_stream:
-        :param connector_state:
+        :param state_manager:
        :param internal_config:
        :return:
        """
        stream_name = configured_stream.stream.name
-        stream_state = connector_state.get(stream_name, {})
+        stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace)
+
        if stream_state and "state" in dir(stream_instance):
            stream_instance.state = stream_state
            logger.info(f"Setting state of {stream_name} stream to {stream_state}")
@@ -233,7 +232,7 @@ def _read_incremental(
        total_records_counter = 0
        if not slices:
            # Safety net to ensure we always emit at least one state message even if there are no slices
-            checkpoint = self._checkpoint_state(stream_instance, stream_state, connector_state)
+            checkpoint = self._checkpoint_state(stream_instance, stream_state, state_manager)
            yield checkpoint
        for _slice in slices:
            logger.debug("Processing stream slice", extra={"slice": _slice})
@@ -248,7 +247,7 @@ def _read_incremental(
                stream_state = stream_instance.get_updated_state(stream_state, record_data)
                checkpoint_interval = stream_instance.state_checkpoint_interval
                if checkpoint_interval and record_counter % checkpoint_interval == 0:
-                    yield self._checkpoint_state(stream_instance, stream_state, connector_state)
+                    yield self._checkpoint_state(stream_instance, stream_state, state_manager)

                total_records_counter += 1
                # This functionality should ideally live outside of this method
@@ -258,7 +257,7 @@ def _read_incremental(
                    # Break from slice loop to save state and exit from _read_incremental function.
                    break

-        yield self._checkpoint_state(stream_instance, stream_state, connector_state)
+        yield self._checkpoint_state(stream_instance, stream_state, state_manager)
        if self._limit_reached(internal_config, total_records_counter):
            return

@@ -285,13 +284,15 @@ def _read_full_refresh(
                if self._limit_reached(internal_config, total_records_counter):
                    return

-    def _checkpoint_state(self, stream, stream_state, connector_state):
+    def _checkpoint_state(self, stream: Stream, stream_state, state_manager: ConnectorStateManager):
+        # First attempt to retrieve the current state using the stream's state property. We receive an AttributeError if the state
+        # property is not implemented by the stream instance and as a fallback, use the stream_state retrieved from the stream
+        # instance's deprecated get_updated_state() method.
        try:
-            connector_state[stream.name] = stream.state
+            state_manager.update_state_for_stream(stream.name, stream.namespace, stream.state)
        except AttributeError:
-            connector_state[stream.name] = stream_state
-
-        return AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=connector_state))
+            state_manager.update_state_for_stream(stream.name, stream.namespace, stream_state)
+        return state_manager.create_state_message(stream.name, stream.namespace, send_per_stream_state=self.per_stream_state_enabled)

    @lru_cache(maxsize=None)
    def _get_stream_transformer_and_schema(self, stream_name: str) -> Tuple[TypeTransformer, Mapping[str, Any]]:
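To see how the pieces above fit together: read() now builds a single ConnectorStateManager, _read_incremental() pulls each stream's slice of state from it, and _checkpoint_state() writes cursors back and asks the manager to render the STATE message. Below is a minimal sketch of that flow; the ConnectorStateManager signatures are inferred from the call sites in this diff, not verified against the class, and the stream name and cursor values are hypothetical.

```python
# Hedged sketch of the new checkpointing flow; signatures are inferred from
# the call sites shown above, so treat them as assumptions.
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager

# read() builds one manager over all stream instances plus the input state.
stream_instances = {s.name: s for s in source.streams(config)}  # `source`, `config`, `state` assumed in scope
state_manager = ConnectorStateManager(stream_instance_map=stream_instances, state=state)

# _read_incremental() seeds a stream from the manager instead of a shared dict.
stream_state = state_manager.get_stream_state("users", None)  # name, namespace

# _checkpoint_state() pushes the latest cursor back and emits a STATE message;
# with per_stream_state_enabled now returning True, it is rendered per-stream.
state_manager.update_state_for_stream("users", None, {"updated_at": "2022-09-19"})
state_message = state_manager.create_state_message("users", None, send_per_stream_state=True)
```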
