Skip to content

Commit

Permalink
TDL-13692: Missing parent objects from projection expression causes k…
Browse files Browse the repository at this point in the history
…ey error and TDL-16140: Fix key error and index error while applying projection expression (#35)

* added code change for handling key error and index error

* resolve pylint error

* updated the unittests

* updated config.yml to run the unittests on CircleCI

* added unittests

* updated the code to handle child dict data

* added comments with example for projection

* updated config.yml file and resolved PR review comments

* resolve slack failure error in config file

* updated version 2 to 2.1

* removed SCENARIO from the test cases

* resolved comments

* reverted changes

* added tap-tester-venv, tap_tester_sandbox and check for verifying that we replicated records

Co-authored-by: namrata270998 <namrata.brahmbhatt@crestdatasys.com>
  • Loading branch information
hpatel41 and namrata270998 authored Feb 14, 2022
1 parent 35b97f0 commit 3d49bbf
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 6 deletions.
20 changes: 14 additions & 6 deletions tap_dynamodb/deserialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,31 @@ def _apply_projection(self, record, breadcrumb, output):
breadcrumb_key = breadcrumb[0].split('[')[0]
index = int(breadcrumb[0].split('[')[1].split(']')[0])
if output.get(breadcrumb_key):
output[breadcrumb_key].append(record[breadcrumb_key][index])
# only prepare output if the list field contains data at that index position in record
if len(record.get(breadcrumb_key)) > index:
output[breadcrumb_key].append(record[breadcrumb_key][index])
else:
output[breadcrumb_key] = [record[breadcrumb_key][index]]

output[breadcrumb_key] = []
# only prepare output if the list field contains data at that index position in record
if record.get(breadcrumb_key) and len(record.get(breadcrumb_key)) > index:
output[breadcrumb_key].append(record[breadcrumb_key][index])
else:
output[breadcrumb[0]] = record.get(breadcrumb[0])
else:
if '[' in breadcrumb[0]:
breadcrumb_key = breadcrumb[0].split('[')[0]
index = int(breadcrumb[0].split('[')[1].split(']')[0])
if output.get(breadcrumb_key) is None:
if not output.get(breadcrumb_key):
output[breadcrumb_key] = [{}]
self._apply_projection(record[breadcrumb_key][index], breadcrumb[1:], output[breadcrumb_key][0])
# only prepare output if the list field contains data at that index position in record
if record.get(breadcrumb_key) and len(record.get(breadcrumb_key)) > index:
self._apply_projection(record[breadcrumb_key][index], breadcrumb[1:], output[breadcrumb_key][0])
else:
if output.get(breadcrumb[0]) is None:
output[breadcrumb[0]] = {}
self._apply_projection(record[breadcrumb[0]], breadcrumb[1:], output[breadcrumb[0]])
# keep empty dict if the data is not found in the record
if record.get(breadcrumb[0]):
self._apply_projection(record.get(breadcrumb[0], {}), breadcrumb[1:], output[breadcrumb[0]])

def apply_projection(self, record, projections):
output = {}
Expand Down
104 changes: 104 additions & 0 deletions tests/test_dynamodb_log_based_parent_child_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from boto3.dynamodb.types import TypeSerializer

from tap_tester import connections
from tap_tester import menagerie
from tap_tester import runner

from base import TestDynamoDBBase

class DynamoDBLogBasedParentChildData(TestDynamoDBBase):
    """
    Test case for verifying:
        - The tap does not break when the parent data is not found and the user is requesting child data
        - The tap does not break when the data at a specific list position is not found in the record
    """

    def expected_table_config(self):
        """Return the table configs (including the projection expression) used by this test."""
        return [
            {
                'TableName': 'simple_table_1',
                'HashKey': 'int_id',
                'HashType': 'N',
                'generator': self.generate_items,
                'num_rows': 10,
                'ProjectionExpression': 'int_id, map_field.map_entry_1, test_list_1[0], test_list_2[0], test_list_2[1], test_list_3[0].test_field',
                'top_level_keys': {'int_id', 'map_field'},
                'nested_map_keys': {'map_field': {'map_entry_1'}},
            }
        ]

    def generate_items(self, num_items, start_key=0):
        """
        Yield 'num_items' serialized records starting at 'start_key'.

        The records deliberately omit 'map_field', 'test_list_1' and 'test_list_3',
        and 'test_list_2' holds a single element, so that most of the paths in the
        projection expression point at data that is not present in the record.
        """
        serializer = TypeSerializer()
        for i in range(start_key, start_key + num_items):
            record = {
                'int_id': i,
                'string_field': self.random_string_generator(),
                'test_list_2': ['list_2_data']
            }
            yield serializer.serialize(record)

    @staticmethod
    def name():
        """Return the name of this test."""
        return "tap_tester_dynamodb_parent_child_data"

    def test_run(self):
        """Run a FULL_TABLE sync followed by a LOG_BASED sync and verify the projected records."""
        (table_configs, conn_id, expected_streams) = self.pre_sync_test()

        # Select 'simple_table_1' stream and add replication method and projection
        found_catalogs = menagerie.get_catalogs(conn_id)
        for stream_catalog in found_catalogs:
            annotated_schema = menagerie.get_annotated_schema(conn_id, stream_catalog['stream_id'])
            additional_md = [
                {
                    "breadcrumb": [],
                    "metadata": {
                        'replication-method': 'LOG_BASED',
                        'tap-mongodb.projection': table_configs[0]['ProjectionExpression']
                    }
                }
            ]
            connections.select_catalog_and_fields_via_metadata(conn_id,
                                                               stream_catalog,
                                                               annotated_schema,
                                                               additional_md)

        # disable stream to force shard to close
        self.disableStreams(expected_streams)
        # run sync mode 1st time; the 1st sync runs in FULL_TABLE mode
        sync_job_name = runner.run_sync_mode(self, conn_id)

        exit_status = menagerie.get_exit_status(conn_id, sync_job_name)
        menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)

        # collect state file
        state = menagerie.get_state(conn_id)
        state_version = menagerie.get_state_version(conn_id)

        # delete 'finished_shards' for every stream from the state file as we want to run a 2nd sync
        for config in table_configs:
            table_name = config['TableName']
            del state['bookmarks'][table_name]['finished_shards']
        menagerie.set_state(conn_id, state, version=state_version)

        # run the sync mode 2nd time, now it will run in LOG_BASED mode
        sync_job_name = runner.run_sync_mode(self, conn_id)

        # verify that the LOG_BASED sync itself succeeded, same as for the 1st sync
        exit_status = menagerie.get_exit_status(conn_id, sync_job_name)
        menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)

        # get data
        messages_by_stream = runner.get_records_from_target_output()

        for stream in expected_streams:
            messages = messages_by_stream.get(stream).get('messages')
            records = [message.get('data') for message in messages if message.get('action') == 'upsert']
            # verify that we replicated records
            self.assertGreater(len(records), 0)
            for record in records:

                # verify that we get 'None' for child data when parent data is not found
                self.assertIsNone(record.get('map_field').get('map_entry_1'))
                # verify that we only get the available data if the data at a particular index is not found
                self.assertEqual(record.get('test_list_1'), [])
                self.assertEqual(record.get('test_list_2'), ['list_2_data'])
                # verify that we got empty map if the parent data at a particular index is not found for child data
                self.assertEqual(record.get('test_list_3'), [{}])
