Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: FE document update with v2/document search service endpoint use #1619

Merged
merged 10 commits into from
Dec 22, 2021
91 changes: 17 additions & 74 deletions frontend/amundsen_application/api/metadata/v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@

from amundsen_common.entity.resource_type import ResourceType, to_label

from amundsen_application.api.utils.search_utils import generate_query_json
from amundsen_application.log.action_log import action_logging

from amundsen_application.models.user import load_user, dump_user

from amundsen_application.api.utils.metadata_utils import is_table_editable, marshall_table_partial, \
marshall_table_full, marshall_dashboard_partial, marshall_dashboard_full, marshall_feature_full, \
marshall_lineage_table, TableUri
from amundsen_application.api.utils.request_utils import get_query_param, request_metadata, request_search
from amundsen_application.api.utils.request_utils import get_query_param, request_metadata

from amundsen_application.api.utils.search_utils import update_search_field


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -440,73 +441,6 @@ def _update_metadata_tag(table_key: str, method: str, tag: str) -> int:
return status_code


def _update_search_tag(key: str, resource_type: ResourceType, method: str, tag: str) -> int:
    """
    Add or remove a single tag on a resource's search service document.

    Calls the search service endpoint to get the whole entity uniquely identified by the key,
    updates its tags list, then calls the search service endpoint again to write back the entity.
    TODO: we should update dashboard tag in the future. Note that dashboard ES schema doesn't have key field,
    so that should be added.
    :param key: e.g. 'database://cluster.schema/table'
    :param resource_type: type of the resource; only Table and Feature are handled, anything else
        returns NOT_IMPLEMENTED
    :param method: 'DELETE' removes the tag; any other value (PUT in practice) adds it
    :param tag: tag name to be added/deleted
    :return: HTTP status code
    """
    searchservice_base = app.config['SEARCHSERVICE_BASE']

    if resource_type == ResourceType.Table:
        filter_url = f'{searchservice_base}/search_table'
        update_url = f'{searchservice_base}/document_table'
    elif resource_type == ResourceType.Feature:
        filter_url = f'{searchservice_base}/search_feature_filter'
        update_url = f'{searchservice_base}/document_feature'
    else:
        LOGGER.error(f'updating search tags not supported for {resource_type.name.lower()}')
        return HTTPStatus.NOT_IMPLEMENTED

    # Look the entity up by key so the full document can be written back with the new tags.
    query = generate_query_json(filters={'key': [key]}, page_index=0, search_term='')
    search_response = request_search(
        url=filter_url,
        method='POST',
        headers={'Content-Type': 'application/json'},
        data=json.dumps(query),
    )

    if search_response.status_code != HTTPStatus.OK:
        LOGGER.error(f'Fail to get entity from searchservice, http status code: {search_response.status_code}')
        LOGGER.error(search_response.text)
        return search_response.status_code

    raw_data_map = json.loads(search_response.text)
    # A missing or null 'results' field is treated as "no results" and reported below
    # instead of raising.
    results = raw_data_map.get('results') or []
    # key should uniquely identify this resource
    num_results = len(results)
    if num_results != 1:
        LOGGER.error(f'Expecting exactly one ES result for key {key} but got {num_results}')
        return HTTPStatus.INTERNAL_SERVER_ERROR

    resource = results[0]
    # Guard against a missing or null tags field: treat it as an empty tag list.
    old_tags_list = resource.get('tags') or []
    # Dropping any existing entry first keeps the tag unique when it is (re-)added.
    new_tags_list = [item for item in old_tags_list if item['tag_name'] != tag]
    if method != 'DELETE':
        new_tags_list.append({'tag_name': tag})
    resource['tags'] = new_tags_list

    # remove None values
    pruned_entity = {k: v for k, v in resource.items() if v is not None}
    post_param_map = {"data": pruned_entity}
    update_response = request_search(
        url=update_url,
        method='PUT',
        headers={'Content-Type': 'application/json'},
        data=json.dumps(post_param_map),
    )
    if update_response.status_code != HTTPStatus.OK:
        LOGGER.error(f'Fail to update tag in searchservice, http status code: {update_response.status_code}')
        LOGGER.error(update_response.text)
        return update_response.status_code

    return HTTPStatus.OK


@metadata_blueprint.route('/update_table_tags', methods=['PUT', 'DELETE'])
def update_table_tags() -> Response:

Expand All @@ -525,8 +459,13 @@ def _log_update_table_tags(*, table_key: str, method: str, tag: str) -> None:
_log_update_table_tags(table_key=table_key, method=method, tag=tag)

