Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tdl 16328 implement request timeout #38

Merged
38 changes: 26 additions & 12 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
version: 2
version: 2.1
orbs:
slack: circleci/slack@3.4.2

jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
- image: amazon/dynamodb-local
entrypoint: ["java", "-Xmx1G", "-jar", "DynamoDBLocal.jar"]
steps:
Expand All @@ -19,25 +22,34 @@ jobs:
command: |
source /usr/local/share/virtualenvs/tap-dynamodb/bin/activate
make lint
- run:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update this file in line with the changes required on PR #37 and #35

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kspeer825 Updated this file as requested

name: 'Unit Tests'
command: |
source /usr/local/share/virtualenvs/tap-dynamodb/bin/activate
pip install nose coverage
nosetests --with-coverage --cover-erase --cover-package=tap_dynamodb --cover-html-dir=htmlcov tests/unittests
coverage html
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov
- run:
name: 'Tap Tester'
command: |
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox tap-tester.env
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox tap-tester.env
source tap-tester.env
source /usr/local/share/virtualenvs/tap-tester/bin/activate
run-test --tap=tap-dynamodb \
--target=target-stitch \
--orchestrator=stitch-orchestrator \
--email=harrison+sandboxtest@stitchdata.com \
--password=$SANDBOX_PASSWORD \
--client-id=50 \
tests
run-test --tap=tap-dynamodb tests
- slack/notify-on-failure:
only_for_branches: master
workflows:
version: 2
commit:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add the tap-tester-user context as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the tap-tester-user context

jobs:
- build:
context: circleci-user
context:
- circleci-user
- tap-tester-user
build_daily:
triggers:
- schedule:
Expand All @@ -48,4 +60,6 @@ workflows:
- master
jobs:
- build:
context: circleci-user
context:
- circleci-user
- tap-tester-user
9 changes: 7 additions & 2 deletions tap_dynamodb/discover.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from singer import metadata
import singer
from botocore.exceptions import ClientError
import backoff
from botocore.exceptions import ClientError, ConnectTimeoutError, ReadTimeoutError
from tap_dynamodb import dynamodb

LOGGER = singer.get_logger()
Expand Down Expand Up @@ -29,7 +30,11 @@ def discover_table_schema(client, table_name):
}
}


# Backoff for both ReadTimeout and ConnectTimeout error for 5 times
@backoff.on_exception(backoff.expo,
(ReadTimeoutError, ConnectTimeoutError),
max_tries=5,
factor=2)
def discover_streams(config):
client = dynamodb.get_client(config)

Expand Down
39 changes: 33 additions & 6 deletions tap_dynamodb/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
DeferredRefreshableCredentials,
JSONFileCache
)
from botocore.exceptions import ClientError
from botocore.session import Session
from botocore.config import Config
from botocore.exceptions import ClientError, ConnectTimeoutError, ReadTimeoutError

LOGGER = singer.get_logger()
REQUEST_TIMEOUT = 300

def retry_pattern():
return backoff.on_exception(backoff.expo,
ClientError,
(ClientError, ConnectTimeoutError, ReadTimeoutError),
max_tries=5,
on_backoff=log_backoff_attempt,
factor=10)
Expand Down Expand Up @@ -65,17 +67,42 @@ def setup_aws_client(config):
LOGGER.info("Attempting to assume_role on RoleArn: %s", role_arn)
boto3.setup_default_session(botocore_session=refreshable_session)

def get_request_timeout(config):
# if request_timeout is other than 0,"0" or "" then use request_timeout
request_timeout = config.get('request_timeout')
if request_timeout and float(request_timeout):
request_timeout = float(request_timeout)
else: # If value is 0,"0" or "" then set default to 300 seconds.
request_timeout = REQUEST_TIMEOUT
return request_timeout

