TDL-13692: Missing parent objects from projection expression causes key error and TDL-16140: Fix key error and index error while applying projection expression #35

Merged · 17 commits · Feb 14, 2022
28 changes: 19 additions & 9 deletions .circleci/config.yml
@@ -1,8 +1,11 @@
version: 2
version: 2.1
orbs:
slack: circleci/slack@3.4.2

jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
- image: amazon/dynamodb-local
entrypoint: ["java", "-Xmx1G", "-jar", "DynamoDBLocal.jar"]
steps:
@@ -19,19 +22,26 @@ jobs:
command: |
source /usr/local/share/virtualenvs/tap-dynamodb/bin/activate
make lint
- run:
Reviewer (Contributor): Please update the config to the standards: tap-tester image, env vars, and slack notification. I have recently documented the requirements for a circleci config file. See https://github.com/stitchdata/tap-tester/blob/master/reference/circleci_configs.md

Author: Updated the config.yml file.

name: 'Unit Tests'
command: |
source /usr/local/share/virtualenvs/tap-dynamodb/bin/activate
pip install nose coverage
nosetests --with-coverage --cover-erase --cover-package=tap_dynamodb --cover-html-dir=htmlcov tests/unittests
coverage html
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov
- run:
name: 'Tap Tester'
command: |
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox tap-tester.env
source tap-tester.env
source /usr/local/share/virtualenvs/tap-tester/bin/activate
run-test --tap=tap-dynamodb \
--target=target-stitch \
--orchestrator=stitch-orchestrator \
--email=harrison+sandboxtest@stitchdata.com \
--password=$SANDBOX_PASSWORD \
--client-id=50 \
tests
run-test --tap=tap-dynamodb tests
- slack/notify-on-failure:
only_for_branches: master
Reviewer (Contributor): Please also include the tap-tester-user context after the circleci-user context in both workflow definitions. That context contains the desired slack webhook. If this is unclear and the tap-tester docs do not clarify, please let me know so I can update them.

Author: Added tap-tester-user context.

workflows:
version: 2
commit:
48 changes: 43 additions & 5 deletions tap_dynamodb/deserialize.py
@@ -58,23 +58,61 @@ def _apply_projection(self, record, breadcrumb, output):
breadcrumb_key = breadcrumb[0].split('[')[0]
index = int(breadcrumb[0].split('[')[1].split(']')[0])
if output.get(breadcrumb_key):
output[breadcrumb_key].append(record[breadcrumb_key][index])

Reviewer: Explain what this code will input in the Talend-Stitch.

Author: Added comment.

# record: {'Artist': 'No One You Know5', 'metadata': ['test1']}
# main breadcrumb = [['metadata[0]'], ['metadata[1]']]
# current breadcrumb = ['metadata[1]']
# current output = {'metadata': ['test1']}
# expected output = {'metadata': ['test1']}
Reviewer (Collaborator): Can we clean up these comments?

Author: Removed the comments.

# as "metadata" has only 1 item and the current breadcrumb is expecting 2nd item

# only prepare output if the list field contains data at that index position in record
if len(record.get(breadcrumb_key)) > index:
output[breadcrumb_key].append(record[breadcrumb_key][index])
else:
output[breadcrumb_key] = [record[breadcrumb_key][index]]
# record: {'Artist': 'No One You Know5'}
# main breadcrumb = [['metadata[0]']]
# current breadcrumb = ['metadata[0]']
# current output = {}
# expected output = {'metadata': []}
# as "metadata" does not have any items and the current breadcrumb is expecting 1st item

Reviewer: Explain what this code will input in the Talend-Stitch.

Author: Added comment.

output[breadcrumb_key] = []
# only prepare output if the list field contains data at that index position in record
if record.get(breadcrumb_key) and len(record.get(breadcrumb_key)) > index:
output[breadcrumb_key].append(record[breadcrumb_key][index])
else:
output[breadcrumb[0]] = record.get(breadcrumb[0])
else:
if '[' in breadcrumb[0]:
breadcrumb_key = breadcrumb[0].split('[')[0]
index = int(breadcrumb[0].split('[')[1].split(']')[0])
if output.get(breadcrumb_key) is None:
if not output.get(breadcrumb_key):
Reviewer (Collaborator): I think this should be:
Suggested change: replace "if not output.get(breadcrumb_key):" with "if breadcrumb_key not in output:"

Author (@namrata270998, Jan 13, 2022): @cosimon As seen in the figure, this code was kept to handle scenarios where the dictionary contains an empty list: for an empty list, "not output.get(breadcrumb_key)" is True while "breadcrumb_key not in output" would be False, so the needed re-initialization would be skipped. Hence the code was not updated.

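The distinction the author defends here, truthiness versus membership, can be demonstrated in two lines. A minimal sketch, assuming `output` can legitimately hold an empty list for the key:

```python
output = {'metadata': []}           # key present, but value is an empty list

# membership check: the key exists, so this condition is False and the
# re-initialization to [{}] would be skipped
print('metadata' not in output)     # False

# truthiness check: an empty list is falsy, so this condition is True and
# the value is re-initialized, ready to receive the child projection
print(not output.get('metadata'))   # True
```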
output[breadcrumb_key] = [{}]
self._apply_projection(record[breadcrumb_key][index], breadcrumb[1:], output[breadcrumb_key][0])

Reviewer: Explain what this code will input in the Talend-Stitch.

Author: Added comment.


# record: {'Artist': 'No One You Know5'}
# main breadcrumb = [['metadata[0]', 'Age']]
# current breadcrumb = ['metadata[0]', 'Age']
# current output = {'metadata': [{}]}
# expected output = {'metadata': [{}]}
# as "metadata" is not present and the current breadcrumb is expecting 1st item and which is a parent

# only prepare output if the list field contains data at that index position in record
if record.get(breadcrumb_key) and len(record.get(breadcrumb_key)) > index:
self._apply_projection(record[breadcrumb_key][index], breadcrumb[1:], output[breadcrumb_key][0])
else:
if output.get(breadcrumb[0]) is None:
output[breadcrumb[0]] = {}
self._apply_projection(record[breadcrumb[0]], breadcrumb[1:], output[breadcrumb[0]])

Reviewer: Explain what this code will input in the Talend-Stitch.

Author: Added comment.


# record: {'Artist': 'No One You Know5'}
# main breadcrumb = [['metadata', 'inner_metadata']]
# current breadcrumb = ['metadata', 'inner_metadata']
# current output = {'metadata': {}}
# expected output = {'metadata': {}}
# as "metadata" is not present and the current breadcrumb is expecting it as a parent

# keep empty dict if the data is not found in the record
if record.get(breadcrumb[0]):
Reviewer (Collaborator): Suggested change: replace "if record.get(breadcrumb[0]):" with "if breadcrumb[0] in record:"

Author (@namrata270998, Jan 13, 2022): The same comment mentioned above applies to the change suggested here too.

self._apply_projection(record.get(breadcrumb[0], {}), breadcrumb[1:], output[breadcrumb[0]])

def apply_projection(self, record, projections):
output = {}
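The guarded list-index handling in this hunk can be sketched in isolation. The following is a hypothetical simplification (function and variable names are not from the tap) showing how a single breadcrumb step such as 'metadata[1]' can be applied without raising a KeyError or IndexError:

```python
def apply_breadcrumb_step(record, crumb, output):
    """Hypothetical sketch of one projection step such as 'metadata[1]'.

    Only copies the list element when the record actually has data at
    that index; otherwise it leaves (or creates) an empty list, mirroring
    the KeyError/IndexError fix in this PR.
    """
    key, bracket, rest = crumb.partition('[')
    if bracket:                        # indexed breadcrumb, e.g. 'metadata[1]'
        index = int(rest.rstrip(']'))
        items = record.get(key) or []  # missing parent -> treat as empty list
        output.setdefault(key, [])
        if len(items) > index:         # guard: only copy if the index exists
            output[key].append(items[index])
    else:                              # plain field breadcrumb
        output[key] = record.get(key)
    return output

# record has no 'metadata' at all: output still gets an empty list
print(apply_breadcrumb_step({'Artist': 'No One You Know5'}, 'metadata[0]', {}))
# {'metadata': []}

# record has one item but the breadcrumb asks for the 2nd: nothing is appended
print(apply_breadcrumb_step({'metadata': ['test1']}, 'metadata[1]',
                            {'metadata': ['test1']}))
# {'metadata': ['test1']}
```

This mirrors the two annotated cases in the diff: a missing parent yields an empty list, and a missing index position yields only the data that is actually present.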
4 changes: 0 additions & 4 deletions tests/test_dynamodb_discovery.py
@@ -1,6 +1,5 @@
from boto3.dynamodb.types import TypeSerializer

from tap_tester.scenario import (SCENARIOS)
from base import TestDynamoDBBase


@@ -53,6 +52,3 @@ def name():

def test_run(self):
self.pre_sync_test()


SCENARIOS.add(DynamoDBDiscovery)
4 changes: 0 additions & 4 deletions tests/test_dynamodb_full_table_interruptible_sync.py
@@ -3,7 +3,6 @@

from boto3.dynamodb.types import TypeSerializer

from tap_tester.scenario import (SCENARIOS)
from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner
@@ -129,6 +128,3 @@ def test_run(self):
self.assertIsNone(state['bookmarks'][table_name].get('last_evaluated_key'))

self.assertTrue(state['bookmarks'][table_name].get('initial_full_table_complete', False))


SCENARIOS.add(DynamoDBFullTableInterruptible)
3 changes: 0 additions & 3 deletions tests/test_dynamodb_full_table_sync.py
@@ -4,7 +4,6 @@

from boto3.dynamodb.types import TypeSerializer

from tap_tester.scenario import (SCENARIOS)
from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner
@@ -108,5 +107,3 @@ def test_run(self):
# assert that there is a version bookmark in state
first_versions[table_name] = state['bookmarks'][table_name]['version']
self.assertIsNotNone(first_versions[table_name])

SCENARIOS.add(DynamoDBFullTable)
4 changes: 0 additions & 4 deletions tests/test_dynamodb_log_based.py
@@ -2,7 +2,6 @@

from boto3.dynamodb.types import TypeSerializer

from tap_tester.scenario import (SCENARIOS)
from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner
@@ -143,6 +142,3 @@ def test_run(self):
self.assertEqual(31, len(stream['messages']))

state = menagerie.get_state(conn_id)


SCENARIOS.add(DynamoDBLogBased)
5 changes: 0 additions & 5 deletions tests/test_dynamodb_log_based_interruptible.py
@@ -2,7 +2,6 @@

from boto3.dynamodb.types import TypeSerializer

from tap_tester.scenario import (SCENARIOS)
from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner
@@ -202,7 +201,3 @@ def first_sync_test(self, table_configs, conn_id):
# as the full table sync
state['bookmarks'][table_name].pop('finished_shards')
menagerie.set_state(conn_id, state, version=state_version)



SCENARIOS.add(DynamoDBLogBased)
102 changes: 102 additions & 0 deletions tests/test_dynamodb_log_based_parent_child_data.py
@@ -0,0 +1,102 @@
from boto3.dynamodb.types import TypeSerializer

from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner

from base import TestDynamoDBBase

class DynamoDBLogBasedParentChildData(TestDynamoDBBase):
"""
Test case for verifying:
- The tap does not break when the parent data is not found and the user is requesting child data
- The tap does not break when the data at a specific position is not found in the record
Reviewer (Contributor), on lines +12 to +13: I don't see that both cases are being tested. There should be multiple logical syncs taking place if we are testing multiple cases like this.

Author: Both conditions are tested, as the record is in the format
{ 'int_id': i, 'string_field': "test string", 'test_list_2': ['list_2_data'] }
and in the projection at line no. 25 we are expecting map_field.map_entry_1 (the tap does not break when the parent data is not found and child data is requested) and test_list_2[1] (the tap does not break when the data at a specific position is not found in the record).

"""

# expected table configs
def expected_table_config(self):
return [
{
'TableName': 'simple_table_1',
'HashKey': 'int_id',
'HashType': 'N',
'generator': self.generate_items,
'num_rows': 10,
'ProjectionExpression': 'int_id, map_field.map_entry_1, test_list_1[0], test_list_2[0], test_list_2[1], test_list_3[0].test_field',
'top_level_keys': {'int_id', 'map_field'},
'nested_map_keys': {'map_field': {'map_entry_1'}},
}
]

# send desired data for testing
def generate_items(self, num_items, start_key=0):

Reviewer: Add function comments.

serializer = TypeSerializer()
for i in range(start_key, start_key + num_items):
record = {
'int_id': i,
'string_field': self.random_string_generator(),
'test_list_2': ['list_2_data']
}
yield serializer.serialize(record)

@staticmethod
def name():
return "tap_tester_dynamodb_parent_child_data"

def test_run(self):
(table_configs, conn_id, expected_streams) = self.pre_sync_test()

# Select 'simple_table_1' stream and add replication method and projection
found_catalogs = menagerie.get_catalogs(conn_id)
for stream_catalog in found_catalogs:
annotated_schema = menagerie.get_annotated_schema(conn_id, stream_catalog['stream_id'])
additional_md = [
{
"breadcrumb": [],
"metadata": {
'replication-method': 'LOG_BASED',
'tap-mongodb.projection': table_configs[0]['ProjectionExpression']
}
}
]
connections.select_catalog_and_fields_via_metadata(conn_id,
stream_catalog,
annotated_schema,
additional_md)

# disable streams to force shards to close
self.disableStreams(expected_streams)
# run sync mode the 1st time; the first sync runs in FULL_TABLE mode
sync_job_name = runner.run_sync_mode(self, conn_id)

exit_status = menagerie.get_exit_status(conn_id, sync_job_name)
menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)

# collect state file
state = menagerie.get_state(conn_id)
state_version = menagerie.get_state_version(conn_id)

# delete 'finished_shards' for every stream from the state file as we want to run a 2nd sync
Reviewer (Contributor): I am not familiar with dynamodb's bookmarking strategy, but I do not think it makes sense to inject a simulated state at this point in the test. Could you please explain why this bookmark key is being removed?

Author: For the LOG_BASED replication method, the tap forces a FULL_TABLE sync the first time, and from the next sync onward the stream is synced in a LOG_BASED manner. Hence, after the 1st sync ran as FULL_TABLE, we removed the finished shards from the state to re-sync the data, and ran with the state file to validate the projection change for the LOG_BASED replication method.

Reviewer (Contributor): I see. If you are confident that this mimics reality then I think this is a fine implementation. However, in the database taps we generally prefer to insert data between syncs rather than inject a simulated state, as this gives us a guaranteed copy of what an end-user would do. In the SaaS taps this is much more difficult since there is no local instance of the source to interact with; that is why you see these state injections to simulate behavior elsewhere.

for config in table_configs:
table_name = config['TableName']
del state['bookmarks'][table_name]['finished_shards']
menagerie.set_state(conn_id, state, version=state_version)

# run sync mode a 2nd time; now it will run in LOG_BASED mode
sync_job_name = runner.run_sync_mode(self, conn_id)

# get data
messages_by_stream = runner.get_records_from_target_output()

for stream in expected_streams:
Reviewer (Contributor): I think there should be an assertion that proves there are upsert messages present. The way this is written it will pass even if messages = [] and then the test would never hit these assertions below.

Author: Updated the code to collect records from messages and loop over every record and assert.

Reviewer (Contributor): Personally I think an explicit assertion that we replicated records would give more confidence. But I think this does functionally accomplish it with the double get in record.get('map_field').get('map_entry_1').

messages = messages_by_stream.get(stream).get('messages')
records = [message.get('data') for message in messages if message.get('action') == 'upsert']
for record in records:

# verify that we get 'None' for child data when parent data is not found
self.assertIsNone(record.get('map_field').get('map_entry_1'))
# verify that we only get the available data if the data at a particular index is not found
self.assertEquals(record.get('test_list_1'), [])
self.assertEquals(record.get('test_list_2'), ['list_2_data'])
# verify that we got empty map if the parent data at a particular index is not found for child data
self.assertEquals(record.get('test_list_3'), [{}])
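Putting the projection expression and the generated record together, the test's assertions amount to the following expected shape. This is a hand-built illustration mirroring the test above, not output captured from a real sync:

```python
# Record generated by generate_items (only these three fields exist):
record = {'int_id': 1, 'string_field': 'test string',
          'test_list_2': ['list_2_data']}

# Under the projection
#   int_id, map_field.map_entry_1, test_list_1[0], test_list_2[0],
#   test_list_2[1], test_list_3[0].test_field
# the fixed tap is expected to emit:
synced = {
    'int_id': 1,
    'map_field': {},                 # parent missing -> empty map
    'test_list_1': [],               # list missing -> empty list
    'test_list_2': ['list_2_data'],  # index 1 missing -> only available data
    'test_list_3': [{}],             # parent at index missing -> empty map
}

# The chained .get in the test is only safe because the projection
# guarantees 'map_field' is {} rather than absent:
assert synced.get('map_field').get('map_entry_1') is None
assert synced.get('test_list_1') == []
assert synced.get('test_list_2') == ['list_2_data']
assert synced.get('test_list_3') == [{}]
```

Against a record where 'map_field' were entirely absent, `record.get('map_field').get('map_entry_1')` would raise AttributeError on None, which is why the reviewer's suggestion of an explicit record-count assertion would add confidence.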
4 changes: 0 additions & 4 deletions tests/test_dynamodb_log_based_projections.py
@@ -3,7 +3,6 @@

from boto3.dynamodb.types import TypeSerializer

from tap_tester.scenario import (SCENARIOS)
from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner
@@ -205,6 +204,3 @@ def first_sync_test(self, table_configs, conn_id, expected_streams):
for list_key in config['top_level_list_keys']:
self.assertTrue(isinstance(message['data'][list_key], list))
self.assertEqual(config['nested_map_keys']['map_field'], {*message['data']['map_field'].keys()})


SCENARIOS.add(DynamoDBLogBasedProjections)
4 changes: 0 additions & 4 deletions tests/test_dynamodb_projections.py
@@ -3,7 +3,6 @@

from boto3.dynamodb.types import TypeSerializer

from tap_tester.scenario import (SCENARIOS)
from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner
@@ -134,6 +133,3 @@ def test_run(self):
for list_key in config['top_level_list_keys']:
self.assertTrue(isinstance(message['data'][list_key], list))
self.assertEqual(config['nested_map_keys']['map_field'], {*message['data']['map_field'].keys()})


SCENARIOS.add(DynamoDBProjections)