130 changes: 130 additions & 0 deletions tests/unittests/test_deserializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import unittest
from tap_dynamodb import deserialize

class TestDeserializer(unittest.TestCase):
    '''
    Unit tests for Deserializer.apply_projection covering projections that
    point at list elements, nested list elements and nested map keys, both
    when the requested data is present in the record and when it is missing.
    '''

    def test_projection_expression_all_list_data_not_found(self):
        '''
            Verify that we get empty list if the data is not found in the record
            Example Projection: Artist, metadata[0]
            Stream Record: {'Artist': 'No One You Know5'}
        '''
        mock_record = {'Artist': 'No One You Know5'}
        mock_projections = [['Artist'], ['metadata[0]']]

        deserializer = deserialize.Deserializer()
        output = deserializer.apply_projection(mock_record, mock_projections)
        # verify that we get empty list if the data is not found in the record
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': []})

    def test_projection_expression_all_list_data_not_found_positive(self):
        '''
            Verify that we get the list element when the data is found in the record
            Example Projection: Artist, metadata[0]
            Stream Record: {'Artist': 'No One You Know5', 'metadata': ['test']}
        '''
        mock_record = {'Artist': 'No One You Know5', 'metadata': ['test']}
        mock_projections = [['Artist'], ['metadata[0]']]

        deserializer = deserialize.Deserializer()
        output = deserializer.apply_projection(mock_record, mock_projections)
        # verify that we get the requested list element when it is present in the record
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': ['test']})

    def test_projection_expression_some_list_data_not_found(self):
        '''
            Verify that we only get the available data in the list when user expect the data that is not available
            Example Projection: Artist, metadata[0], metadata[1]
            Stream Record: {'Artist': 'No One You Know5', 'metadata': ['test1']}
        '''
        mock_record = {'Artist': 'No One You Know5', 'metadata': ['test1']}
        mock_projections = [['Artist'], ['metadata[0]'], ['metadata[1]']]

        deserializer = deserialize.Deserializer()
        output = deserializer.apply_projection(mock_record, mock_projections)
        # verify that we only get the available data in the list when user expect the data that is not available
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': ['test1']})

    def test_projection_expression_some_list_data_not_found_positive(self):
        '''
            Verify that we get every requested list element when all of them are available
            Example Projection: Artist, metadata[0], metadata[1]
            Stream Record: {'Artist': 'No One You Know5', 'metadata': ['test1', 'test2']}
        '''
        mock_record = {'Artist': 'No One You Know5', 'metadata': ['test1', 'test2']}
        mock_projections = [['Artist'], ['metadata[0]'], ['metadata[1]']]

        deserializer = deserialize.Deserializer()
        output = deserializer.apply_projection(mock_record, mock_projections)
        # verify that we get every requested list element when all of them are available
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': ['test1', 'test2']})

    def test_projection_expression_parent_child_data_list(self):
        '''
            Verify that we get empty dict when the element in the list is parent element and it is not found
            Example Projection: Artist, metadata[0].Age
            Stream Record: {'Artist': 'No One You Know5'}
        '''
        mock_record = {'Artist': 'No One You Know5'}
        mock_projections = [['Artist'], ['metadata[0]', 'Age']]

        deserializer = deserialize.Deserializer()
        output = deserializer.apply_projection(mock_record, mock_projections)
        # verify that we get empty dict when the element in the list is parent element and it is not found
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': [{}]})

    def test_projection_expression_parent_child_data_list_positive(self):
        '''
            Verify that we get the child data when the parent element in the list is found
            Example Projection: Artist, metadata[0].Age
            Stream Record: {'Artist': 'No One You Know5', 'metadata': [{'Age': 'Test'}]}
        '''
        mock_record = {'Artist': 'No One You Know5', 'metadata': [{'Age': 'Test'}]}
        mock_projections = [['Artist'], ['metadata[0]', 'Age']]

        deserializer = deserialize.Deserializer()
        output = deserializer.apply_projection(mock_record, mock_projections)
        # verify that we get the child data when the parent element in the list is found
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': [{'Age': 'Test'}]})

    def test_projection_expression_parent_child_data_dictionary(self):
        '''
            Verify that we get empty dict when the parent data is not found and we are requesting for child data
            Example Projection: Artist, metadata.inner_metadata
            Stream Record: {'Artist': 'No One You Know5'}
        '''
        mock_record = {'Artist': 'No One You Know5'}
        mock_projections = [['Artist'], ['metadata', 'inner_metadata']]

        deserializer = deserialize.Deserializer()
        output = deserializer.apply_projection(mock_record, mock_projections)
        # verify that we get empty dict for the parent when the parent data is not found
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': {}})

    def test_projection_expression_parent_child_data_dictionary_positive(self):
        '''
            Verify that we get the child data when the parent data is found
            Example Projection: Artist, metadata.inner_metadata
            Stream Record: {'Artist': 'No One You Know5', 'metadata': {'inner_metadata': 'Test'}}
        '''
        mock_record = {'Artist': 'No One You Know5', 'metadata': {'inner_metadata': 'Test'}}
        mock_projections = [['Artist'], ['metadata', 'inner_metadata']]

        deserializer = deserialize.Deserializer()
        output = deserializer.apply_projection(mock_record, mock_projections)
        # verify that we get the child data when the parent data is found
        self.assertEqual(output, {'Artist': 'No One You Know5', 'metadata': {'inner_metadata': 'Test'}})

    def test_projection_expression_parent_child_data_list_different_order(self):
        '''
            Verify that we get no error when list projections are given in decreasing index order
            and the element at the higher index is not found
            Example Projection: metadata[1], metadata[0].inner_metadata
            Stream Record: {'metadata': [{'inner_metadata': 'Test'}]}
        '''
        mock_record = {'metadata': [{'inner_metadata': 'Test'}]}
        mock_projections = [['metadata[1]'], ['metadata[0]', 'inner_metadata']]

        deserializer = deserialize.Deserializer()
        output = deserializer.apply_projection(mock_record, mock_projections)
        # verify that the missing 'metadata[1]' is skipped and 'metadata[0].inner_metadata' is still projected
        self.assertEqual(output, {'metadata': [{'inner_metadata': 'Test'}]})

0 comments on commit 3d49bbf

Please sign in to comment.