Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement relation filtering on get_catalog macro #964

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
bbac669
changelog
mikealfare Dec 16, 2023
28dd554
add _get_one_catalog_by_relations method, redirect _get_one_catalog t…
mikealfare Dec 16, 2023
b9cfd81
override get_catalog_by_relations to align with how get_catalog is ov…
mikealfare Dec 16, 2023
eab3067
turn off get_catalog_by_relations to test
mikealfare Dec 16, 2023
73979ee
call get_catalog by relation
mikealfare Dec 16, 2023
17c245f
guard against multiple info schemas in get_catalog_by_relations
mikealfare Dec 16, 2023
9b0bd5f
reuse get_catalog logic, add ability to pass relations into new method
mikealfare Dec 16, 2023
5236d40
redirect get_catalog_relations to get_catalog to check plumbing
mikealfare Dec 16, 2023
bf6e910
manually create schema_map so that it's limited to the relations prov…
mikealfare Dec 16, 2023
18d30f7
add check to guarantee relations is populated
mikealfare Dec 16, 2023
844cfbf
catch exception when info_schemas is empty
mikealfare Dec 16, 2023
2d4d3c2
catch exception when info_schemas is empty
mikealfare Dec 16, 2023
c3e608d
update the connection name
mikealfare Dec 16, 2023
5221389
mimic get_catalog behavior
mikealfare Dec 19, 2023
ccfeebd
mimic get_catalog behavior
mikealfare Dec 19, 2023
d609da9
redirect to new method
mikealfare Dec 19, 2023
3afd659
error on empty list of relations
mikealfare Dec 19, 2023
f28c181
use the cached version of the relation to ensure we have column metadata
mikealfare Dec 19, 2023
52b9dc5
move cache logic into the cache
mikealfare Dec 20, 2023
642bd12
fix typo
mikealfare Dec 20, 2023
534cd0f
remove whitespace fixes to reduce PR confusion
mikealfare Dec 20, 2023
0efc5a4
remove whitespace fixes to reduce PR confusion
mikealfare Dec 20, 2023
0e5c614
remove whitespace fixes to reduce PR confusion
mikealfare Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231215-191154.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Support limiting get_catalog by object name
time: 2023-12-15T19:11:54.536441-05:00
custom:
Author: mikealfare
Issue: "900"
29 changes: 29 additions & 0 deletions dbt/adapters/spark/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from dbt.adapters.base import BaseRelation
from dbt.adapters.cache import RelationsCache
from dbt.exceptions import MissingRelationError
from dbt.utils import lowercase


class SparkRelationsCache(RelationsCache):
    """Relations cache that can resolve a single cached relation from a stub."""

    # Review note from the PR author on this method:
    # There is no method on RelationsCache that returns a specific
    # BaseRelation. The next best option is to constantly return the set of
    # relations in a schema (RelationsCache.get_relations) and then look for
    # the target relation in that set. This method duplicates the logic in
    # get_relations and adds the third check on identifier.
    def get_relation_from_stub(self, relation_stub: BaseRelation) -> BaseRelation:
        """
        Case-insensitively find the cached relation matching the given stub.

        A cached relation matches when its database, schema and identifier
        all equal the stub's after lowercasing. The first match found while
        iterating the cache is returned.

        :param BaseRelation relation_stub: The relation to look for
        :return BaseRelation: The cached version of the relation
        :raises MissingRelationError: If no cached relation matches the stub
        """
        # The stub's key does not change per cached entry, so build it once.
        wanted = (
            lowercase(relation_stub.database),
            lowercase(relation_stub.schema),
            lowercase(relation_stub.identifier),
        )
        with self.lock:
            for cached in self.relations.values():
                candidate = (
                    lowercase(cached.database),
                    lowercase(cached.schema),
                    lowercase(cached.identifier),
                )
                if candidate == wanted:
                    # Cache entries wrap the relation; hand back the wrapped object.
                    return cached.inner
        raise MissingRelationError(relation_stub)
46 changes: 44 additions & 2 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
from typing import Any, Dict, Iterable, List, Optional, Union, Type, Tuple, Callable, Set

from dbt.adapters.base.relation import InformationSchema
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
from dbt.contracts.graph.manifest import Manifest

from typing_extensions import TypeAlias

import agate

