Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-13692: Missing parent objects from projection expression causes key error and TDL-16140: Fix key error and index error while applying projection expression #35

Merged
merged 17 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions tap_dynamodb/deserialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,31 @@ def _apply_projection(self, record, breadcrumb, output):
breadcrumb_key = breadcrumb[0].split('[')[0]
index = int(breadcrumb[0].split('[')[1].split(']')[0])
if output.get(breadcrumb_key):
output[breadcrumb_key].append(record[breadcrumb_key][index])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explain what this code will input in the Talend-Stitch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment.

# only prepare output if the list field contains data at that index position in record
if len(record.get(breadcrumb_key)) > index:
dbshah1212 marked this conversation as resolved.
Show resolved Hide resolved
output[breadcrumb_key].append(record[breadcrumb_key][index])
else:
output[breadcrumb_key] = [record[breadcrumb_key][index]]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explain what this code will input in the Talend-Stitch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment

output[breadcrumb_key] = []
# only prepare output if the list field contains data at that index position in record
if record.get(breadcrumb_key) and len(record.get(breadcrumb_key)) > index:
output[breadcrumb_key].append(record[breadcrumb_key][index])
else:
output[breadcrumb[0]] = record.get(breadcrumb[0])
else:
if '[' in breadcrumb[0]:
breadcrumb_key = breadcrumb[0].split('[')[0]
index = int(breadcrumb[0].split('[')[1].split(']')[0])
if output.get(breadcrumb_key) is None:
if not output.get(breadcrumb_key):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be

Suggested change
if not output.get(breadcrumb_key):
if breadcrumb_key not in output:

Copy link
Contributor

@namrata270998 namrata270998 Jan 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cosimon
As seen in the figure, the particular code was kept to handle such scenarios where the dictionary contains an empty list which would return false if we change it to if breadcrumb_key not in output:
Hence, not updated the code.
image

output[breadcrumb_key] = [{}]
self._apply_projection(record[breadcrumb_key][index], breadcrumb[1:], output[breadcrumb_key][0])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explain what this code will input in the Talend-Stitch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment.

# only prepare output if the list field contains data at that index position in record
if record.get(breadcrumb_key) and len(record.get(breadcrumb_key)) > index:
self._apply_projection(record[breadcrumb_key][index], breadcrumb[1:], output[breadcrumb_key][0])
else:
if output.get(breadcrumb[0]) is None:
output[breadcrumb[0]] = {}
self._apply_projection(record[breadcrumb[0]], breadcrumb[1:], output[breadcrumb[0]])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explain what this code will input in the Talend-Stitch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment.

# keep empty dict if the data is not found in the record
if record.get(breadcrumb[0]):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if record.get(breadcrumb[0]):
if breadcrumb[0] in record:

Copy link
Contributor

@namrata270998 namrata270998 Jan 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same comment mentioned above applies to the change suggested here too.

self._apply_projection(record.get(breadcrumb[0], {}), breadcrumb[1:], output[breadcrumb[0]])

def apply_projection(self, record, projections):
output = {}
Expand Down
104 changes: 104 additions & 0 deletions tests/test_dynamodb_log_based_parent_child_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from boto3.dynamodb.types import TypeSerializer

from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner

from base import TestDynamoDBBase

class DynamoDBLogBasedParentChildData(TestDynamoDBBase):
"""
Test case for verifying:
- The tap does not break when the parent data is not found and the user is requesting for child data
- The tap does not break when the data at a specific position is not found in the record
Comment on lines +12 to +13
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see that both cases are being tested. There should be a multiple logical syncs taking place if we are testing multiple cases like this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both of these conditions are covered by the test, since each generated record is in the format:
{ 'int_id': i, 'string_field': "test string", 'test_list_2': ['list_2_data'] }
and in the projection at line no. 25, we are expecting map_field.map_entry_1 (The tap does not break when the parent data is not found and the user is requesting for child data) and test_list_2[1] (The tap does not break when the data a specific position is not found in the record)

"""

# expected table configs
def expected_table_config(self):
    """Return the list of table configurations used by this test.

    The list holds a single config for 'simple_table_1'. Its projection
    expression asks for a nested map entry and for list indexes that the
    test's generator (``self.generate_items``) is passed as 'generator'.
    """
    simple_table_config = {
        'TableName': 'simple_table_1',
        'HashKey': 'int_id',
        'HashType': 'N',
        'generator': self.generate_items,
        'num_rows': 10,
        'ProjectionExpression': 'int_id, map_field.map_entry_1, test_list_1[0], test_list_2[0], test_list_2[1], test_list_3[0].test_field',
        'top_level_keys': {'int_id', 'map_field'},
        'nested_map_keys': {'map_field': {'map_entry_1'}},
    }
    return [simple_table_config]

# send desired data for testing
def generate_items(self, num_items, start_key=0):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add function comments

