Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tdl 16328 implement request timeout #38

Merged
11 changes: 11 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ jobs:
command: |
source /usr/local/share/virtualenvs/tap-dynamodb/bin/activate
make lint
- run:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update this file in line with the changes required on PR #37 and #35

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kspeer825 Updated this file as requested

name: 'Unit Tests'
command: |
source /usr/local/share/virtualenvs/tap-dynamodb/bin/activate
pip install nose coverage
nosetests --with-coverage --cover-erase --cover-package=tap_dynamodb --cover-html-dir=htmlcov tests/unittests
coverage html
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov
- run:
name: 'Tap Tester'
command: |
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ lint-tests:
pylint tests -d broad-except,chained-comparison,empty-docstring,fixme,invalid-name,line-too-long,missing-class-docstring,missing-function-docstring,missing-module-docstring,no-else-raise,no-else-return,too-few-public-methods,too-many-arguments,too-many-branches,too-many-lines,too-many-locals,ungrouped-imports,wrong-spelling-in-comment,wrong-spelling-in-docstring,duplicate-code,no-name-in-module,import-error,consider-using-f-string

lint-code:
pylint tap_dynamodb -d broad-except,chained-comparison,empty-docstring,fixme,invalid-name,line-too-long,missing-class-docstring,missing-function-docstring,missing-module-docstring,no-else-raise,no-else-return,too-few-public-methods,too-many-arguments,too-many-branches,too-many-lines,too-many-locals,ungrouped-imports,wrong-spelling-in-comment,wrong-spelling-in-docstring,raise-missing-from,consider-using-f-string
pylint tap_dynamodb -d broad-except,chained-comparison,empty-docstring,fixme,invalid-name,line-too-long,missing-class-docstring,missing-function-docstring,missing-module-docstring,no-else-raise,no-else-return,too-few-public-methods,too-many-arguments,too-many-branches,too-many-lines,too-many-locals,ungrouped-imports,wrong-spelling-in-comment,wrong-spelling-in-docstring,raise-missing-from,consider-using-f-string,duplicate-code

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove `duplicate-code` from the global disable list. Instead, disable it only where it is needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


lint: lint-code lint-tests
9 changes: 7 additions & 2 deletions tap_dynamodb/discover.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from singer import metadata
import singer
from botocore.exceptions import ClientError
import backoff
from botocore.exceptions import ClientError, ConnectTimeoutError, ReadTimeoutError
from tap_dynamodb import dynamodb

LOGGER = singer.get_logger()
Expand Down Expand Up @@ -29,7 +30,11 @@ def discover_table_schema(client, table_name):
}
}


# Backoff for both ReadTimeout and ConnectTimeout error for 5 times
@backoff.on_exception(backoff.expo,
(ReadTimeoutError, ConnectTimeoutError),
max_tries=5,
factor=2)
def discover_streams(config):
client = dynamodb.get_client(config)

Expand Down
39 changes: 33 additions & 6 deletions tap_dynamodb/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
DeferredRefreshableCredentials,
JSONFileCache
)
from botocore.exceptions import ClientError
from botocore.session import Session
from botocore.config import Config
from botocore.exceptions import ClientError, ConnectTimeoutError, ReadTimeoutError

LOGGER = singer.get_logger()
REQUEST_TIMEOUT = 300

def retry_pattern():
    """Return a backoff decorator that retries on client and timeout errors.

    The decorator retries with exponential backoff (factor 10, at most 5
    tries) when a ``ClientError``, ``ConnectTimeoutError`` or
    ``ReadTimeoutError`` is raised, and calls ``log_backoff_attempt`` as the
    ``on_backoff`` handler before each retry.
    """
    # The exceptions must be passed as ONE tuple argument; a stray bare
    # ``ClientError`` positional next to the tuple is not a valid call.
    return backoff.on_exception(backoff.expo,
                                (ClientError, ConnectTimeoutError, ReadTimeoutError),
                                max_tries=5,
                                on_backoff=log_backoff_attempt,
                                factor=10)
Expand Down Expand Up @@ -65,17 +67,42 @@ def setup_aws_client(config):
LOGGER.info("Attempting to assume_role on RoleArn: %s", role_arn)
boto3.setup_default_session(botocore_session=refreshable_session)

def get_request_timeout(config):
    """Return the request timeout, in seconds, configured for the tap.

    Reads ``request_timeout`` from ``config``. A missing, empty or zero value
    (``None``, ``""``, ``0``, ``"0"``) falls back to ``REQUEST_TIMEOUT``; any
    other value is converted to ``float``.

    Raises:
        ValueError: if the configured value cannot be parsed as a number.
    """
    raw_timeout = config.get('request_timeout')
    if not raw_timeout:
        # Missing key, empty string or numeric zero: use the default.
        return REQUEST_TIMEOUT
    try:
        # Parse once instead of converting the same value twice.
        request_timeout = float(raw_timeout)
    except ValueError as err:
        # Same exception type as before, but the message names the config key.
        raise ValueError(
            "Invalid 'request_timeout' config value {!r}: expected a number of seconds".format(raw_timeout)
        ) from err
    # Strings such as "0" are truthy but parse to zero; treat them as unset too.
    return request_timeout or REQUEST_TIMEOUT

def get_client(config):
    """Create a boto3 ``dynamodb`` client with the configured request timeout.

    The value returned by ``get_request_timeout(config)`` is applied as both
    the connect timeout and the read timeout. When ``use_local_dynamo`` is set
    in ``config`` the client targets ``http://localhost:8000``; otherwise it
    is created for ``config['region_name']`` only.
    """
    request_timeout = get_request_timeout(config)
    # One value drives both the connect and the read timeout.
    timeout_config = Config(connect_timeout=request_timeout, read_timeout=request_timeout)
    if config.get('use_local_dynamo'):
        return boto3.client('dynamodb',
                            endpoint_url='http://localhost:8000',
                            region_name=config['region_name'],
                            config=timeout_config)
    return boto3.client('dynamodb', config['region_name'],
                        config=timeout_config)

def get_stream_client(config):
# get the request_timeout

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between the get_stream_client and get_client functions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KrisPersonal Amazon DynamoDB Streams provides API actions for accessing streams and processing stream records; get_stream_client returns the client for that API, whereas get_client returns a low-level client representing Amazon DynamoDB itself.

request_timeout = get_request_timeout(config)
# add the request_timeout in both connect_timeout as well as read_timeout
timeout_config = Config(connect_timeout=request_timeout, read_timeout=request_timeout)
if config.get('use_local_dynamo'):
return boto3.client('dynamodbstreams',
endpoint_url='http://localhost:8000',
region_name=config['region_name'])
region_name=config['region_name'],
config=timeout_config # pass the config to add the request_timeout
)
return boto3.client('dynamodbstreams',
region_name=config['region_name'])
region_name=config['region_name'],
config=timeout_config # pass the config to add the request_timeout
)
10 changes: 8 additions & 2 deletions tap_dynamodb/sync_strategies/full_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import singer
from singer import metadata
from tap_dynamodb import dynamodb
import backoff
from botocore.exceptions import ConnectTimeoutError, ReadTimeoutError
from tap_dynamodb.deserialize import Deserializer
from tap_dynamodb import dynamodb

LOGGER = singer.get_logger()

Expand Down Expand Up @@ -35,7 +37,11 @@ def scan_table(table_name, projection, last_evaluated_key, config):

has_more = result.get('LastEvaluatedKey', False)


# Backoff for both ReadTimeout and ConnectTimeout error for 5 times

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@namrata270998 Please add this backoff expression above the scan_table function instead, and update the test cases accordingly.

Copy link
Contributor Author

@namrata270998 namrata270998 Jan 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

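@karanpanchal-crest The function scan_table contains a yield, so it is a generator: calling it only creates the generator object, and any timeout is raised later, during iteration, outside the decorated call. Hence the backoff won't work on that function.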
Please refer


for reference

@backoff.on_exception(backoff.expo,
(ReadTimeoutError, ConnectTimeoutError),
max_tries=5,
factor=2)
def sync(config, state, stream):
table_name = stream['tap_stream_id']

Expand Down
18 changes: 13 additions & 5 deletions tap_dynamodb/sync_strategies/log_based.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import datetime
from singer import metadata
import singer

from tap_dynamodb import dynamodb
from tap_dynamodb import deserialize
import backoff
from botocore.exceptions import ConnectTimeoutError, ReadTimeoutError
from tap_dynamodb import dynamodb, deserialize

LOGGER = singer.get_logger()
WRITE_STATE_PERIOD = 1000
Expand Down Expand Up @@ -115,7 +115,11 @@ def sync_shard(shard, seq_number_bookmarks, streams_client, stream_arn, projecti
singer.write_state(state)
return rows_synced


# Backoff for both ReadTimeout and ConnectTimeout error for 5 times
@backoff.on_exception(backoff.expo,
(ReadTimeoutError, ConnectTimeoutError),
max_tries=5,
factor=2)
def sync(config, state, stream):
table_name = stream['tap_stream_id']

Expand Down Expand Up @@ -212,7 +216,11 @@ def has_stream_aged_out(state, table_name):
# stream then we consider the stream to be aged out
return time_span > datetime.timedelta(hours=19, minutes=30)


# Backoff for both ReadTimeout and ConnectTimeout error for 5 times
@backoff.on_exception(backoff.expo,
(ReadTimeoutError, ConnectTimeoutError),
max_tries=5,
factor=2)
def get_initial_bookmarks(config, state, table_name):
'''
Returns the state including all bookmarks necessary for the initial
Expand Down
217 changes: 217 additions & 0 deletions tests/unittests/test_request_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
from tap_dynamodb.sync_strategies import full_table, log_based
from tap_dynamodb import LOGGER, discover, dynamodb
import tap_dynamodb
import unittest
from unittest import mock
from unittest.mock import Mock
from unittest.case import TestCase
from botocore.exceptions import ClientError, ConnectTimeoutError, ReadTimeoutError

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove ClientError if not used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


class MockClient():
    """Fake DynamoDB client whose API calls always raise ``ReadTimeoutError``."""

    def __init__(self, endpoint_url=None):
        # Honour a caller-supplied endpoint instead of silently dropping it;
        # the dummy host remains the default.
        self.endpoint_url = endpoint_url or "abc.com"

    def list_tables(self, *args):
        raise ReadTimeoutError(endpoint_url=self.endpoint_url)

    def scan(self, **args):
        raise ReadTimeoutError(endpoint_url=self.endpoint_url)

    def describe_table(self, **args):
        raise ReadTimeoutError(endpoint_url=self.endpoint_url)


class MockClientConnectTimeout():
    """Fake DynamoDB client whose API calls always raise ``ConnectTimeoutError``."""

    def __init__(self, endpoint_url=None):
        # Same constructor contract as MockClient.
        self.endpoint_url = endpoint_url or "abc.com"

    def list_tables(self, *args):
        raise ConnectTimeoutError(endpoint_url=self.endpoint_url)

    def scan(self, **args):
        raise ConnectTimeoutError(endpoint_url=self.endpoint_url)

    def describe_table(self, **args):
        raise ConnectTimeoutError(endpoint_url=self.endpoint_url)


def mock_get_client(*args):
    """Patch side effect: ignore the arguments and return a read-timeout client."""
    # Defined once; the earlier duplicate definition was redundant.
    return MockClient()


def mock_get_client_connect_timeout(*args):
    """Patch side effect: ignore the arguments and return a connect-timeout client."""
    return MockClientConnectTimeout()

class TestBackoffError(unittest.TestCase):
'''
Test that backoff logic works properly.
'''
@mock.patch('tap_dynamodb.dynamodb.get_client', side_effect=mock_get_client)
def test_discover_stream_read_timeout_and_backoff(self, mock_client):
    """
    Check that discover_streams is retried 5 times when ReadTimeoutError is raised.
    """
    with self.assertRaises(ReadTimeoutError):
        discover.discover_streams({"region_name": "dummy", "use_local_dynamo": "true"})
    # assertEqual, not the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(mock_client.call_count, 5)

@mock.patch('tap_dynamodb.dynamodb.get_client', side_effect=mock_get_client)
def test_scan_table_sync_read_timeout_and_backoff(self, mock_client):
    """
    Check that the full_table sync is retried 5 times when ReadTimeoutError is raised.
    """
    tap_config = {"region_name": "dummy", "use_local_dynamo": "true"}
    catalog_stream = {"tap_stream_id": "dummy_stream", "metadata": ""}
    with self.assertRaises(ReadTimeoutError):
        full_table.sync(tap_config, {}, catalog_stream)
    # One get_client call per attempt.
    self.assertEqual(mock_client.call_count, 5)

@mock.patch('tap_dynamodb.dynamodb.get_client', side_effect=mock_get_client)
@mock.patch('tap_dynamodb.dynamodb.get_stream_client', side_effect=mock_get_client)
def test_get_records_sync_read_timeout_and_backoff(self, mock_stream_client, mock_client):
    """
    Check that the log_based sync is retried 5 times when ReadTimeoutError is raised.
    """
    tap_config = {"region_name": "dummy", "use_local_dynamo": "true"}
    catalog_stream = {"tap_stream_id": "dummy_stream", "metadata": ""}
    with self.assertRaises(ReadTimeoutError):
        log_based.sync(tap_config, {}, catalog_stream)
    # Both client factories are expected to be called once per attempt.
    self.assertEqual(mock_client.call_count, 5)
    self.assertEqual(mock_stream_client.call_count, 5)

@mock.patch('tap_dynamodb.dynamodb.get_client', side_effect=mock_get_client_connect_timeout)
def test_discover_stream_connect_timeout_and_backoff(self, mock_client):
    """
    Check that discover_streams is retried 5 times when ConnectTimeoutError is raised.
    """
    with self.assertRaises(ConnectTimeoutError):
        discover.discover_streams({"region_name": "dummy", "use_local_dynamo": "true"})
    # assertEqual, not the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(mock_client.call_count, 5)

@mock.patch('tap_dynamodb.dynamodb.get_client', side_effect=mock_get_client_connect_timeout)
def test_scan_table_sync_connect_timeout_and_backoff(self, mock_client):
    """
    Check that the full_table sync is retried 5 times when ConnectTimeoutError is raised.
    """
    tap_config = {"region_name": "dummy", "use_local_dynamo": "true"}
    catalog_stream = {"tap_stream_id": "dummy_stream", "metadata": ""}
    with self.assertRaises(ConnectTimeoutError):
        full_table.sync(tap_config, {}, catalog_stream)
    # One get_client call per attempt.
    self.assertEqual(mock_client.call_count, 5)

@mock.patch('tap_dynamodb.dynamodb.get_client', side_effect=mock_get_client_connect_timeout)
@mock.patch('tap_dynamodb.dynamodb.get_stream_client', side_effect=mock_get_client_connect_timeout)
def test_get_records_sync_connect_timeout_and_backoff(self, mock_stream_client, mock_client):
    """
    Check that the log_based sync is retried 5 times when ConnectTimeoutError is raised.
    """
    tap_config = {"region_name": "dummy", "use_local_dynamo": "true"}
    catalog_stream = {"tap_stream_id": "dummy_stream", "metadata": ""}
    with self.assertRaises(ConnectTimeoutError):
        log_based.sync(tap_config, {}, catalog_stream)
    # Both client factories are expected to be called once per attempt.
    self.assertEqual(mock_client.call_count, 5)
    self.assertEqual(mock_stream_client.call_count, 5)

@mock.patch('tap_dynamodb.dynamodb.get_client', side_effect=mock_get_client)
def test_get_initial_bookmarks_read_timeout_and_backoff(self, mock_client):
    """
    Check that get_initial_bookmarks is retried 5 times when ReadTimeoutError is raised.
    """
    with self.assertRaises(ReadTimeoutError):
        log_based.get_initial_bookmarks({"region_name": "dummy", "use_local_dynamo": "true"}, {}, "dummy_table")
    # assertEqual, not the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(mock_client.call_count, 5)

class TestRequestTimeoutValue(unittest.TestCase):
'''
Test that request timeout parameter works properly in various cases
'''
@mock.patch('boto3.client')
@mock.patch("tap_dynamodb.dynamodb.Config")
def test_config_provided_request_timeout(self, mock_config, mock_client):
    """
    get_client applies an integer request_timeout from the config to both timeouts.
    """
    expected_timeout = 100
    dynamodb.get_client({"region_name": "dummy_region", "request_timeout": expected_timeout})
    mock_config.assert_called_with(connect_timeout=expected_timeout,
                                   read_timeout=expected_timeout)

@mock.patch('boto3.client')
@mock.patch("tap_dynamodb.dynamodb.Config")
def test_default_value_request_timeout(self, mock_config, mock_client):
    """
    get_client falls back to the default timeout when request_timeout is absent from the config.
    """
    default_timeout = 300
    dynamodb.get_client({"region_name": "dummy_region"})
    mock_config.assert_called_with(connect_timeout=default_timeout,
                                   read_timeout=default_timeout)

@mock.patch('boto3.client')
@mock.patch("tap_dynamodb.dynamodb.Config")
def test_config_provided_empty_request_timeout(self, mock_config, mock_client):
    """
    get_client falls back to the default timeout when request_timeout is an empty string.
    """
    default_timeout = 300
    dynamodb.get_client({"region_name": "dummy_region", "request_timeout": ""})
    mock_config.assert_called_with(connect_timeout=default_timeout,
                                   read_timeout=default_timeout)

@mock.patch('boto3.client')
@mock.patch("tap_dynamodb.dynamodb.Config")
def test_config_provided_string_request_timeout(self, mock_config, mock_client):
    """
    get_client converts a numeric string request_timeout and applies it to both timeouts.
    """
    expected_timeout = 100
    dynamodb.get_client({"region_name": "dummy_region", "request_timeout": "100"})
    mock_config.assert_called_with(connect_timeout=expected_timeout,
                                   read_timeout=expected_timeout)
Comment on lines +155 to +157
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing I can think to change is it would be better to set the default timeout value as a reusable variable for expectations. Also repeated values could be replaced with variables as well within each test. Just a suggestion though. I think they are readable and maintainable as-is, which is the important part.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kspeer825 updated the code and replaced values with reusable variables.


@mock.patch('boto3.client')
@mock.patch("tap_dynamodb.dynamodb.Config")
def test_config_provided_float_request_timeout(self, mock_config, mock_client):
    """
    get_client applies a float request_timeout from the config to both timeouts.
    """
    expected_timeout = 100.8
    dynamodb.get_client({"region_name": "dummy_region", "request_timeout": expected_timeout})
    mock_config.assert_called_with(connect_timeout=expected_timeout,
                                   read_timeout=expected_timeout)

@mock.patch('boto3.client')
@mock.patch("tap_dynamodb.dynamodb.Config")
def test_stream_config_provided_request_timeout(self, mock_config, mock_client):
    """
    get_stream_client applies an integer request_timeout from the config to both timeouts.
    """
    expected_timeout = 100
    dynamodb.get_stream_client({"region_name": "dummy_region", "request_timeout": expected_timeout})
    mock_config.assert_called_with(connect_timeout=expected_timeout,
                                   read_timeout=expected_timeout)

@mock.patch('boto3.client')
@mock.patch("tap_dynamodb.dynamodb.Config")
def test_stream_default_value_request_timeout(self, mock_config, mock_client):
    """
    get_stream_client falls back to the default timeout when request_timeout is absent from the config.
    """
    default_timeout = 300
    dynamodb.get_stream_client({"region_name": "dummy_region"})
    mock_config.assert_called_with(connect_timeout=default_timeout,
                                   read_timeout=default_timeout)

@mock.patch('boto3.client')
@mock.patch("tap_dynamodb.dynamodb.Config")
def test_stream_config_provided_empty_request_timeout(self, mock_config, mock_client):
    """
    get_stream_client falls back to the default timeout when request_timeout is an empty string.
    """
    default_timeout = 300
    dynamodb.get_stream_client({"region_name": "dummy_region", "request_timeout": ""})
    mock_config.assert_called_with(connect_timeout=default_timeout,
                                   read_timeout=default_timeout)

@mock.patch('boto3.client')
@mock.patch("tap_dynamodb.dynamodb.Config")
def test_stream_config_provided_string_request_timeout(self, mock_config, mock_client):
    """
    get_stream_client converts a numeric string request_timeout and applies it to both timeouts.
    """
    expected_timeout = 100
    dynamodb.get_stream_client({"region_name": "dummy_region", "request_timeout": "100"})
    mock_config.assert_called_with(connect_timeout=expected_timeout,
                                   read_timeout=expected_timeout)

@mock.patch('boto3.client')
@mock.patch("tap_dynamodb.dynamodb.Config")
def test_stream_config_provided_float_request_timeout(self, mock_config, mock_client):
    """
    get_stream_client applies a float request_timeout from the config to both timeouts.
    """
    expected_timeout = 100.8
    dynamodb.get_stream_client({"region_name": "dummy_region", "request_timeout": expected_timeout})
    mock_config.assert_called_with(connect_timeout=expected_timeout,
                                   read_timeout=expected_timeout)