Skip to content

Commit

Permalink
feat: FE document update with v2/document search service endpoint use (
Browse files Browse the repository at this point in the history
…amundsen-io#1619)

* Functional document update

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* updated unit tests

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* test fix 2

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* unit test fixing again, having trouble running locally

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* had the wrong route

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* delete

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* rewrite cleaner

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>

* lint

Signed-off-by: Allison Suarez Miranda <asuarezmiranda@lyft.com>
Signed-off-by: Amom Mendes <amommendes@hotmail.com>
  • Loading branch information
allisonsuarez authored and amommendes committed Jan 21, 2022
1 parent ea93775 commit bbfc1a4
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 93 deletions.
98 changes: 25 additions & 73 deletions frontend/amundsen_application/api/metadata/v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@
from flask.blueprints import Blueprint

from amundsen_common.entity.resource_type import ResourceType, to_label
from amundsen_common.models.search import UpdateDocumentRequestSchema, UpdateDocumentRequest

from amundsen_application.api.utils.search_utils import generate_query_json
from amundsen_application.log.action_log import action_logging

from amundsen_application.models.user import load_user, dump_user

from amundsen_application.api.utils.metadata_utils import is_table_editable, marshall_table_partial, \
marshall_table_full, marshall_dashboard_partial, marshall_dashboard_full, marshall_feature_full, \
marshall_lineage_table, TableUri
from amundsen_application.api.utils.request_utils import get_query_param, request_metadata, request_search
from amundsen_application.api.utils.request_utils import get_query_param, request_metadata

from amundsen_application.api.utils.search_utils import execute_search_document_request


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -440,73 +442,6 @@ def _update_metadata_tag(table_key: str, method: str, tag: str) -> int:
return status_code


def _update_search_tag(key: str, resource_type: ResourceType, method: str, tag: str) -> int:
    """
    Add or remove a tag on the search document of a single resource.

    Calls the search service endpoint to get the whole entity uniquely identified by the key,
    updates its tags list, then calls the search service endpoint again to write back the updated entity.
    TODO: we should update dashboard tag in the future. Note that dashboard ES schema doesn't have key field,
    so that should be added.
    :param key: e.g. 'database://cluster.schema/table'
    :param resource_type: resource being tagged; only Table and Feature are handled here,
        any other type yields NOT_IMPLEMENTED
    :param method: PUT or DELETE
    :param tag: tag name to be added/deleted
    :return: HTTP status code
    """
    searchservice_base = app.config['SEARCHSERVICE_BASE']

    if resource_type == ResourceType.Table:
        filter_url = f'{searchservice_base}/search_table'
        update_url = f'{searchservice_base}/document_table'
    elif resource_type == ResourceType.Feature:
        filter_url = f'{searchservice_base}/search_feature_filter'
        update_url = f'{searchservice_base}/document_feature'
    else:
        LOGGER.error(f'updating search tags not supported for {resource_type.name.lower()}')
        return HTTPStatus.NOT_IMPLEMENTED

    query = generate_query_json(filters={'key': [key]}, page_index=0, search_term='')
    search_response = request_search(
        url=filter_url,
        method='POST',
        headers={'Content-Type': 'application/json'},
        data=json.dumps(query),
    )

    if search_response.status_code != HTTPStatus.OK:
        LOGGER.info(f'Fail to get entity from searchservice, http status code: {search_response.status_code}')
        LOGGER.info(search_response.text)
        return search_response.status_code

    raw_data_map = json.loads(search_response.text)
    # key should uniquely identify this resource
    num_results = len(raw_data_map['results'])
    if num_results != 1:
        LOGGER.error(f'Expecting exactly one ES result for key {key} but got {num_results}')
        return HTTPStatus.INTERNAL_SERVER_ERROR

    resource = raw_data_map['results'][0]
    # Fields may be absent or None (None values are pruned below), so a resource
    # without tags is treated as having an empty tag list rather than raising.
    old_tags_list = resource.get('tags') or []
    # Drop any existing occurrence first so that a PUT never produces a duplicate entry.
    new_tags_list = [item for item in old_tags_list if item['tag_name'] != tag]
    if method != 'DELETE':
        new_tags_list.append({'tag_name': tag})
    resource['tags'] = new_tags_list

    # remove None values
    pruned_entity = {k: v for k, v in resource.items() if v is not None}
    post_param_map = {"data": pruned_entity}
    update_response = request_search(
        url=update_url,
        method='PUT',
        headers={'Content-Type': 'application/json'},
        data=json.dumps(post_param_map),
    )
    if update_response.status_code != HTTPStatus.OK:
        LOGGER.info(f'Fail to update tag in searchservice, http status code: {update_response.status_code}')
        LOGGER.info(update_response.text)
        return update_response.status_code

    return HTTPStatus.OK


@metadata_blueprint.route('/update_table_tags', methods=['PUT', 'DELETE'])
def update_table_tags() -> Response:

Expand All @@ -525,8 +460,17 @@ def _log_update_table_tags(*, table_key: str, method: str, tag: str) -> None:
_log_update_table_tags(table_key=table_key, method=method, tag=tag)

metadata_status_code = _update_metadata_tag(table_key=table_key, method=method, tag=tag)
search_status_code = _update_search_tag(
key=table_key, resource_type=ResourceType.Table, method=method, tag=tag)

search_method = method if method == 'DELETE' else 'POST'
update_request = UpdateDocumentRequest(resource_key=table_key,
resource_type=ResourceType.Table.name.lower(),
field='tag',
value=tag,
operation='add')
request_json = json.dumps(UpdateDocumentRequestSchema().dump(update_request))

search_status_code = execute_search_document_request(request_json=request_json,
method=search_method)

http_status_code = HTTPStatus.OK
if metadata_status_code == HTTPStatus.OK and search_status_code == HTTPStatus.OK:
Expand Down Expand Up @@ -1048,9 +992,17 @@ def update_feature_tags() -> Response:
metadata_status_code = _update_metadata_feature_tag(endpoint=endpoint,
feature_key=feature_key,
method=method, tag=tag)
search_status_code = _update_search_tag(
key=feature_key, resource_type=ResourceType.Feature, method=method, tag=tag)

search_method = method if method == 'DELETE' else 'POST'
update_request = UpdateDocumentRequest(resource_key=feature_key,
resource_type=ResourceType.Feature.name.lower(),
field='tags',
value=tag,
operation='add')
request_json = json.dumps(UpdateDocumentRequestSchema().dump(update_request))

search_status_code = execute_search_document_request(request_json=request_json,
method=search_method)
http_status_code = HTTPStatus.OK
if metadata_status_code == HTTPStatus.OK and search_status_code == HTTPStatus.OK:
message = 'Success'
Expand Down
4 changes: 2 additions & 2 deletions frontend/amundsen_application/api/utils/request_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def request_wrapper(method: str, url: str, client, headers, timeout_sec: int, da

if client is not None:
if method == 'DELETE':
return client.delete(url, headers=headers, raw_response=True)
return client.delete(url, headers=headers, raw_response=True, data=data, json=json)
elif method == 'GET':
return client.get(url, headers=headers, raw_response=True)
elif method == 'POST':
Expand All @@ -111,7 +111,7 @@ def request_wrapper(method: str, url: str, client, headers, timeout_sec: int, da
else:
with build_session() as s:
if method == 'DELETE':
return s.delete(url, headers=headers, timeout=timeout_sec)
return s.delete(url, headers=headers, timeout=timeout_sec, data=data, json=json)
elif method == 'GET':
return s.get(url, headers=headers, timeout=timeout_sec)
elif method == 'POST':
Expand Down
24 changes: 24 additions & 0 deletions frontend/amundsen_application/api/utils/search_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import logging
from typing import Dict, List # noqa: F401

from http import HTTPStatus

from flask import current_app as app

from amundsen_application.api.utils.request_utils import request_search

from amundsen_common.models.search import Filter, SearchRequest

from amundsen_application.models.user import dump_user, load_user
Expand Down Expand Up @@ -104,6 +111,23 @@ def generate_query_json(*, filters: Dict = {}, page_index: int, search_term: str
}


def execute_search_document_request(request_json: str, method: str) -> int:
    """
    Send a document request to the search service's v2/document endpoint.

    :param request_json: JSON-encoded request body, forwarded unchanged
    :param method: HTTP method used for the request
    :return: HTTP status code returned by the search service
    """
    search_service_base = app.config['SEARCHSERVICE_BASE']
    json_headers = {'Content-Type': 'application/json'}
    response = request_search(url=f'{search_service_base}/v2/document',
                              method=method,
                              headers=json_headers,
                              data=request_json)

    status_code = response.status_code
    if status_code == HTTPStatus.OK:
        return status_code

    # Any non-OK status is logged together with the response body, then returned to the caller.
    LOGGER.info(f'Failed to execute {method} for {request_json} in searchservice, status code: {status_code}')
    LOGGER.info(response.text)
    return status_code


def generate_query_request(*, filters: List[Filter] = [],
resources: List[str] = [],
page_index: int = 0,
Expand Down
24 changes: 6 additions & 18 deletions frontend/tests/unit/api/metadata/test_v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,13 +861,9 @@ def test_update_table_tags_put(self) -> None:
responses.add(responses.PUT, url, json={}, status=HTTPStatus.OK)

searchservice_base = local_app.config['SEARCHSERVICE_BASE']
get_table_url = f'{searchservice_base}/search_table'
responses.add(responses.POST, get_table_url,
json={'results': [{'id': '1', 'tags': [{'tag_name': 'tag_1'}, {'tag_name': 'tag_2'}]}]},
status=HTTPStatus.OK)

post_table_url = f'{searchservice_base}/document_table'
responses.add(responses.PUT, post_table_url, json={}, status=HTTPStatus.OK)
post_table_url = f'{searchservice_base}/v2/document'
responses.add(responses.POST, post_table_url, json={}, status=HTTPStatus.OK)

with local_app.test_client() as test:
response = test.put(
Expand All @@ -889,13 +885,9 @@ def test_update_table_tags_delete(self) -> None:
responses.add(responses.DELETE, url, json={}, status=HTTPStatus.OK)

searchservice_base = local_app.config['SEARCHSERVICE_BASE']
get_table_url = f'{searchservice_base}/search_table'
responses.add(responses.POST, get_table_url,
json={'results': [{'id': '1', 'tags': [{'tag_name': 'tag_1'}, {'tag_name': 'tag_2'}]}]},
status=HTTPStatus.OK)

post_table_url = f'{searchservice_base}/document_table'
responses.add(responses.PUT, post_table_url, json={}, status=HTTPStatus.OK)
post_table_url = f'{searchservice_base}/v2/document'
responses.add(responses.DELETE, post_table_url, json={}, status=HTTPStatus.OK)

with local_app.test_client() as test:
response = test.delete(
Expand Down Expand Up @@ -1321,13 +1313,9 @@ def test_update_feature_tags(self) -> None:
responses.add(responses.PUT, url, json={}, status=HTTPStatus.OK)

searchservice_base = local_app.config['SEARCHSERVICE_BASE']
search_url = f'{searchservice_base}/search_feature_filter'
responses.add(responses.POST, search_url,
json={'results': [{'id': '1', 'tags': [{'tag_name': 'tag_1'}, {'tag_name': 'tag_2'}]}]},
status=HTTPStatus.OK)

search_update_url = f'{searchservice_base}/document_feature'
responses.add(responses.PUT, search_update_url, json={}, status=HTTPStatus.OK)
search_update_url = f'{searchservice_base}/v2/document'
responses.add(responses.POST, search_update_url, json={}, status=HTTPStatus.OK)

with local_app.test_client() as test:
response = test.put(
Expand Down

0 comments on commit bbfc1a4

Please sign in to comment.