Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Netsuite: fix early adopter issues #19798

Merged
merged 23 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9a156ad
fixed 466, 481 alpha-beta issues
bazarnov Nov 24, 2022
b448372
Merge remote-tracking branch 'origin/master' into baz/source-netsuite…
bazarnov Nov 28, 2022
c6b80d9
added retry logic for getting schemas
bazarnov Nov 28, 2022
724e49b
updated log message
bazarnov Nov 28, 2022
10015f8
updated code to cover unexpected json errors, while fetching the schema
bazarnov Nov 29, 2022
24a8a04
Merge remote-tracking branch 'origin/master' into baz/source-netsuite…
bazarnov Nov 29, 2022
6978634
updated '
bazarnov Dec 1, 2022
4c9bf9f
Merge remote-tracking branch 'origin/master' into baz/source-netsuite…
bazarnov Dec 1, 2022
fe154dd
Merge remote-tracking branch 'origin/master' into baz/source-netsuite…
bazarnov Dec 7, 2022
637470a
Merge remote-tracking branch 'origin/master' into baz/source-netsuite…
bazarnov Dec 7, 2022
89441dc
Merge remote-tracking branch 'origin/master' into baz/source-netsuite…
bazarnov Dec 16, 2022
790ef9e
updated source.py
bazarnov Dec 16, 2022
152c7ea
updated
bazarnov Dec 18, 2022
55e7b63
Merge remote-tracking branch 'origin/master' into baz/source-netsuite…
bazarnov Dec 18, 2022
46318c3
updated
bazarnov Dec 18, 2022
d496f6a
updated
bazarnov Dec 18, 2022
d8bda89
updated
bazarnov Dec 18, 2022
5db31d9
updated
bazarnov Dec 18, 2022
843ba4a
auto-bump connector version
octavia-squidington-iii Jan 3, 2023
17aa725
Merge branch 'master' into baz/source-netsuite-early-adopter-bugfix
bazarnov Jan 3, 2023
7ad50d5
Merge branch 'master' into baz/source-netsuite-early-adopter-bugfix
bazarnov Jan 4, 2023
e935d08
Merge branch 'master' into baz/source-netsuite-early-adopter-bugfix
bazarnov Jan 4, 2023
102ed64
Merge branch 'master' into baz/source-netsuite-early-adopter-bugfix
bazarnov Jan 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@
- name: Netsuite
sourceDefinitionId: 4f2f093d-ce44-4121-8118-9d13b7bfccd0
dockerRepository: airbyte/source-netsuite
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.com/integrations/sources/netsuite
sourceType: api
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9205,7 +9205,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-netsuite:0.1.1"
- dockerImage: "airbyte/source-netsuite:0.1.2"
spec:
documentationUrl: "https://docsurl.com"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-netsuite/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ COPY source_netsuite ./source_netsuite
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-netsuite
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ tests:
- config_path: "sample_files/invalid_config.json"
status: "failed"
discovery:
  # Discovery stage is dynamic, so timeout is increased
- config_path: "secrets/config.json"
  # Discovery stage is dynamic, so timeout is increased
timeout_seconds: 1200
basic_read:
- config_path: "secrets/config.json"
Expand All @@ -33,4 +33,5 @@ tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
timeout_seconds: 3600
timeout_seconds: 7200
threshold_days: 30
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@
"streams": [
{
"stream": {
"name": "customrecord01",
"name": "customer",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"]
},
"source_defined_cursor": true,
"default_cursor_field": ["lastModifiedDate"],
"source_defined_primary_key": [["id"]],
"sync_mode": "full_refresh",
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "customer",
"name": "customrecord01",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
"supported_sync_modes": ["full_refresh"]
},
"source_defined_cursor": true,
"default_cursor_field": ["lastModifiedDate"],
"source_defined_primary_key": [["id"]],
"sync_mode": "incremental",
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
Expand Down Expand Up @@ -94,18 +94,6 @@
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "task",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
},
"source_defined_cursor": true,
"default_cursor_field": ["lastModifiedDate"],
"source_defined_primary_key": [["id"]],
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "salesorder",
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-netsuite/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
"requests-oauthlib~=1.3",
"airbyte-cdk",
"requests-oauthlib",
]

TEST_REQUIREMENTS = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@
INCREMENTAL_CURSOR: str = "lastModifiedDate"
CUSTOM_INCREMENTAL_CURSOR: str = "lastmodified"

# NETSUITE ERROR CODES BY THEIR HTTP TWINS
NETSUITE_ERRORS_MAPPING: dict = {
400: {
"USER_ERROR": "reading an Admin record allowed for Admin only",
"NONEXISTENT_FIELD": "cursor_field declared in schema but doesn't exist in object",
"INVALID_PARAMETER": "cannot read or find the object. Skipping",
},
}

NETSUITE_INPUT_DATE_FORMATS: list[str] = ["%m/%d/%Y", "%Y-%m-%d"]
NETSUITE_OUTPUT_DATETIME_FORMAT: str = "%Y-%m-%dT%H:%M:%SZ"
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


# NETSUITE ERROR CODES BY THEIR HTTP TWINS
NETSUITE_ERRORS_MAPPING: dict = {
400: {
"USER_ERROR": "reading an Admin record allowed for Admin only",
"NONEXISTENT_FIELD": "cursor_field declared in schema but doesn't exist in object",
"INVALID_PARAMETER": "cannot read or find the object. Skipping",
},
403: {
"INSUFFICIENT_PERMISSION": "not enough permissions to access the object",
},
}


