diff --git a/tap_dynamodb/deserialize.py b/tap_dynamodb/deserialize.py index 9cedd03..2f0b44b 100644 --- a/tap_dynamodb/deserialize.py +++ b/tap_dynamodb/deserialize.py @@ -63,23 +63,31 @@ def _apply_projection(self, record, breadcrumb, output): breadcrumb_key = breadcrumb[0].split('[')[0] index = int(breadcrumb[0].split('[')[1].split(']')[0]) if output.get(breadcrumb_key): - output[breadcrumb_key].append(record[breadcrumb_key][index]) + # only prepare output if the list field contains data at that index position in record + if len(record.get(breadcrumb_key)) > index: + output[breadcrumb_key].append(record[breadcrumb_key][index]) else: - output[breadcrumb_key] = [record[breadcrumb_key][index]] - + output[breadcrumb_key] = [] + # only prepare output if the list field contains data at that index position in record + if record.get(breadcrumb_key) and len(record.get(breadcrumb_key)) > index: + output[breadcrumb_key].append(record[breadcrumb_key][index]) else: output[breadcrumb[0]] = record.get(breadcrumb[0]) else: if '[' in breadcrumb[0]: breadcrumb_key = breadcrumb[0].split('[')[0] index = int(breadcrumb[0].split('[')[1].split(']')[0]) - if output.get(breadcrumb_key) is None: + if not output.get(breadcrumb_key): output[breadcrumb_key] = [{}] - self._apply_projection(record[breadcrumb_key][index], breadcrumb[1:], output[breadcrumb_key][0]) + # only prepare output if the list field contains data at that index position in record + if record.get(breadcrumb_key) and len(record.get(breadcrumb_key)) > index: + self._apply_projection(record[breadcrumb_key][index], breadcrumb[1:], output[breadcrumb_key][0]) else: if output.get(breadcrumb[0]) is None: output[breadcrumb[0]] = {} - self._apply_projection(record[breadcrumb[0]], breadcrumb[1:], output[breadcrumb[0]]) + # keep empty dict if the data is not found in the record + if record.get(breadcrumb[0]): + self._apply_projection(record.get(breadcrumb[0], {}), breadcrumb[1:], output[breadcrumb[0]]) def apply_projection(self, record, projections): output = {} diff --git a/tests/test_dynamodb_log_based_parent_child_data.py b/tests/test_dynamodb_log_based_parent_child_data.py new file mode 100644 index 0000000..13cc275 --- /dev/null +++ b/tests/test_dynamodb_log_based_parent_child_data.py @@ -0,0 +1,104 @@ +from boto3.dynamodb.types import TypeSerializer + +from tap_tester import connections +from tap_tester import menagerie +from tap_tester import runner + +from base import TestDynamoDBBase + +class DynamoDBLogBasedParentChildData(TestDynamoDBBase): + """ + Test case for verifying: + - The tap does not break when the parent data is not found and the user is requesting for child data + - The tap does not break when the data a specific position is not found in the record + """ + + # expected table configs + def expected_table_config(self): + return [ + { + 'TableName': 'simple_table_1', + 'HashKey': 'int_id', + 'HashType': 'N', + 'generator': self.generate_items, + 'num_rows': 10, + 'ProjectionExpression': 'int_id, map_field.map_entry_1, test_list_1[0], test_list_2[0], test_list_2[1], test_list_3[0].test_field', + 'top_level_keys': {'int_id', 'map_field'}, + 'nested_map_keys': {'map_field': {'map_entry_1'}}, + } + ] + + # send desired data for testing + def generate_items(self, num_items, start_key=0): + serializer = TypeSerializer() + for i in range(start_key, start_key + num_items): + record = { + 'int_id': i, + 'string_field': self.random_string_generator(), + 'test_list_2': ['list_2_data'] + } + yield serializer.serialize(record) + + @staticmethod + def name(): + return "tap_tester_dynamodb_parent_child_data" + + def test_run(self): + (table_configs, conn_id, expected_streams) = self.pre_sync_test() + + # Select 'simple_table_1' stream and add replication method and projection + found_catalogs = menagerie.get_catalogs(conn_id) + for stream_catalog in found_catalogs: + annotated_schema = menagerie.get_annotated_schema(conn_id, stream_catalog['stream_id']) + additional_md = [ + { + "breadcrumb": [], + "metadata": { + 'replication-method': 'LOG_BASED', + 'tap-mongodb.projection': table_configs[0]['ProjectionExpression'] + } + } + ] + connections.select_catalog_and_fields_via_metadata(conn_id, + stream_catalog, + annotated_schema, + additional_md) + + # diable stream to force shard to close + self.disableStreams(expected_streams) + # run sync mode 1st time as for the 1st time it sync in FULL_TABLE mode + sync_job_name = runner.run_sync_mode(self, conn_id) + + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) + + # collect state file + state = menagerie.get_state(conn_id) + state_version = menagerie.get_state_version(conn_id) + + # delete 'finished_shards' for every streams from the state file as we want to run 2nd sync + for config in table_configs: + table_name = config['TableName'] + del state['bookmarks'][table_name]['finished_shards'] + menagerie.set_state(conn_id, state, version=state_version) + + # run the sync mode 2nd time, noow it will run in LOG_BASED mode + sync_job_name = runner.run_sync_mode(self, conn_id) + + # get data + messages_by_stream = runner.get_records_from_target_output() + + for stream in expected_streams: + messages = messages_by_stream.get(stream).get('messages') + records = [message.get('data') for message in messages if message.get('action') == 'upsert'] + # verify that we replicated records + self.assertTrue(len(records) > 0) + for record in records: + + # verify that we get 'None' for child data when parent data is not found + self.assertIsNone(record.get('map_field').get('map_entry_1')) + # verify that we only get the available data if the data at a particular index is not found + self.assertEquals(record.get('test_list_1'), []) + self.assertEquals(record.get('test_list_2'), ['list_2_data']) + # verify that we got empty map if the parent data at a particular index is not found for child data + self.assertEquals(record.get('test_list_3'), [{}]) diff --git a/tests/unittests/test_deserializer.py b/tests/unittests/test_deserializer.py new file mode 100644 index 0000000..2f8bfb0 --- /dev/null +++ b/tests/unittests/test_deserializer.py @@ -0,0 +1,130 @@ +import unittest +from tap_dynamodb import deserialize + +class TestDeserializer(unittest.TestCase): + + def test_projection_expression_all_list_data_not_found(self): + ''' + Verify that we get empty list if the data is not found in the record + Example Projection: Artist, metadata[0] + Stream Record: {'Artist': 'No One You Know'} + ''' + mock_record = {'Artist': 'No One You Know5'} + mock_projections = [['Artist'], ['metadata[0]']] + + deserializer = deserialize.Deserializer() + output = deserializer.apply_projection(mock_record, mock_projections) + # verify that we get empty list if the data is not found in the record + self.assertEquals(output, {'Artist': 'No One You Know5', 'metadata': []}) + + def test_projection_expression_all_list_data_not_found_positive(self): + ''' + Verify that we get empty list if the data is not found in the record + Example Projection: Artist, metadata[0] + Stream Record: {'Artist': 'No One You Know5', 'metadata': ['test']} + ''' + mock_record = {'Artist': 'No One You Know5', 'metadata': ['test']} + mock_projections = [['Artist'], ['metadata[0]']] + + deserializer = deserialize.Deserializer() + output = deserializer.apply_projection(mock_record, mock_projections) + # verify that we get empty list if the data is not found in the record + self.assertEquals(output, {'Artist': 'No One You Know5', 'metadata': ['test']}) + + def test_projection_expression_some_list_data_not_found(self): + ''' + Verify that we only get the available data in the list when user expect the data that is not available + Example Projection: Artist, metadata[0], metadata[1] + Stream Record: {'Artist': 'No One You Know','metadata': ['test1']} + ''' + mock_record = {'Artist': 'No One You Know5','metadata': ['test1']} + mock_projections = [['Artist'], ['metadata[0]'], ['metadata[1]']] + + deserializer = deserialize.Deserializer() + output = deserializer.apply_projection(mock_record, mock_projections) + # verify that we only get the available data in the list when user expect the data that is not available + self.assertEquals(output, {'Artist': 'No One You Know5', 'metadata': ['test1']}) + + def test_projection_expression_some_list_data_not_found_positive(self): + ''' + Verify that we only get the available data in the list when user expect the data that is not available + Example Projection: Artist, metadata[0], metadata[1] + Stream Record: {'Artist': 'No One You Know','metadata': ['test1', 'test2']} + ''' + mock_record = {'Artist': 'No One You Know5','metadata': ['test1', 'test2']} + mock_projections = [['Artist'], ['metadata[0]'], ['metadata[1]']] + + deserializer = deserialize.Deserializer() + output = deserializer.apply_projection(mock_record, mock_projections) + # verify that we only get the available data in the list when user expect the data that is not available + self.assertEquals(output, {'Artist': 'No One You Know5', 'metadata': ['test1', 'test2']}) + + def test_projection_expression_parent_child_data_list(self): + ''' + Verify that we get empty dict when the element in the list is parent element and it is not found + Example Projection: Artist, metadata[0].Age + Stream Record: {'Artist': 'No One You Know'} + ''' + mock_record = {'Artist': 'No One You Know5'} + mock_projections = [['Artist'], ['metadata[0]', 'Age']] + + deserializer = deserialize.Deserializer() + output = deserializer.apply_projection(mock_record, mock_projections) + # verify that we get empty dict when the element in the list is parent element and it is not found + self.assertEquals(output, {'Artist': 'No One You Know5', 'metadata': [{}]}) + + def test_projection_expression_parent_child_data_list_positive(self): + ''' + Verify that we get empty dict when the element in the list is parent element and it is not found + Example Projection: Artist, metadata[0].Age + Stream Record: {'Artist': 'No One You Know5', 'metadata': [{'Age': 'Test'}]} + ''' + mock_record = {'Artist': 'No One You Know5', 'metadata': [{'Age': 'Test'}]} + mock_projections = [['Artist'], ['metadata[0]', 'Age']] + + deserializer = deserialize.Deserializer() + output = deserializer.apply_projection(mock_record, mock_projections) + # verify that we get empty dict when the element in the list is parent element and it is not found + self.assertEquals(output, {'Artist': 'No One You Know5', 'metadata': [{'Age': 'Test'}]}) + + def test_projection_expression_parent_child_data_dictionary(self): + ''' + Veriy that we get None when the parent data is not found and we are requesting for child data + Example Projection: Artist, metadata.inner_metadata + Stream Record: {'Artist': 'No One You Know'} + ''' + mock_record = {'Artist': 'No One You Know5'} + mock_projections = [['Artist'], ['metadata', 'inner_metadata']] + + deserializer = deserialize.Deserializer() + output = deserializer.apply_projection(mock_record, mock_projections) + # veriy that we get None when the parent data is not found + self.assertEquals(output, {'Artist': 'No One You Know5', 'metadata': {}}) + + def test_projection_expression_parent_child_data_dictionary_positive(self): + ''' + Veriy that we get None when the parent data is not found and we are requesting for child data + Example Projection: Artist, metadata.inner_metadata + Stream Record: {'Artist': 'No One You Know5', 'metadata': {'inner_metadata': 'Test'}} + ''' + mock_record = {'Artist': 'No One You Know5', 'metadata': {'inner_metadata': 'Test'}} + mock_projections = [['Artist'], ['metadata', 'inner_metadata']] + + deserializer = deserialize.Deserializer() + output = deserializer.apply_projection(mock_record, mock_projections) + # veriy that we get None when the parent data is not found + self.assertEquals(output, {'Artist': 'No One You Know5', 'metadata': {'inner_metadata': 'Test'}}) + + def test_projection_expression_parent_child_data_list_different_order(self): + ''' + Veriy that we get no error when add list projection in decreasing order and it is not found + Example Projection: metadata[1], metadata[0].inner_metadata + Stream Record: {'metadata': [{'inner_metadata': 'Test'}]} + ''' + mock_record = {'metadata': [{'inner_metadata': 'Test'}]} + mock_projections = [['metadata[1]'], ['metadata[0]', 'inner_metadata']] + + deserializer = deserialize.Deserializer() + output = deserializer.apply_projection(mock_record, mock_projections) + # veriy that we get no error when add list projection in decreasing order and it is not found + self.assertEquals(output, {'metadata': [{'inner_metadata': 'Test'}]})