Skip to content

Commit

Permalink
source-instagram: migrate to per-stream state (#17110)
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere authored Sep 28, 2022
1 parent b48c624 commit f006edb
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@
- name: Instagram
sourceDefinitionId: 6acf6b55-4f1e-4fca-944e-1a3caef8aba8
dockerRepository: airbyte/source-instagram
dockerImageTag: 0.1.11
dockerImageTag: 1.0.0
documentationUrl: https://docs.airbyte.com/integrations/sources/instagram
icon: instagram.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4828,7 +4828,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-instagram:0.1.11"
- dockerImage: "airbyte/source-instagram:1.0.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/instagram"
changelogUrl: "https://docs.airbyte.io/integrations/sources/instagram"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.11
LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.name=airbyte/source-instagram
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,10 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from datetime import datetime
from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple
from typing import Any, List, Mapping, Tuple

from airbyte_cdk.models import (
AirbyteMessage,
AuthSpecification,
ConfiguredAirbyteCatalog,
ConnectorSpecification,
DestinationSyncMode,
OAuth2Specification,
)
from airbyte_cdk.models import AuthSpecification, ConnectorSpecification, DestinationSyncMode, OAuth2Specification
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from pydantic import BaseModel, Field
Expand Down Expand Up @@ -58,15 +50,6 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any

return ok, error_msg

def read(
self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
) -> Iterator[AirbyteMessage]:
for stream in self.streams(config):
state_key = str(stream.name)
if state and state_key in state and hasattr(stream, "upgrade_state_to_latest_format"):
state[state_key] = stream.upgrade_state_to_latest_format(state[state_key])
return super().read(logger, config, catalog, state)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""Discovery method, returns available streams
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ def fields(self) -> List[str]:
fields = list(self.get_json_schema().get("properties", {}).keys())
return list(set(fields) - set(non_object_fields))

def upgrade_state_to_latest_format(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
"""Upgrade state to latest format and return new state object"""
return copy.deepcopy(state)

def request_params(
self,
stream_slice: Mapping[str, Any] = None,
Expand Down Expand Up @@ -251,14 +247,6 @@ def _state_has_legacy_format(self, state: Mapping[str, Any]) -> bool:
return True
return False

def upgrade_state_to_latest_format(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
"""Upgrade state to latest format and return new state object"""
if self._state_has_legacy_format(state):
self.logger.info(f"The {self.name} state has old format, converting...")
return {account_id: {self.cursor_field: str(cursor_value)} for account_id, cursor_value in state.items()}

return super().upgrade_state_to_latest_format(state)

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
"""Update stream state from latest record"""
record_value = latest_record[self.cursor_field]
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/instagram.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ See Facebook's [documentation on rate limiting](https://developers.facebook.com/

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------------- |
| 1.0.0 | 2022-09-23 | [17110](https://github.com/airbytehq/airbyte/pull/17110) | Remove custom read function and migrate to per-stream state |
| 0.1.11 | 2022-09-08 | [16428](https://github.com/airbytehq/airbyte/pull/16428) | Fix requests metrics for Reels media product type |
| 0.1.10 | 2022-09-05 | [16340](https://github.com/airbytehq/airbyte/pull/16340) | Update to latest version of the CDK (v0.1.81) |
| 0.1.9 | 2021-09-30 | [6438](https://github.com/airbytehq/airbyte/pull/6438) | Annotate Oauth2 flow initialization parameters in connector specification |
Expand Down

0 comments on commit f006edb

Please sign in to comment.