Skip to content

Commit

Permalink
fix: Too many partitions in one Atlas query (Watermarks in Atlas Proxy) (#217)
Browse files Browse the repository at this point in the history

* watermark_atlas_proxy | ♻️ Refactoring code.

Signed-off-by: mgorsk1 <gorskimariusz13@gmail.com>

* watermark_atlas_proxy | 🚨 Removing linter warnings.

Signed-off-by: mgorsk1 <gorskimariusz13@gmail.com>
  • Loading branch information
mgorsk1 authored Nov 2, 2020
1 parent 3c9a55e commit cc3768f
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 52 deletions.
20 changes: 6 additions & 14 deletions metadata_service/proxy/atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,35 +522,27 @@ def _get_table_watermarks(self, entity: EntityUniqueAttribute) -> List[Watermark

_partitions = entity.get('relationshipAttributes', dict()).get('partitions', list())

guids = [_partition.get('guid') for _partition in _partitions
names = [_partition.get('displayText') for _partition in _partitions
if _partition.get('entityStatus') == Status.ACTIVE
and _partition.get('relationshipStatus') == Status.ACTIVE]

if not guids:
if not names:
return []

partition_key = AtlasProxy._render_partition_key_name(entity)

full_partitions = extract_entities(self._driver.entity_bulk(guid=list(guids), ignoreRelationships=True))
watermark_date_format = AtlasProxy._select_watermark_format([p.attributes.get('name') for p in full_partitions])
watermark_date_format = AtlasProxy._select_watermark_format(names)

partitions = {}

for partition in full_partitions:
partition_name = partition.attributes.get('name')

for _partition in _partitions:
partition_name = _partition.get('displayText')
if partition_name and watermark_date_format:
partition_date, _ = AtlasProxy._validate_date(partition_name, watermark_date_format)

if partition_date:
_partition_create_time = self._parse_date(partition.createTime) or 0.0

partition_create_time = datetime.datetime.fromtimestamp(
_partition_create_time).strftime(partition_value_format)

common_values = {'partition_value': datetime.datetime.strftime(partition_date,
partition_value_format),
'create_time': partition_create_time,
'create_time': 0,
'partition_key': partition_key}

partitions[partition_date] = common_values
Expand Down
32 changes: 5 additions & 27 deletions tests/unit/proxy/fixtures/atlas_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0

import copy
from typing import List, Dict


class DottedDict(dict):
Expand Down Expand Up @@ -130,7 +131,7 @@ class Data:
'createTime': 1599723564000
}

partitions = [partition_entity_1, partition_entity_2, partition_entity_3, partition_entity_4]
partitions: List[Dict] = [partition_entity_1, partition_entity_2, partition_entity_3, partition_entity_4]

entity1 = {
'guid': '1',
Expand Down Expand Up @@ -169,32 +170,9 @@ class Data:
"displayText": "deleted_owned_by"
}
],
'partitions': [
{
"entityStatus": "INACTIVE",
"relationshipStatus": "ACTIVE",
"guid": "000",
"displayText": "active_partition"
},
{
"entityStatus": "ACTIVE",
"relationshipStatus": "ACTIVE",
"guid": "111",
"displayText": "active_partition"
},
{
"entityStatus": "ACTIVE",
"relationshipStatus": "ACTIVE",
"guid": "222",
"displayText": "active_partition"
},
{
"entityStatus": "ACTIVE",
"relationshipStatus": "ACTIVE",
"guid": "333",
"displayText": "active_partition"
}
]
'partitions': [dict(displayText=p.get('attributes', dict()).get('name'),
entityStatus=p.get('status'),
relationshipStatus='ACTIVE') for p in partitions]
},
}
entity1.update(classification_entity)
Expand Down
14 changes: 3 additions & 11 deletions tests/unit/proxy/test_atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class TestAtlasProxy(unittest.TestCase, Data):
def setUp(self) -> None:
self.app = create_app(config_module_class='metadata_service.config.LocalConfig')
self.app.config['PROGRAMMATIC_DESCRIPTIONS_EXCLUDE_FILTERS'] = ['spark.*']
self.app.config['WATERMARK_DATE_FORMATS'] = ''
self.app_context = self.app.app_context()
self.app_context.push()

Expand Down Expand Up @@ -136,6 +137,7 @@ def _get_table(self, custom_stats_format: bool = False) -> None:
col_type='Managed',
sort_order=col_attrs['position'],
stats=exp_col_stats)

expected = Table(database=self.entity_type,
cluster=self.cluster,
schema=self.db,
Expand All @@ -147,6 +149,7 @@ def _get_table(self, custom_stats_format: bool = False) -> None:
ResourceReport(name='test_report3', url='http://test3')],
last_updated_timestamp=int(str(self.entity1['updateTime'])[:10]),
columns=[exp_col] * self.active_columns,
watermarks=[],
programmatic_descriptions=[ProgrammaticDescription(source='test parameter key a',
text='testParameterValueA'),
ProgrammaticDescription(source='test parameter key b',
Expand Down Expand Up @@ -531,20 +534,9 @@ def test_get_table_watermarks(self) -> None:
(['%Y-%m-%d'], 0, None),
([], 0, None)]

mocked_partition_entities_collection = MagicMock()
mocked_partition_entities_collection.entities = []

for entity in self.partitions:
mocked_report_entity = MagicMock()
mocked_report_entity.status = entity['status']
mocked_report_entity.attributes = entity['attributes']
mocked_report_entity.createTime = entity['createTime']
mocked_partition_entities_collection.entities.append(mocked_report_entity)

for supported_formats, expected_result_length, low_date_prefix in params:
with self.subTest():
self.app.config['WATERMARK_DATE_FORMATS'] = supported_formats
self.proxy._driver.entity_bulk = MagicMock(return_value=[mocked_partition_entities_collection])

result = self.proxy._get_table_watermarks(cast(dict, self.entity1))

Expand Down

0 comments on commit cc3768f

Please sign in to comment.