Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Neo4jProxy table owners query for easier customization #2182

Merged
merged 7 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions metadata/metadata_service/proxy/neo4j_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ def get_table(self, *, table_uri: str) -> Table:
cols, last_neo4j_record = self._exec_col_query(table_uri)

readers = self._exec_usage_query(table_uri)
owners = self._exec_owners_query(table_uri)

wmk_results, table_writer, table_apps, timestamp_value, owners, tags, source, \
wmk_results, table_writer, table_apps, timestamp_value, tags, source, \
badges, prog_descs, resource_reports = self._exec_table_query(table_uri)

joins, filters = self._exec_table_query_query(table_uri)
Expand Down Expand Up @@ -340,22 +341,41 @@ def _exec_usage_query(self, table_uri: str) -> List[Reader]:

return readers

@timer_with_counter
def _exec_owners_query(self, table_uri: str) -> List[User]:
# Return Value: List[User]
owners_query = textwrap.dedent("""
MATCH (owner:User)<-[:OWNER]-(tbl:Table {key: $tbl_key})
RETURN collect(distinct owner) as owner_records
""")
owners_neo4j_records = self._execute_cypher_query(statement=owners_query,
param_dict={'tbl_key': table_uri})

owners_neo4j_records = get_single_record(owners_neo4j_records)

owners = [] # type: List[User]
for owner_neo4j_record in owners_neo4j_records.get('owner_records', []):
owner_data = self._get_user_details(user_id=owner_neo4j_record['email'])
owner = self._build_user_from_record(record=owner_data)
owners.append(owner)

return owners

@timer_with_counter
def _exec_table_query(self, table_uri: str) -> Tuple:
"""
Queries one Cypher record with watermark list, Application,
,timestamp, owner records and tag records.
,timestamp, and tag records.
"""

# Return Value: (Watermark Results, Table Writer, Last Updated Timestamp, owner records, tag records)
# Return Value: (Watermark Results, Table Writer, Last Updated Timestamp, tag records)

table_level_query = textwrap.dedent("""\
MATCH (tbl:Table {key: $tbl_key})
OPTIONAL MATCH (wmk:Watermark)-[:BELONG_TO_TABLE]->(tbl)
OPTIONAL MATCH (app_producer:Application)-[:GENERATES]->(tbl)
OPTIONAL MATCH (app_consumer:Application)-[:CONSUMES]->(tbl)
OPTIONAL MATCH (tbl)-[:LAST_UPDATED_AT]->(t:Timestamp)
OPTIONAL MATCH (owner:User)<-[:OWNER]-(tbl)
OPTIONAL MATCH (tbl)-[:TAGGED_BY]->(tag:Tag{tag_type: $tag_normal_type})
OPTIONAL MATCH (tbl)-[:HAS_BADGE]->(badge:Badge)
OPTIONAL MATCH (tbl)-[:SOURCE]->(src:Source)
Expand All @@ -365,7 +385,6 @@ def _exec_table_query(self, table_uri: str) -> Tuple:
collect(distinct app_producer) as producing_apps,
collect(distinct app_consumer) as consuming_apps,
t.last_updated_timestamp as last_updated_timestamp,
collect(distinct owner) as owner_records,
collect(distinct tag) as tag_records,
collect(distinct badge) as badge_records,
src,
Expand Down Expand Up @@ -405,12 +424,6 @@ def _exec_table_query(self, table_uri: str) -> Tuple:

timestamp_value = table_records['last_updated_timestamp']

owner_record = []

for owner in table_records.get('owner_records', []):
owner_data = self._get_user_details(user_id=owner['email'])
owner_record.append(self._build_user_from_record(record=owner_data))

src = None

if table_records['src']:
Expand All @@ -423,7 +436,7 @@ def _exec_table_query(self, table_uri: str) -> Tuple:

resource_reports = self._extract_resource_reports_from_query(table_records.get('resource_reports', []))

return wmk_results, table_writer, table_apps, timestamp_value, owner_record,\
return wmk_results, table_writer, table_apps, timestamp_value,\
tags, src, badges, prog_descriptions, resource_reports

@timer_with_counter
Expand All @@ -434,7 +447,7 @@ def _exec_table_query_query(self, table_uri: str) -> Tuple:
on the table.
"""

# Return Value: (Watermark Results, Table Writer, Last Updated Timestamp, owner records, tag records)
# Return Value: (Watermark Results, Table Writer, Last Updated Timestamp, tag records)
table_query_level_query = textwrap.dedent("""
MATCH (tbl:Table {key: $tbl_key})
OPTIONAL MATCH (tbl)-[:COLUMN]->(col:Column)-[COLUMN_JOINS_WITH]->(j:Join)
Expand Down
20 changes: 13 additions & 7 deletions metadata/tests/unit/proxy/test_neo4j_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,6 @@ def setUp(self) -> None:
}
],
'last_updated_timestamp': 1,
'owner_records': [
{
'key': 'tester@example.com',
'email': 'tester@example.com',
'updated_at': 0,
}
],
'tag_records': [
{
'key': 'test',
Expand Down Expand Up @@ -236,6 +229,14 @@ def setUp(self) -> None:
]
}]

owners_results = [{'owner_records': [
{
'key': 'tester@example.com',
'email': 'tester@example.com',
'updated_at': 0,
}
], }]

last_updated_timestamp = '01'

self.col_usage_return_value = [
Expand All @@ -250,6 +251,8 @@ def setUp(self) -> None:

self.table_common_usage = table_common_usage

self.owners_return_value = owners_results

self.col_bar_id_1_expected_type_metadata = self._get_col_bar_id_1_expected_type_metadata()
self.col_bar_id_2_expected_type_metadata = self._get_col_bar_id_2_expected_type_metadata()

Expand Down Expand Up @@ -355,9 +358,11 @@ def test_health_neo4j(self) -> None:

def test_get_table(self) -> None:
with patch.object(GraphDatabase, 'driver'), patch.object(Neo4jProxy, '_execute_cypher_query') as mock_execute:
# mock database return values such that we match ordering of queries executed in Neo4jProxy.get_table
mock_execute.side_effect = [
self.col_usage_return_value,
[],
self.owners_return_value,
self.table_level_return_value,
self.table_common_usage,
[]
Expand Down Expand Up @@ -445,6 +450,7 @@ def test_get_table_view_only(self) -> None:
mock_execute.side_effect = [
col_usage_return_value,
[],
self.owners_return_value,
self.table_level_return_value,
self.table_common_usage,
[]
Expand Down
Loading