def get_client(config):
# get the request_timeout
request_timeout = get_request_timeout(config)
# add the request_timeout in both connect_timeout as well as read_timeout
timeout_config = Config(connect_timeout=request_timeout, read_timeout=request_timeout)
if config.get('use_local_dynamo'):
return boto3.client('dynamodb',
endpoint_url='http://localhost:8000',
region_name=config['region_name'])
return boto3.client('dynamodb', config['region_name'])
region_name=config['region_name'],
config=timeout_config # pass the config to add the request_timeout
)
return boto3.client('dynamodb', config['region_name'],
config=timeout_config # pass the config to add the request_timeout
)

def get_stream_client(config):
# get the request_timeout

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between get_stream_client and get_client function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KrisPersonal Amazon DynamoDB Streams provides API actions for accessing streams and processing stream records, which can be done through get_stream_client. Whereas, the get_client returns a low-level client representing Amazon DynamoDB

request_timeout = get_request_timeout(config)
# add the request_timeout in both connect_timeout as well as read_timeout
timeout_config = Config(connect_timeout=request_timeout, read_timeout=request_timeout)
if config.get('use_local_dynamo'):
return boto3.client('dynamodbstreams',
endpoint_url='http://localhost:8000',
region_name=config['region_name'])
region_name=config['region_name'],
config=timeout_config # pass the config to add the request_timeout
)
return boto3.client('dynamodbstreams',
region_name=config['region_name'])
region_name=config['region_name'],
config=timeout_config # pass the config to add the request_timeout
)
10 changes: 8 additions & 2 deletions tap_dynamodb/sync_strategies/full_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import singer
from singer import metadata
from tap_dynamodb import dynamodb
import backoff
from botocore.exceptions import ConnectTimeoutError, ReadTimeoutError
from tap_dynamodb.deserialize import Deserializer
from tap_dynamodb import dynamodb

LOGGER = singer.get_logger()

Expand Down Expand Up @@ -35,7 +37,11 @@ def scan_table(table_name, projection, last_evaluated_key, config):

has_more = result.get('LastEvaluatedKey', False)


# Backoff for both ReadTimeout and ConnectTimeout error for 5 times

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@namrata270998 Please add this bckoff expression above the scan_table function instead. Update the test cases accordingly.

Copy link
Contributor Author

@namrata270998 namrata270998 Jan 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karanpanchal-crest The function scan_table contains a yield. Hence the backoff won't work on that function
Please refer


for reference

@backoff.on_exception(backoff.expo,
(ReadTimeoutError, ConnectTimeoutError),
max_tries=5,
factor=2)
def sync(config, state, stream):
table_name = stream['tap_stream_id']

Expand Down
21 changes: 15 additions & 6 deletions tap_dynamodb/sync_strategies/log_based.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import datetime
from singer import metadata
import singer

from tap_dynamodb import dynamodb
from tap_dynamodb import deserialize
import backoff
from botocore.exceptions import ConnectTimeoutError, ReadTimeoutError
from tap_dynamodb import dynamodb, deserialize

LOGGER = singer.get_logger()
WRITE_STATE_PERIOD = 1000

SDC_DELETED_AT = "_sdc_deleted_at"

MAX_TRIES = 5
FACTOR = 2

def get_shards(streams_client, stream_arn):
'''
Expand Down Expand Up @@ -115,7 +116,11 @@ def sync_shard(shard, seq_number_bookmarks, streams_client, stream_arn, projecti
singer.write_state(state)
return rows_synced


# Backoff for both ReadTimeout and ConnectTimeout error for 5 times
@backoff.on_exception(backoff.expo,
(ReadTimeoutError, ConnectTimeoutError),
max_tries=MAX_TRIES,
factor=FACTOR)
def sync(config, state, stream):
table_name = stream['tap_stream_id']

Expand Down Expand Up @@ -212,7 +217,11 @@ def has_stream_aged_out(state, table_name):
# stream then we consider the stream to be aged out
return time_span > datetime.timedelta(hours=19, minutes=30)


# Backoff for both ReadTimeout and ConnectTimeout error for 5 times
@backoff.on_exception(backoff.expo,
(ReadTimeoutError, ConnectTimeoutError),
max_tries=MAX_TRIES,
factor=FACTOR)
def get_initial_bookmarks(config, state, table_name):
'''
Returns the state including all bookmarks necessary for the initial
Expand Down
4 changes: 0 additions & 4 deletions tests/test_dynamodb_discovery.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from boto3.dynamodb.types import TypeSerializer