serializer = TypeSerializer()
for i in range(start_key, start_key + num_items):
record = {
'int_id': i,
'string_field': self.random_string_generator(),
'test_list_2': ['list_2_data']
}
yield serializer.serialize(record)

@staticmethod
def name():
return "tap_tester_dynamodb_parent_child_data"

def test_run(self):
(table_configs, conn_id, expected_streams) = self.pre_sync_test()

# Select 'simple_table_1' stream and add replication method and projection
found_catalogs = menagerie.get_catalogs(conn_id)
for stream_catalog in found_catalogs:
annotated_schema = menagerie.get_annotated_schema(conn_id, stream_catalog['stream_id'])
additional_md = [
{
"breadcrumb": [],
"metadata": {
'replication-method': 'LOG_BASED',
'tap-mongodb.projection': table_configs[0]['ProjectionExpression']
}
}
]
connections.select_catalog_and_fields_via_metadata(conn_id,
stream_catalog,
annotated_schema,
additional_md)

# disable stream to force shard to close
self.disableStreams(expected_streams)
# run sync mode for the 1st time; the 1st sync always runs in FULL_TABLE mode
sync_job_name = runner.run_sync_mode(self, conn_id)

exit_status = menagerie.get_exit_status(conn_id, sync_job_name)
menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)

# collect state file
state = menagerie.get_state(conn_id)
state_version = menagerie.get_state_version(conn_id)

# delete 'finished_shards' for every stream from the state file as we want to run a 2nd sync
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar with dynamodb's bookmarking strategy, but I do not think it makes sense to inject a simulated state at this point in the test. Could you please explain why this bookmark key is being removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the "LOG_BASED" replication method, the Tap is forcing a "FULL_TABLE" sync for the 1st time, and from the next sync, the stream is synced in a LOG_BASED manner. Hence, in this test, when we ran the 1st sync, the tap ran in a FULL_TABLE manner, thus we have removed the finished shards from the state to re-sync the data and ran with state file to validate the projection change for the LOG_BASED replication method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. If you are confident that this mimics reality then I think this is a fine implementation. However, in the database taps we generally prefer to insert data between syncs rather than inject a simulated sync as this gives us a guaranteed copy of what an end-user would do. In the SaaS taps this is much more difficult since there is no local instance of the source to interact with. That is why you see these state injects to simulate behavior elsewhere.

for config in table_configs:
table_name = config['TableName']
del state['bookmarks'][table_name]['finished_shards']
menagerie.set_state(conn_id, state, version=state_version)

# run the sync mode a 2nd time; now it will run in LOG_BASED mode
sync_job_name = runner.run_sync_mode(self, conn_id)

# get data
messages_by_stream = runner.get_records_from_target_output()

for stream in expected_streams:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there should be an assertion that proves there are upsert messages present. The way this is written it will pass even if messages = [] and then the test would never hit these assertions below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the code to collect records from messages and loop over every record and assert.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I think an explicit assertion that we replicated records would give more confidence. But I think this does functionally accomplish with the double get in record.get('map_field').get('map_entry_1').

messages = messages_by_stream.get(stream).get('messages')
records = [message.get('data') for message in messages if message.get('action') == 'upsert']
# verify that we replicated records
self.assertTrue(len(records) > 0)
for record in records:

# verify that we get 'None' for child data when parent data is not found
self.assertIsNone(record.get('map_field').get('map_entry_1'))
# verify that we only get the available data if the data at a particular index is not found
self.assertEquals(record.get('test_list_1'), [])
self.assertEquals(record.get('test_list_2'), ['list_2_data'])
# verify that we got empty map if the parent data at a particular index is not found for child data
self.assertEquals(record.get('test_list_3'), [{}])
130 changes: 130 additions & 0 deletions tests/unittests/test_deserializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import unittest
from tap_dynamodb import deserialize