import dbt
Expand All @@ -19,6 +19,7 @@
from dbt.adapters.spark import SparkConnectionManager
from dbt.adapters.spark import SparkRelation
from dbt.adapters.spark import SparkColumn
from dbt.adapters.spark.cache import SparkRelationsCache
from dbt.adapters.spark.python_submissions import (
JobClusterPythonJobHelper,
AllPurposeClusterPythonJobHelper,
Expand Down Expand Up @@ -101,12 +102,20 @@ class SparkAdapter(SQLAdapter):
ConstraintType.foreign_key: ConstraintSupport.NOT_ENFORCED,
}

_capabilities = CapabilityDict(
{Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full)}
)

Relation: TypeAlias = SparkRelation
RelationInfo = Tuple[str, str, str]
Column: TypeAlias = SparkColumn
ConnectionManager: TypeAlias = SparkConnectionManager
AdapterSpecificConfigs: TypeAlias = SparkConfig

def __init__(self, config) -> None: # type: ignore
super().__init__(config)
self.cache: SparkRelationsCache = SparkRelationsCache()

@classmethod
def date_function(cls) -> str:
return "current_timestamp()"
Expand Down Expand Up @@ -377,6 +386,25 @@ def get_catalog(
catalogs, exceptions = catch_as_completed(futures)
return catalogs, exceptions

def get_catalog_by_relations(
    self, manifest: Manifest, relations: Set[BaseRelation]
) -> Tuple[agate.Table, List[Exception]]:
    """Build the catalog for an explicit set of relations.

    One task is submitted to the executor per relation. Each task runs
    ``_get_one_catalog_by_relations`` with that relation's information
    schema and a single-element list holding the relation itself.

    :param manifest: Forwarded unchanged to every submitted task.
    :param relations: The relations to collect catalog metadata for.
    :return: The ``(catalogs, exceptions)`` pair produced by
        ``catch_as_completed`` over the submitted futures.
    """
    with executor(self.config) as pool:
        pending: List[Future[agate.Table]] = [
            pool.submit_connected(
                self,
                # NOTE(review): presumably used as the connection name — confirm
                # against submit_connected's signature.
                str(target),
                self._get_one_catalog_by_relations,
                target.information_schema_only(),
                [target],
                manifest,
            )
            for target in relations
        ]
        catalogs, exceptions = catch_as_completed(pending)
    return catalogs, exceptions

def _get_one_catalog(
self,
information_schema: InformationSchema,
Expand All @@ -390,13 +418,27 @@ def _get_one_catalog(

database = information_schema.database
schema = list(schemas)[0]
relations = self.list_relations(database, schema)
return self._get_relation_metadata_at_column_level(relations)

def _get_relation_metadata_at_column_level(self, relations: List[BaseRelation]) -> agate.Table:
    """Gather column-level catalog rows for the given relations into one table.

    Review note from the PR author: this was the second half of
    ``_get_one_catalog``. It is broken out as its own method so it can be
    reused by ``_get_one_catalog_by_relations``.

    :param relations: Relations whose columns should be collected.
    :return: An ``agate.Table`` built from the column dictionaries returned
        by ``_get_columns_for_catalog`` for every relation, in iteration order.
    """
    columns: List[Dict[str, Any]] = []
    # Iterate the relations passed in; this method has no database/schema of
    # its own to list relations from.
    for relation in relations:
        logger.debug("Getting table schema for relation {}", str(relation))
        columns.extend(self._get_columns_for_catalog(relation))
    return agate.Table.from_object(columns, column_types=DEFAULT_TYPE_TESTER)

def _get_one_catalog_by_relations(
    self,
    information_schema: InformationSchema,
    relations: List[BaseRelation],
    manifest: Manifest,
) -> agate.Table:
    """Return column-level catalog metadata for the given relations.

    Each incoming relation is treated as a stub and swapped for its cached
    counterpart before the column metadata is collected.

    :param information_schema: Not read by this method.
    :param relations: Relation stubs to resolve through ``self.cache``.
    :param manifest: Not read by this method.
    :return: The table produced by ``_get_relation_metadata_at_column_level``
        for the cached relations.
    """
    resolved = []
    for stub in relations:
        # Any exception raised by the cache lookup propagates to the caller.
        resolved.append(self.cache.get_relation_from_stub(stub))
    return self._get_relation_metadata_at_column_level(resolved)

def check_schema_exists(self, database: str, schema: str) -> bool:
results = self.execute_macro(LIST_SCHEMAS_MACRO_NAME, kwargs={"database": database})

Expand Down