from tap_tester.scenario import (SCENARIOS)
from base import TestDynamoDBBase


Expand Down Expand Up @@ -53,6 +52,3 @@ def name():

def test_run(self):
self.pre_sync_test()


SCENARIOS.add(DynamoDBDiscovery)
4 changes: 0 additions & 4 deletions tests/test_dynamodb_full_table_interruptible_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from boto3.dynamodb.types import TypeSerializer

from tap_tester.scenario import (SCENARIOS)
from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner
Expand Down Expand Up @@ -129,6 +128,3 @@ def test_run(self):
self.assertIsNone(state['bookmarks'][table_name].get('last_evaluated_key'))

self.assertTrue(state['bookmarks'][table_name].get('initial_full_table_complete', False))


SCENARIOS.add(DynamoDBFullTableInterruptible)
3 changes: 0 additions & 3 deletions tests/test_dynamodb_full_table_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from boto3.dynamodb.types import TypeSerializer

from tap_tester.scenario import (SCENARIOS)
from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner
Expand Down Expand Up @@ -108,5 +107,3 @@ def test_run(self):
# assert that there is a version bookmark in state
first_versions[table_name] = state['bookmarks'][table_name]['version']
self.assertIsNotNone(first_versions[table_name])

SCENARIOS.add(DynamoDBFullTable)
4 changes: 0 additions & 4 deletions tests/test_dynamodb_log_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from boto3.dynamodb.types import TypeSerializer

from tap_tester.scenario import (SCENARIOS)
from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner
Expand Down Expand Up @@ -143,6 +142,3 @@ def test_run(self):
self.assertEqual(31, len(stream['messages']))

state = menagerie.get_state(conn_id)


SCENARIOS.add(DynamoDBLogBased)
5 changes: 0 additions & 5 deletions tests/test_dynamodb_log_based_interruptible.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from boto3.dynamodb.types import TypeSerializer

from tap_tester.scenario import (SCENARIOS)
from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner
Expand Down Expand Up @@ -202,7 +201,3 @@ def first_sync_test(self, table_configs, conn_id):
# as the full table sync
state['bookmarks'][table_name].pop('finished_shards')
menagerie.set_state(conn_id, state, version=state_version)



SCENARIOS.add(DynamoDBLogBased)
4 changes: 0 additions & 4 deletions tests/test_dynamodb_log_based_projections.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from boto3.dynamodb.types import TypeSerializer

from tap_tester.scenario import (SCENARIOS)
from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner
Expand Down Expand Up @@ -205,6 +204,3 @@ def first_sync_test(self, table_configs, conn_id, expected_streams):
for list_key in config['top_level_list_keys']:
self.assertTrue(isinstance(message['data'][list_key], list))
self.assertEqual(config['nested_map_keys']['map_field'], {*message['data']['map_field'].keys()})


SCENARIOS.add(DynamoDBLogBasedProjections)
4 changes: 0 additions & 4 deletions tests/test_dynamodb_projections.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from boto3.dynamodb.types import TypeSerializer

from tap_tester.scenario import (SCENARIOS)
from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner
Expand Down Expand Up @@ -134,6 +133,3 @@ def test_run(self):
for list_key in config['top_level_list_keys']:
self.assertTrue(isinstance(message['data'][list_key], list))
self.assertEqual(config['nested_map_keys']['map_field'], {*message['data']['map_field'].keys()})


SCENARIOS.add(DynamoDBProjections)
Loading