diff --git a/mkdocs.yml b/mkdocs.yml
index 366e3e77d2..9c1ef3595b 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -92,8 +92,6 @@ nav:
     - 'Recommended Practices': 'frontend/docs/recommended_practices.md'
   - 'Search':
     - 'Overview': 'search/README.md'
-    - 'Proxy':
-      - 'Atlas Backend': 'search/docs/atlas-search.md'
   - 'Metadata':
     - 'Overview': 'metadata/README.md'
    - 'Configuration':
diff --git a/search/docs/atlas-search.md b/search/docs/atlas-search.md
deleted file mode 100644
index 6aec2456f0..0000000000
--- a/search/docs/atlas-search.md
+++ /dev/null
@@ -1,58 +0,0 @@
-# Atlas search investigation
-
-There are several approaches to integrating search with [Apache Atlas](https://atlas.apache.org/ "Apache Atlas"); we describe the options below:
-
-## Use Data Builder to fill Elasticsearch from Atlas
-
-The Atlas search data extractor can be used to synchronize Atlas with Elasticsearch. This method requires you to:
-
-- deploy Elasticsearch
-- register a process that synchronizes the data between Atlas and Elasticsearch
-
-We suggest using Elasticsearch as the backend for Atlas JanusGraph (possible with the latest Atlas version) and additionally syncing data with Databuilder
-so that the indices are compatible with the Amundsen Elasticsearch Search Proxy.
-
-Mixing the Atlas Metadata Proxy with the Elasticsearch Search Proxy is completely safe and fully compatible.
-
-Raw JanusGraph indices are not compatible with the Amundsen Elasticsearch Search Proxy; using them would require implementing a custom class on top of the Elasticsearch Search Proxy.
-
-**This is the preferred way of handling Amundsen search.**
-
-### Advantages
-
-- The performance is 10-20x better (verified in a production environment)
-- Ability to search on many fields at the same time (and to define the importance of each field)
-- Much better and more flexible relevancy scoring
-
-### Disadvantages
-
-- Requires an additional component (Elasticsearch) if Apache Solr is used as the Atlas search backend
-- Requires scheduling (cron, Airflow, Kubernetes cron job) of the Databuilder app to synchronize the data periodically
-- The data in Elasticsearch is only as fresh as the last sync - there might be misalignment between Atlas metadata and the Elasticsearch index
-
-## Use Atlas REST API
-
-Directly using the Atlas APIs is quick to implement and easy to set up for administrators. Atlas uses a search engine
-behind the scenes (Solr and Elasticsearch are fully supported) to perform search queries.
-
-### Advantages
-
-- Quicker way to achieve the Amundsen <> Atlas integration
-- Data is searchable as soon as it is indexed in Atlas
-- Simpler setup (fewer components/applications)
-
-### Disadvantages
-
-- The Atlas Search API is very limited in terms of multi-field search and relevancy tuning
-- The Atlas Search API has suboptimal performance and doesn't really leverage the underlying full-text engine (it is heavily abstracted by JanusGraph)
-- The Amundsen AtlasProxy for search might lag behind in features, as it is not as popular as the Elasticsearch Proxy
-
-## Discussion
-
-Both the REST API approach and the Data Builder approach are configurable. Both approaches have
-their own benefits: the Data Builder provides a more fine-tuned search, whereas the Atlas REST API comes out
-of the box with Atlas.
-
-The focus was initially on the REST API approach, but after several months in production we decided to introduce
-the Atlas search data extractor and use the Elasticsearch Proxy for Amundsen search. It proved to be a much more robust and flexible solution.
-The disadvantages were quickly eclipsed by the advantages.
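With the document above removed, Elasticsearch becomes the only supported search backend (see the `PROXY_CLIENTS` change in `config.py` below), so deployments that previously set `PROXY_CLIENT` to `ATLAS` need to move to the Elasticsearch proxy. A minimal sketch of that configuration, mirroring the pattern used in the unit tests; the endpoint and credentials are placeholder assumptions, not values from this PR:

```python
# Minimal sketch (not part of this PR): configuring the search service for the
# Elasticsearch proxy, the only entry left in PROXY_CLIENTS after this change.
# The endpoint and credentials below are placeholder assumptions.
from search_service import config, create_app
from search_service.proxy import get_proxy_client

app = create_app(config_module_class='search_service.config.LocalConfig')

with app.app_context():
    app.config[config.PROXY_ENDPOINT] = 'http://localhost:9200'  # assumed Elasticsearch endpoint
    app.config[config.PROXY_USER] = ''                           # set if the cluster requires auth
    app.config[config.PROXY_PASSWORD] = ''
    app.config[config.PROXY_CLIENT] = config.PROXY_CLIENTS['ELASTICSEARCH']

    proxy = get_proxy_client()  # resolves to ElasticsearchProxy
```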
diff --git a/search/requirements.txt b/search/requirements.txt
index 9078ef2158..db5445f8b3 100644
--- a/search/requirements.txt
+++ b/search/requirements.txt
@@ -1,6 +1,5 @@
 # Copyright Contributors to the Amundsen project.
 # SPDX-License-Identifier: Apache-2.0
 
-pyatlasclient==1.0.3
 elasticsearch==7.13.3
 elasticsearch-dsl==7.4.0
diff --git a/search/search_service/api/swagger_doc/template.yml b/search/search_service/api/swagger_doc/template.yml
index 20af1d5f61..c39a309522 100644
--- a/search/search_service/api/swagger_doc/template.yml
+++ b/search/search_service/api/swagger_doc/template.yml
@@ -1,7 +1,7 @@
 openapi: '3.0.2'
 info:
   title: 'Search Service'
-  description: 'Used to communicate with elasticsearch or apache atlas to get search results. Used by the frontend service'
+  description: 'Used to communicate with elasticsearch to get search results. Used by the frontend service'
   version: '1.1.12'
 
 components:
diff --git a/search/search_service/config.py b/search/search_service/config.py
index 78e880307f..0979f5713c 100644
--- a/search/search_service/config.py
+++ b/search/search_service/config.py
@@ -14,8 +14,7 @@
 PROXY_CLIENT = 'PROXY_CLIENT'
 PROXY_CLIENT_KEY = 'PROXY_CLIENT_KEY'
 PROXY_CLIENTS = {
-    'ELASTICSEARCH': 'search_service.proxy.elasticsearch.ElasticsearchProxy',
-    'ATLAS': 'search_service.proxy.atlas.AtlasProxy'
+    'ELASTICSEARCH': 'search_service.proxy.elasticsearch.ElasticsearchProxy'
 }
 
diff --git a/search/search_service/proxy/atlas.py b/search/search_service/proxy/atlas.py
deleted file mode 100644
index 5e3a090dbf..0000000000
--- a/search/search_service/proxy/atlas.py
+++ /dev/null
@@ -1,297 +0,0 @@
-# Copyright Contributors to the Amundsen project.
-# SPDX-License-Identifier: Apache-2.0
-
-import logging
-from re import sub
-from typing import (
-    Any, Dict, List, Optional, Tuple,
-)
-
-from atlasclient.client import Atlas
-from atlasclient.exceptions import BadRequest
-from atlasclient.models import Entity, EntityCollection
-# default search page size
-from atlasclient.utils import parse_table_qualified_name
-
-from search_service.models.dashboard import SearchDashboardResult
-from search_service.models.feature import SearchFeatureResult
-from search_service.models.table import SearchTableResult, Table
-from search_service.models.tag import Tag
-from search_service.models.user import SearchUserResult
-from search_service.proxy import BaseProxy
-from search_service.proxy.statsd_utilities import timer_with_counter
-
-LOGGER = logging.getLogger(__name__)
-
-
-class AtlasProxy(BaseProxy):
-    """
-    AtlasSearch connection handler
-    """
-    ATLAS_TABLE_ENTITY = 'Table'
-    ATLAS_QN_ATTRIBUTE = 'qualifiedName'
-
-    def __init__(self, *,
-                 host: str = None,
-                 user: str = '',
-                 password: str = '',
-                 client: Atlas = None,
-                 page_size: int = 10) -> None:
-        self.atlas = client or Atlas(host, username=user, password=password)
-        self.page_size = page_size
-
-    @staticmethod
-    def _extract_entities(collections: EntityCollection) -> List[Entity]:
-        """
-        Helper method for flattening all collections from {collections}
-
-        :return: list of all entities
-        """
-        entities: List[Entity] = []
-
-        for collection in collections:
-            entities.extend(collection.entities)
-        return entities
-
-    def _prepare_tables(self, response: EntityCollection, enhance_metadata: bool = False) -> List[Table]:
-        """
-        Based on an Atlas {response} with table entities, we render Table objects.
-
-        :param response: Collection of Atlas Entities
-        :param enhance_metadata: Should Atlas be queried to acquire complete entity definitions (search might not
-            return all available attributes)
-        :return: List of Table objects
-        """
-
-        result = list()
-
-        # If this condition is satisfied, we query Atlas again to collect all available information regarding each
-        # table, along with relationship information. This is helpful when using Atlas DSL, as the returned entities
-        # contain a minimal set of attributes.
-        if enhance_metadata:
-            ids = list()
-
-            for hit in response:
-                ids.append(hit.guid)
-
-            entities = self._extract_entities(self.atlas.entity_bulk(guid=ids, ignoreRelationships=False))
-        else:
-            entities = response
-
-        for entity in entities:
-            entity_attrs = entity.attributes
-
-            qn = parse_table_qualified_name(qualified_name=entity_attrs.get(self.ATLAS_QN_ATTRIBUTE))
-
-            entity_name = qn.get('table_name') or entity_attrs.get('name')
-            db_name = qn.get('db_name', '')
-            db_cluster = qn.get('cluster_name', '')
-
-            tags: List[Tag] = []
-
-            for classification in entity.classificationNames or list():
-                tags.append(Tag(tag_name=classification))
-
-            badges: List[Tag] = tags
-
-            table = Table(id=f"{entity.typeName}://{db_cluster}.{db_name}/{entity_name}",
-                          name=entity_name,
-                          key=f"{entity.typeName}://{db_cluster}.{db_name}/{entity_name}",
-                          description=entity_attrs.get('description'),
-                          cluster=db_cluster,
-                          database=entity.typeName,
-                          schema=db_name,
-                          tags=tags,
-                          badges=badges,
-                          column_names=[],
-                          last_updated_timestamp=entity_attrs.get('updateTime'))
-
-            result.append(table)
-
-        return result
-
-    def _atlas_basic_search(self, query_params: Dict) -> Tuple[List[Table], int]:
-        """
-        Conduct search using the Atlas Basic Search API.
-
-        :param query_params: A dictionary of query parameters to be passed to the Basic Search POST method of Atlas
-        :return: List of Table objects and approximate count of entities matching in Atlas
-        """
-
-        try:
-            # Fetch the table entities based on query terms
-            search_results = self.atlas.search_basic.create(data=query_params)
-        except BadRequest as ex:
-            LOGGER.error(f"Fetching Tables Failed : {str(ex)}")
-            return [], 0
-
-        if not len(search_results.entities):
-            return [], 0
-
-        # noinspection PyProtectedMember
-        results_count = search_results._data.get("approximateCount")
-
-        results = self._prepare_tables(search_results.entities, enhance_metadata=False)
-
-        return results, results_count
-
-    def _prepare_basic_search_query(self, limit: int, page_index: int, query_term: Optional[str] = None,
-                                    filters: Optional[List[Tuple[str, str, str]]] = None,
-                                    operator: Optional[str] = None,
-                                    classification: Optional[str] = None,
-                                    entity_type: str = None) -> Dict[str, Any]:
-        """
-        Render a query for the Atlas Basic Search API.
-
-        :param query_term: Search Query Term
-        :param limit:
-        :param page_index:
-        :param filters: Optional list of tuples (field, condition, value) that will translate into entityFilters for
-            narrowing down search results
-        :param operator:
-        :param entity_type: Which kind of entity this query will look for
-        :return: Dictionary object prepared for Atlas client basic_search
-        """
-        if not entity_type:
-            entity_type = self.ATLAS_TABLE_ENTITY
-
-        query: Dict[str, Any] = {'typeName': entity_type,
-                                 'excludeDeletedEntities': True,
-                                 'includeSubTypes': True,
-                                 'limit': limit,
-                                 'offset': page_index * self.page_size,
-                                 'sortBy': 'popularityScore',
-                                 'sortOrder': 'DESCENDING'}
-
-        if query_term:
-            query_term = f'*{query_term}*'
-            query_term = sub('\\*+', '*', query_term)
-
-            query['query'] = query_term
-
-        # filters and query_term shouldn't be mixed
-        if filters and not query_term:
-            condition = operator or 'AND'
-            criterion: List[Dict[str, str]] = list()
-
-            for _query_filter in filters:
-                attribute_name, operator, attribute_value = _query_filter
-
-                # filters perform much better when the wildcard is a dot, not a star
-                attribute_value = sub('\\*', '.', attribute_value)
-                query_filter = {'attributeName': attribute_name,
-                                'operator': operator.upper(),
-                                'attributeValue': attribute_value}
-
-                criterion.append(query_filter)
-
-            if len(criterion) > 1:
-                query['entityFilters'] = {'condition': condition, 'criterion': criterion}
-            elif len(criterion) == 1:
-                query['entityFilters'] = criterion[0]
-        elif classification:
-            query['classification'] = classification
-
-        return query
-
-    @timer_with_counter
-    def fetch_table_search_results(self, *,
-                                   query_term: str,
-                                   page_index: int = 0,
-                                   index: str = '') -> SearchTableResult:
-        """
-        Conduct a 'Basic Search' in the Amundsen UI.
-
-        The Atlas Basic Search API is used for this operation. We search on the `qualifiedName` field as
-        (following Atlas documentation) any 'Referencable' entity 'can be searched for using a unique attribute called
-        qualifiedName'. It provides the best performance, simplicity and sorting by popularityScore.
-
-        :param query_term: Search Query Term
-        :param page_index: Index of search page user is currently on (for pagination)
-        :param index: Search Index (different resource corresponding to different index)
-        :return: SearchTableResult Object
-        """
-        if not query_term:
-            # return empty result for blank query term
-            return SearchTableResult(total_results=0, results=[])
-
-        query_params = self._prepare_basic_search_query(self.page_size, page_index, query_term=query_term)
-
-        tables, approx_count = self._atlas_basic_search(query_params)
-
-        return SearchTableResult(total_results=approx_count, results=tables)
-
-    def fetch_search_results_with_filter(self, *,
-                                         query_term: str,
-                                         search_request: dict,
-                                         page_index: int = 0,
-                                         index: str = '') -> SearchTableResult:
-        """
-        Conduct an 'Advanced Search' to narrow down search results with the use of filters.
-
-        Uses Atlas Basic Search with filters to retrieve precise results and sort them by popularity score.
-
-
-        :param query_term: A Search Query Term
-        :param search_request: Values from Filters
-        :param page_index: Index of search page user is currently on (for pagination)
-        :param index: Search Index (different resource corresponding to different index)
-        :return: SearchTableResult Object
-        """
-        _filters = search_request.get('filters', dict())
-
-        db_filter_value = _filters.get('database')
-        table_filter_value = _filters.get('table')
-        cluster_filter_value = _filters.get('cluster')
-        tags_filter_value = _filters.get('tag')
-
-        filters = list()
-
-        # qualifiedName follows pattern ${db}.${table}@${cluster}
-        if db_filter_value:
-            filters.append((self.ATLAS_QN_ATTRIBUTE, 'STARTSWITH', db_filter_value[0] + '.'))
-
-        if cluster_filter_value:
-            filters.append((self.ATLAS_QN_ATTRIBUTE, 'ENDSWITH', '@' + cluster_filter_value[0]))
-
-        if table_filter_value:
-            filters.append(('name', 'CONTAINS', table_filter_value[0]))
-
-        # Currently Atlas doesn't allow mixing search by filters and classifications
-        if filters:
-            query_params = self._prepare_basic_search_query(self.page_size, page_index,
-                                                            filters=filters)
-        elif tags_filter_value:
-            query_params = self._prepare_basic_search_query(self.page_size, page_index,
-                                                            classification=tags_filter_value[0])
-
-        tables, approx_count = self._atlas_basic_search(query_params)
-
-        return SearchTableResult(total_results=approx_count, results=tables)
-
-    def fetch_user_search_results(self, *,
-                                  query_term: str,
-                                  page_index: int = 0,
-                                  index: str = '') -> SearchUserResult:
-        pass
-
-    def fetch_dashboard_search_results(self, *,
-                                       query_term: str,
-                                       page_index: int = 0,
-                                       index: str = '') -> SearchDashboardResult:
-        pass
-
-    def fetch_feature_search_results(self, *,
-                                     query_term: str,
-                                     page_index: int = 0,
-                                     index: str = '') -> SearchFeatureResult:
-        pass
-
-    def update_document(self, *, data: List[Dict[str, Any]], index: str = '') -> str:
-        raise NotImplementedError()
-
-    def create_document(self, *, data: List[Dict[str, Any]], index: str = '') -> str:
-        raise NotImplementedError()
-
-    def delete_document(self, *, data: List[str], index: str = '') -> str:
-        raise NotImplementedError()
diff --git a/search/tests/unit/proxy/test_atlas.py b/search/tests/unit/proxy/test_atlas.py
deleted file mode 100644
index 3663157246..0000000000
--- a/search/tests/unit/proxy/test_atlas.py
+++ /dev/null
@@ -1,301 +0,0 @@
-# Copyright Contributors to the Amundsen project.
-# SPDX-License-Identifier: Apache-2.0
-
-import unittest
-from typing import (
-    Any, Callable, Dict, List, Tuple,
-)
-
-from amundsen_common.models.api import health_check
-from mock import MagicMock, patch
-
-from search_service import config, create_app
-from search_service.models.table import SearchTableResult, Table
-from search_service.models.tag import Tag
-from search_service.proxy import get_proxy_client
-
-
-class TestAtlasProxy(unittest.TestCase):
-    maxDiff = None
-
-    def to_class(self, entity: Dict[str, Any]) -> Any:
-        class ObjectView(object):
-            def __init__(self, dictionary: Dict[str, Any]):
-                self.__dict__ = dictionary
-
-        return ObjectView(entity)
-
-    def setUp(self) -> None:
-        self.app = create_app(config_module_class='search_service.config.LocalConfig')
-        self.app_context = self.app.app_context()
-        self.app_context.push()
-        self.qn = False
-        with self.app_context:
-            from search_service.proxy.atlas import AtlasProxy
-            self.proxy = AtlasProxy(host='DOES_NOT_MATTER:0000')
-            self.proxy.atlas = MagicMock()
-        self.qn = 'name' == "qualifiedName"
-        self.entity_type = 'TEST_ENTITY'
-        self.cluster = 'TEST_CLUSTER'
-        self.db = 'TEST_DB'
-        self.name = 'TEST_TABLE'
-        self.table_uri = f'{self.entity_type}://{self.cluster}.{self.db}/{self.name}'
-
-        self.classification_entity = {
-            'classifications': [
-                {'typeName': 'PII_DATA', 'name': 'PII_DATA'},
-            ]
-        }
-        self.test_column = {
-            'guid': 'DOESNT_MATTER',
-            'typeName': 'COLUMN',
-            'attributes': {
-                'qualifiedName': f"{self.db}.Table1.column@{self.cluster}",
-                'type': 'Managed',
-                'description': 'column description',
-                'position': 1
-            }
-        }
-
-        self.db_entity = {
-            'guid': '-100',
-            'typeName': self.entity_type,
-            'attributes': {
-                'qualifiedName': self.db + "@" + self.cluster,
-                'name': self.db,
-                'clusterName': self.cluster
-            }
-        }
-
-        self.entity1_name = 'Table1'
-        self.entity1_description = 'Dummy Description'
-        self.entity1 = {
-            'guid': '1',
-            'typeName': self.entity_type,
-            'classificationNames': [
-                'PII_DATA'
-            ],
-            'relationshipAttributes': {
-                'db': self.db_entity
-            },
-            'attributes': {
-                'updateTime': 123,
-                'name': self.entity1_name,
-                'qualifiedName': f"{self.db}.Table1@{self.cluster}",
-                'classifications': [
-                    {'typeName': 'PII_DATA'}
-                ],
-                'description': self.entity1_description,
-                'owner': 'dummy@email.com',
-                'columns': [self.test_column],
-                'db': self.db_entity
-            },
-            'classifications': self.classification_entity['classifications']
-        }
-
-        self.entity2_name = 'Table2'
-        self.entity2 = {
-            'guid': '2',
-            'typeName': self.entity_type,
-            'classificationNames': [],
-            'attributes': {
-                'updateTime': 234,
-                'qualifiedName': 'Table2',
-                'name': self.entity2_name,
-                'db': None,
-                'description': 'Dummy Description',
-                'owner': 'dummy@email.com',
-            },
-            'classifications': self.classification_entity['classifications']
-        }
-
-        self.entities = {
-            'entities': [
-                self.entity1,
-                self.entity2,
-            ],
-        }
-
-    def _qualified(self, kind: str, name: str, table: str = None) -> str:
-        if not self.qn:
-            return name
-        if kind == "db":
-            return f"{name}@{self.cluster}"
-        if kind == "column" and table:
-            return f"{self.db}.{table}.{name}@{self.cluster}"
-        if kind == "table":
-            return f"{self.db}.{name}@{self.cluster}"
-        return name
-
-    @staticmethod
-    def recursive_mock(start: Any) -> Any:
-        """
-        The atlas client allows retrieval of data via __getattr__.
-        That is why we build this method: it recursively mocks dictionaries to add
-        __getattr__ and to convert them into MagicMock.
-        :param start: dictionary, list, or other
-        :return: MagicMock, list with mocked items, or other
-        """
-        if isinstance(start, dict):
-            dict_mock = MagicMock()
-            dict_mock.__getitem__.side_effect = start.__getitem__
-            dict_mock.__iter__.side_effect = start.__iter__
-            dict_mock.__contains__.side_effect = start.__contains__
-            dict_mock.get.side_effect = start.get
-            for key, value in start.items():
-                value_mock = TestAtlasProxy.recursive_mock(value)
-                dict_mock.__setattr__(key, value_mock)
-                start[key] = value_mock
-            return dict_mock
-        elif isinstance(start, (list,)):
-            return list(map(TestAtlasProxy.recursive_mock, start))
-        else:
-            return start
-
-    @staticmethod
-    def dsl_inject(checks: List[Tuple[Callable[[str], bool], dict]]) -> Callable:
-        """
-        Helper method for returning results based on DSL queries.
-        :param checks:
-        :return:
-        """
-
-        def search_dsl(query: str) -> Dict[str, Any]:
-            for check, data in checks:
-                if check(query):
-                    response = MagicMock()
-                    d = TestAtlasProxy.recursive_mock(data)
-                    d.__iter__.return_value = [d]
-                    d._data = {
-                        'queryType': "DSL",
-                        'queryText': query,
-                        **data
-                    }
-                    response.__iter__.return_value = [d]
-
-                    return response
-            raise Exception(f"query not supported: {query}")
-
-        return search_dsl
-
-    @staticmethod
-    def bulk_inject(entities: List[Dict[str, Any]]) -> Callable:
-        """
-        Provide an entity_bulk method for Atlas.
-        :param entities:
-        :return:
-        """
-
-        # noinspection PyPep8Naming
-        def guid_filter(guid: List, ignoreRelationships: bool = False) -> Any:
-            return TestAtlasProxy.recursive_mock([{
-                'entities': list(filter(lambda x: x['guid'] in guid, entities))
-            }])
-
-        return guid_filter
-
-    def test_setup_client(self) -> None:
-        with self.app_context:
-            from search_service.proxy.atlas import AtlasProxy
-            client = AtlasProxy(
-                host="http://localhost:21000",
-                user="admin",
-                password="admin",
-                page_size=1337
-            )
-            self.assertEqual(client.atlas.base_url, "http://localhost:21000")
-            self.assertEqual(client.atlas.client.request_params['headers']['Authorization'], 'Basic YWRtaW46YWRtaW4=')
-            self.assertEqual(client.page_size, 1337)
-
-    @patch('search_service.proxy._proxy_client', None)
-    def test_setup_config(self) -> None:
-        # Gather all the configuration to create a Proxy Client
-        self.app.config[config.PROXY_ENDPOINT] = "http://localhost:21000"
-        self.app.config[config.PROXY_USER] = "admin"
-        self.app.config[config.PROXY_PASSWORD] = "admin"
-        self.app.config[config.PROXY_CLIENT] = config.PROXY_CLIENTS['ATLAS']
-        self.app.config[config.SEARCH_PAGE_SIZE_KEY] = 1337
-
-        client = get_proxy_client()
-        self.assertEqual(client.atlas.base_url, "http://localhost:21000")  # type: ignore
-        self.assertEqual(client.atlas.client.request_params['headers']['Authorization'],  # type: ignore
-                         'Basic YWRtaW46YWRtaW4=')
-        self.assertEqual(client.page_size, 1337)  # type: ignore
-
-    def test_health_atlas(self) -> None:
-        health_actual = self.proxy.health()
-        expected_checks = {'AtlasProxy:connection': {'status': 'not checked'}}
-        health_expected = health_check.HealthCheck(status='ok', checks=expected_checks)
-        self.assertEqual(health_actual.status, health_expected.status)
-        self.assertDictEqual(health_actual.checks, health_expected.checks)
-
-    def test_search_normal(self) -> None:
-        expected = SearchTableResult(total_results=2,
-                                     results=[Table(id=f"{self.entity_type}://"
-                                                       f"{self.cluster}.{self.db}/"
-                                                       f"{self.entity1_name}",
-                                                    name=self.entity1_name,
-                                                    key=f"{self.entity_type}://"
-                                                        f"{self.cluster}.{self.db}/"
-                                                        f"{self.entity1_name}",
-                                                    description=self.entity1_description,
-                                                    cluster=self.cluster,
-                                                    database=self.entity_type,
-                                                    schema=self.db,
-                                                    column_names=[],
-                                                    tags=[Tag(tag_name='PII_DATA')],
-                                                    badges=[Tag(tag_name='PII_DATA')],
-                                                    last_updated_timestamp=123)])
-        entity1 = self.to_class(self.entity1)
-        entity_collection = MagicMock()
-        entity_collection.entities = [entity1]
-        entity_collection._data = {'approximateCount': 1}
-
-        result = MagicMock(return_value=entity_collection)
-
-        with patch.object(self.proxy.atlas.search_basic, 'create', result):
-            resp = self.proxy.fetch_table_search_results(query_term="Table")
-            self.assertEqual(resp.total_results, 1)
-            self.assertIsInstance(resp.results[0], Table, "Search result received is not of 'Table' type!")
-            self.assertDictEqual(vars(resp.results[0]), vars(expected.results[0]),
-                                 "Search Result doesn't match with expected result!")
-
-    def test_search_empty(self) -> None:
-        expected = SearchTableResult(total_results=0,
-                                     results=[])
-        self.proxy.atlas.search_dsl = self.dsl_inject([
-            (lambda dsl: "select count()" in dsl,
-             {"attributes": {"name": ["count()"], "values": [[0]]}}),
-            (lambda dsl: any(x in dsl for x in ["select table", "from Table"]),
-             {'entities': []})
-        ])
-        self.proxy.atlas.entity_bulk = self.bulk_inject([
-            self.entity1,
-            self.entity2,
-            self.db_entity
-        ])
-        resp = self.proxy.fetch_table_search_results(query_term="Table1")
-        self.assertTrue(resp.total_results == 0, "there should be no search results")
-        self.assertIsInstance(resp, SearchTableResult, "Search result received is not of 'SearchTableResult' type!")
-        self.assertDictEqual(vars(resp), vars(expected),
-                             "Search Result doesn't match with expected result!")
-
-    def test_unknown_field(self) -> None:
-        expected = SearchTableResult(total_results=0,
-                                     results=[])
-        self.proxy.atlas.search_dsl = self.dsl_inject([
-            (lambda dsl: "select count()" in dsl,
-             {"attributes": {"name": ["count()"], "values": [[0]]}}),
-            (lambda dsl: any(x in dsl for x in ["select table", "from Table"]),
-             {'entities': []})
-        ])
-        self.proxy.atlas.entity_bulk = self.bulk_inject([
-            self.entity1,
-            self.entity2,
-            self.db_entity
-        ])
-        resp = self.proxy.fetch_table_search_results(query_term="unknown:Table1")
-        self.assertTrue(resp.total_results == 0, "there should be no search results")
-        self.assertIsInstance(resp, SearchTableResult, "Search result received is not of 'SearchTableResult' type!")
-        self.assertDictEqual(vars(resp), vars(expected),
-                             "Search Result doesn't match with expected result!")
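The removed investigation document's main argument for the Elasticsearch proxy was multi-field search with per-field weighting, which the Atlas Basic Search API cannot express. A minimal sketch of such a query using the `elasticsearch==7.13.3` client pinned in `requirements.txt`; the endpoint, index name, fields, and boosts are illustrative assumptions, not the proxy's actual query:

```python
# Minimal sketch: a weighted multi-field search of the kind the Elasticsearch
# proxy enables. Endpoint, index name, field list, and boosts are assumptions
# for illustration only.
from elasticsearch import Elasticsearch  # pinned to 7.13.3 in requirements.txt

es = Elasticsearch(['http://localhost:9200'])  # assumed endpoint

response = es.search(
    index='table_search_index',  # assumed index name
    body={
        'query': {
            'multi_match': {
                'query': 'test_table',
                # '^N' boosts a field's contribution to the relevance score
                'fields': ['name^3', 'schema^2', 'description', 'column_names'],
            }
        }
    },
)

print(response['hits']['total']['value'])
```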