# NETSUITE API ERRORS EXCEPTIONS
class DateFormatExeption(Exception):
"""API CANNOT HANDLE REQUEST USING GIVEN DATETIME FORMAT"""
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import logging
from collections import Counter
from json import JSONDecodeError
from typing import Any, List, Mapping, Tuple, Union

import requests
Expand All @@ -16,6 +17,9 @@


class SourceNetsuite(AbstractSource):

logger: logging.Logger = logging.getLogger("airbyte")

def auth(self, config: Mapping[str, Any]) -> OAuth1:
return OAuth1(
client_key=config["consumer_key"],
Expand Down Expand Up @@ -50,7 +54,7 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any
# check connectivity to all provided `object_types`
for object in object_types:
try:
response = session.get(url=base_url + RECORD_PATH + object, params={"limit": 1})
response = session.get(url=base_url + RECORD_PATH + object.lower(), params={"limit": 1})
response.raise_for_status()
return True, None
except requests.exceptions.HTTPError as e:
Expand All @@ -67,11 +71,29 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any
return False, e

def get_schemas(self, object_names: Union[List[str], str], session: requests.Session, metadata_url: str) -> Mapping[str, Any]:
# fetch schemas
if isinstance(object_names, list):
return {object_name: session.get(metadata_url + object_name, headers=SCHEMA_HEADERS).json() for object_name in object_names}
elif isinstance(object_names, str):
return {object_names: session.get(metadata_url + object_names, headers=SCHEMA_HEADERS).json()}
"""
Handles multivariance of object_names type input and fetches the schema for each object type provided.
"""
try:
if isinstance(object_names, list):
schemas = {}
for object_name in object_names:
schemas.update(**self.fetch_schema(object_name, session, metadata_url))
return schemas
elif isinstance(object_names, str):
return self.fetch_schema(object_names, session, metadata_url)
else:
raise NotImplementedError(
f"Object Types has unknown structure, should be either `dict` or `str`, actual input: {object_names}"
)
except JSONDecodeError as e:
self.logger.error(f"Unexpected output while fetching the object schema. Full error: {e.__repr__()}")

def fetch_schema(self, object_name: str, session: requests.Session, metadata_url: str) -> Mapping[str, Any]:
"""
Calls the API for specific object type and returns schema as a dict.
"""
return {object_name.lower(): session.get(metadata_url + object_name, headers=SCHEMA_HEADERS).json()}

def generate_stream(
self,
Expand All @@ -83,35 +105,40 @@ def generate_stream(
base_url: str,
start_datetime: str,
window_in_days: int,
max_retry: int = 3,
) -> Union[NetsuiteStream, IncrementalNetsuiteStream, CustomIncrementalNetsuiteStream]:

logger: logging.Logger = (logging.Logger,)

input_args = {
"auth": auth,
"object_name": object_name,
"base_url": base_url,
"start_datetime": start_datetime,
"window_in_days": window_in_days,
}
try:
schema = schemas[object_name]
schema_props = schema["properties"]
if schema_props:
if INCREMENTAL_CURSOR in schema_props.keys():
return IncrementalNetsuiteStream(**input_args)
elif CUSTOM_INCREMENTAL_CURSOR in schema_props.keys():
return CustomIncrementalNetsuiteStream(**input_args)
else:
# all other streams are full_refresh
return NetsuiteStream(**input_args)
except KeyError:
logger.warn(f"Object `{object_name}` schema has missing `properties` key. Retry...")
            # sometimes object metadata returns data with missing `properties` key,
# we should try to fetch metadata again to that object
schemas = self.get_schemas(object_name, session, metadata_url)
input_args.update(**{"session": session, "metadata_url": metadata_url, "schemas": schemas})
return self.generate_stream(**input_args)

schema = schemas[object_name]
schema_props = schema.get("properties")
if schema_props:
if INCREMENTAL_CURSOR in schema_props.keys():
return IncrementalNetsuiteStream(**input_args)
elif CUSTOM_INCREMENTAL_CURSOR in schema_props.keys():
return CustomIncrementalNetsuiteStream(**input_args)
else:
# all other streams are full_refresh
return NetsuiteStream(**input_args)
else:
retry_attempt = 1
while retry_attempt <= max_retry:
self.logger.warn(f"Object `{object_name}` schema has missing `properties` key. Retry attempt: {retry_attempt}/{max_retry}")
                # sometimes object metadata returns data with missing `properties` key,
# we should try to fetch metadata again to that object
schemas = self.get_schemas(object_name, session, metadata_url)
if schemas[object_name].get("properties"):
input_args.update(**{"session": session, "metadata_url": metadata_url, "schemas": schemas})
return self.generate_stream(**input_args)
retry_attempt += 1
self.logger.warn(f"Object `{object_name}` schema is not available. Skipping this stream.")
return None

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = self.auth(config)
Expand All @@ -121,15 +148,15 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
object_names = config.get("object_types")

# retrieve all record types if `object_types` config field is not specified
if not config.get("object_types"):
if not object_names:
objects_metadata = session.get(metadata_url).json().get("items")
object_names = [object["name"] for object in objects_metadata]

input_args = {"session": session, "metadata_url": metadata_url}
schemas = self.get_schemas(object_names, **input_args)
input_args.update(
**{
"auth": self.auth(config),
"auth": auth,
"base_url": base_url,
"start_datetime": config["start_datetime"],
"window_in_days": config["window_in_days"],
Expand All @@ -139,6 +166,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
# build streams
streams: list = []
for name in object_names:
streams.append(self.generate_stream(object_name=name, **input_args))

stream = self.generate_stream(object_name=name.lower(), **input_args)
if stream:
streams.append(stream)
return streams
Loading