metadata_status_code = _update_metadata_tag(table_key=table_key, method=method, tag=tag)
search_status_code = _update_search_tag(
key=table_key, resource_type=ResourceType.Table, method=method, tag=tag)
search_method = method if method == 'DELETE' else 'POST'
allisonsuarez marked this conversation as resolved.
Show resolved Hide resolved
search_status_code = update_search_field(key=table_key,
resource_type=ResourceType.Table.name.lower(),
field='tag',
value=tag,
operation='add',
allisonsuarez marked this conversation as resolved.
Show resolved Hide resolved
method=search_method)

http_status_code = HTTPStatus.OK
if metadata_status_code == HTTPStatus.OK and search_status_code == HTTPStatus.OK:
Expand Down Expand Up @@ -1048,9 +987,13 @@ def update_feature_tags() -> Response:
metadata_status_code = _update_metadata_feature_tag(endpoint=endpoint,
feature_key=feature_key,
method=method, tag=tag)
search_status_code = _update_search_tag(
key=feature_key, resource_type=ResourceType.Feature, method=method, tag=tag)

search_method = method if method == 'DELETE' else 'POST'
search_status_code = update_search_field(key=feature_key,
resource_type=ResourceType.Feature.name.lower(),
field='tag',
value=tag,
operation='add',
method=search_method)
http_status_code = HTTPStatus.OK
if metadata_status_code == HTTPStatus.OK and search_status_code == HTTPStatus.OK:
message = 'Success'
Expand Down
4 changes: 2 additions & 2 deletions frontend/amundsen_application/api/utils/request_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def request_wrapper(method: str, url: str, client, headers, timeout_sec: int, da

if client is not None:
if method == 'DELETE':
return client.delete(url, headers=headers, raw_response=True)
return client.delete(url, headers=headers, raw_response=True, data=data, json=json)
elif method == 'GET':
return client.get(url, headers=headers, raw_response=True)
elif method == 'POST':
Expand All @@ -111,7 +111,7 @@ def request_wrapper(method: str, url: str, client, headers, timeout_sec: int, da
else:
with build_session() as s:
if method == 'DELETE':
return s.delete(url, headers=headers, timeout=timeout_sec)
return s.delete(url, headers=headers, timeout=timeout_sec, data=data, json=json)
elif method == 'GET':
return s.get(url, headers=headers, timeout=timeout_sec)
elif method == 'POST':
Expand Down
40 changes: 40 additions & 0 deletions frontend/amundsen_application/api/utils/search_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import logging
import json

from typing import Dict, List # noqa: F401

from http import HTTPStatus

from flask import current_app as app

from amundsen_common.models.search import UpdateDocumentRequestSchema, UpdateDocumentRequest

from amundsen_application.api.utils.request_utils import request_search


LOGGER = logging.getLogger(__name__)

# These can move to a configuration when we have custom use cases outside of these default values
Expand Down Expand Up @@ -111,3 +124,30 @@ def has_filters(*, filters: Dict = {}, resource: str = '') -> bool:
if len(filter_list) > 0:
return True
return False


def update_search_field(key: str, resource_type: str, field: str, value: str, operation: str, method: str) -> int:
    """
    Send a single-field update for one resource to the search service '/v2/document' endpoint.

    :param key: key of the resource whose search document is updated
    :param resource_type: resource type name as passed by the caller, e.g. 'table' or 'feature'
    :param field: name of the document field to update, e.g. 'tag'
    :param value: value to apply to the field
    :param operation: operation forwarded in the update request, e.g. 'add'
    :param method: HTTP method used for the request; only 'POST' and 'DELETE' are accepted
    :return: HTTP status code; BAD_REQUEST when the method is not accepted, otherwise the
        status code of the search service response (OK on success)
    """
    # Reject unsupported methods before building the request or touching the app config.
    if method not in ('POST', 'DELETE'):
        return HTTPStatus.BAD_REQUEST

    searchservice_base = app.config['SEARCHSERVICE_BASE']
    update_url = f'{searchservice_base}/v2/document'

    update_request = UpdateDocumentRequest(resource_key=key,
                                           resource_type=resource_type,
                                           field=field,
                                           value=value,
                                           operation=operation)

    request_json = json.dumps(UpdateDocumentRequestSchema().dump(update_request))
    update_response = request_search(
        url=update_url,
        method=method,
        headers={'Content-Type': 'application/json'},
        data=request_json,
    )
    if update_response.status_code != HTTPStatus.OK:
        # Logged at ERROR so a failed search update stays visible at default log levels.
        LOGGER.error(f'Failed to update {field} in searchservice, status code: {update_response.status_code}')
        LOGGER.error(update_response.text)
        return update_response.status_code

    return HTTPStatus.OK
2 changes: 1 addition & 1 deletion requirements-common.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

# It is recommended to always pin the exact version (not range) - otherwise common upgrade won't trigger unit tests
# on all repositories relying on this file and any issues that arise from common upgrade might be missed.
amundsen-common>=0.23.0
amundsen-common>=0.24.0
attrs>=19.1.0
boto3==1.17.23
click==7.0
Expand Down