class TestDeserializer(unittest.TestCase):
    """Unit tests for ``Deserializer.apply_projection``.

    Every test passes a plain-dict record and a list of projections, where
    each projection is a list of breadcrumb parts (for example
    ``['metadata[0]', 'Age']`` for the expression ``metadata[0].Age``), and
    checks the projected output. Each "not found" case is paired with a
    ``_positive`` case in which the requested data is present.
    """

    @staticmethod
    def _project(record, projections):
        """Apply ``projections`` to ``record`` using a fresh Deserializer."""
        deserializer = deserialize.Deserializer()
        return deserializer.apply_projection(record, projections)

    def test_projection_expression_all_list_data_not_found(self):
        '''
        Verify that we get an empty list if the list field is not found in the record
        Example Projection: Artist, metadata[0]
        Stream Record: {'Artist': 'No One You Know5'}
        '''
        mock_record = {'Artist': 'No One You Know5'}
        mock_projections = [['Artist'], ['metadata[0]']]

        output = self._project(mock_record, mock_projections)
        # the list field is absent from the record, so the output keeps an empty list
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': []})

    def test_projection_expression_all_list_data_not_found_positive(self):
        '''
        Verify that we get the list element if the data is found in the record
        Example Projection: Artist, metadata[0]
        Stream Record: {'Artist': 'No One You Know5', 'metadata': ['test']}
        '''
        mock_record = {'Artist': 'No One You Know5', 'metadata': ['test']}
        mock_projections = [['Artist'], ['metadata[0]']]

        output = self._project(mock_record, mock_projections)
        # the requested index exists, so its value is present in the output
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': ['test']})

    def test_projection_expression_some_list_data_not_found(self):
        '''
        Verify that we only get the available data in the list when the user
        also requests an index that is not available
        Example Projection: Artist, metadata[0], metadata[1]
        Stream Record: {'Artist': 'No One You Know5', 'metadata': ['test1']}
        '''
        mock_record = {'Artist': 'No One You Know5', 'metadata': ['test1']}
        mock_projections = [['Artist'], ['metadata[0]'], ['metadata[1]']]

        output = self._project(mock_record, mock_projections)
        # index 1 does not exist in the record, so only index 0 is returned
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': ['test1']})

    def test_projection_expression_some_list_data_not_found_positive(self):
        '''
        Verify that we get every requested list element when all of them are available
        Example Projection: Artist, metadata[0], metadata[1]
        Stream Record: {'Artist': 'No One You Know5', 'metadata': ['test1', 'test2']}
        '''
        mock_record = {'Artist': 'No One You Know5', 'metadata': ['test1', 'test2']}
        mock_projections = [['Artist'], ['metadata[0]'], ['metadata[1]']]

        output = self._project(mock_record, mock_projections)
        # both requested indexes exist, so both values are returned in order
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': ['test1', 'test2']})

    def test_projection_expression_parent_child_data_list(self):
        '''
        Verify that we get an empty dict when the list element is a parent
        element and it is not found
        Example Projection: Artist, metadata[0].Age
        Stream Record: {'Artist': 'No One You Know5'}
        '''
        mock_record = {'Artist': 'No One You Know5'}
        mock_projections = [['Artist'], ['metadata[0]', 'Age']]

        output = self._project(mock_record, mock_projections)
        # the parent list is absent, so the output holds a single empty dict
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': [{}]})

    def test_projection_expression_parent_child_data_list_positive(self):
        '''
        Verify that we get the child data when the parent list element is found
        Example Projection: Artist, metadata[0].Age
        Stream Record: {'Artist': 'No One You Know5', 'metadata': [{'Age': 'Test'}]}
        '''
        mock_record = {'Artist': 'No One You Know5', 'metadata': [{'Age': 'Test'}]}
        mock_projections = [['Artist'], ['metadata[0]', 'Age']]

        output = self._project(mock_record, mock_projections)
        # the parent element exists, so its requested child is returned
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': [{'Age': 'Test'}]})

    def test_projection_expression_parent_child_data_dictionary(self):
        '''
        Verify that we get an empty dict for the parent when the parent data
        is not found and we are requesting child data
        Example Projection: Artist, metadata.inner_metadata
        Stream Record: {'Artist': 'No One You Know5'}
        '''
        mock_record = {'Artist': 'No One You Know5'}
        mock_projections = [['Artist'], ['metadata', 'inner_metadata']]

        output = self._project(mock_record, mock_projections)
        # the parent map is absent, so the output keeps an empty dict for it
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': {}})

    def test_projection_expression_parent_child_data_dictionary_positive(self):
        '''
        Verify that we get the child data when the parent data is found
        Example Projection: Artist, metadata.inner_metadata
        Stream Record: {'Artist': 'No One You Know5', 'metadata': {'inner_metadata': 'Test'}}
        '''
        mock_record = {'Artist': 'No One You Know5', 'metadata': {'inner_metadata': 'Test'}}
        mock_projections = [['Artist'], ['metadata', 'inner_metadata']]

        output = self._project(mock_record, mock_projections)
        # the parent map exists, so its requested child is returned
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': {'inner_metadata': 'Test'}})

    def test_projection_expression_parent_child_data_list_different_order(self):
        '''
        Verify that we get no error when list projections are given in
        decreasing index order and the higher index is not found
        Example Projection: metadata[1], metadata[0].inner_metadata
        Stream Record: {'metadata': [{'inner_metadata': 'Test'}]}
        '''
        mock_record = {'metadata': [{'inner_metadata': 'Test'}]}
        mock_projections = [['metadata[1]'], ['metadata[0]', 'inner_metadata']]

        output = self._project(mock_record, mock_projections)
        # the missing index 1 is skipped and index 0 is still projected
        self.assertEqual(output, {'metadata': [{'inner_metadata': 'Test'}]})