Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: improve CatalogProvider documentation with rationale and info about remote catalogs #8968

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 86 additions & 1 deletion datafusion/core/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use datafusion_common::{exec_err, not_impl_err, DataFusionError, Result};
use std::any::Any;
use std::sync::Arc;

/// Represent a list of named catalogs
/// Represent a list of named [`CatalogProvider`]s.
///
/// Please see the documentation on `CatalogProvider` for details of
/// implementing a custom catalog.
pub trait CatalogList: Sync + Send {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps CatalogProviderList will be a better name? I can do this in following PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #9002

/// Returns the catalog list as [`Any`]
/// so that it can be downcast to a specific implementation.
Expand Down Expand Up @@ -94,6 +97,88 @@ impl CatalogList for MemoryCatalogList {
}

/// Represents a catalog, comprising a number of named schemas.
///
/// # Catalog Overview
///
/// To plan and execute queries, DataFusion needs a "Catalog" that provides
/// metadata such as which schemas and tables exist, their columns and data
/// types, and how to access the data.
///
/// The Catalog API consists:
/// * [`CatalogList`]: a collection of `CatalogProvider`s
/// * [`CatalogProvider`]: a collection of `SchemaProvider`s (sometimes called a "database" in other systems)
/// * [`SchemaProvider`]: a collection of `TableProvider`s (often called a "schema" in other systems)
/// * [`TableProvider]`: individual tables
///
/// # Implementing Catalogs
///
/// To implement a catalog, you implement at least one of the [`CatalogList`],
/// [`CatalogProvider`] and [`SchemaProvider`] traits and register them
/// appropriately the [`SessionContext`].
///
/// [`SessionContext`]: crate::execution::context::SessionContext
///
/// DataFusion comes with a simple in-memory catalog implementation,
/// [`MemoryCatalogProvider`], that is used by default and has no persistence.
/// DataFusion does not include more complex Catalog implementations because
/// catalog management is a key design choice for most data systems, and thus
/// it is unlikely that any general-purpose catalog implementation will work
/// well across many use cases.
///
/// # Implementing "Remote" catalogs
///
/// Sometimes catalog information is stored remotely and requires a network call
/// to retrieve. For example, the [Delta Lake] table format stores table
/// metadata in files on S3 that must be first downloaded to discover what
/// schemas and tables exist.
///
/// [Delta Lake]: https://delta.io/
///
/// The [`CatalogProvider`] can support this use case, but it takes some care.
/// The planning APIs in DataFusion are not `async` and thus network IO can not
/// be performed "lazily" / "on demand" during query planning. The rationale for
/// this design is that using remote procedure calls for all catalog accesses
/// required for query planning would likely result in multiple network calls
/// per plan, resulting in very poor planning performance.
///
/// To implement [`CatalogProvider`] and [`SchemaProvider`] for remote catalogs,
/// you need to provide an in memory snapshot of the required metadata. Most
/// systems typically either already have this information cached locally or can
/// batch access to the remote catalog to retrieve multiple schemas and tables
/// in a single network call.
///
/// Note that [`SchemaProvider::table`] is an `async` function in order to
/// simplify implementing simple [`SchemaProvider`]s. For many table formats it
/// is easy to list all available tables but there is additional non trivial
/// access required to read table details (e.g. statistics).
///
/// The pattern that DataFusion itself uses to plan SQL queries is to walk over
/// the query to [find all schema / table references in an `async` function],
/// performing required remote catalog in parallel, and then plans the query
/// using that snapshot.
///
/// [find all schema / table references in an `async` function]: crate::execution::context::SessionState::resolve_table_references
///
/// # Example Catalog Implementations
///
/// Here are some examples of how to implement custom catalogs:
///
/// * [`datafusion-cli`]: [`DynamicFileCatalogProvider`] catalog provider
/// that treats files and directories on a filesystem as tables.
///
/// * The [`catalog.rs`]: a simple directory based catalog.
///
/// * [delta-rs]: [`UnityCatalogProvider`] implementation that can
/// read from Delta Lake tables
///
/// [`datafusion-cli`]: https://arrow.apache.org/datafusion/user-guide/cli.html
/// [`DynamicFileCatalogProvider`]: https://github.com/apache/arrow-datafusion/blob/31b9b48b08592b7d293f46e75707aad7dadd7cbc/datafusion-cli/src/catalog.rs#L75
/// [`catalog.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/external_dependency/catalog.rs
/// [delta-rs]: https://github.com/delta-io/delta-rs
/// [`UnityCatalogProvider`]: https://github.com/delta-io/delta-rs/blob/951436ecec476ce65b5ed3b58b50fb0846ca7b91/crates/deltalake-core/src/data_catalog/unity/datafusion.rs#L111-L123
///
/// [`TableProvider]: crate::datasource::TableProvider

pub trait CatalogProvider: Sync + Send {
/// Returns the catalog provider as [`Any`]
/// so that it can be downcast to a specific implementation.
Expand Down
28 changes: 18 additions & 10 deletions datafusion/core/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,28 @@ use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};

/// Represents a schema, comprising a number of named tables.
///
/// Please see [`CatalogProvider`] for details of implementing a custom catalog.
///
/// [`CatalogProvider`]: super::CatalogProvider
#[async_trait]
pub trait SchemaProvider: Sync + Send {
/// Returns the schema provider as [`Any`](std::any::Any)
/// so that it can be downcast to a specific implementation.
/// Returns this `SchemaProvider` as [`Any`] so that it can be downcast to a
/// specific implementation.
fn as_any(&self) -> &dyn Any;

/// Retrieves the list of available table names in this schema.
fn table_names(&self) -> Vec<String>;

/// Retrieves a specific table from the schema by name, provided it exists.
/// Retrieves a specific table from the schema by name, if it exists,
/// otherwise returns `None`.
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;

/// If supported by the implementation, adds a new table to this schema.
/// If a table of the same name existed before, it returns "Table already exists" error.
/// If supported by the implementation, adds a new table named `name` to
/// this schema.
///
/// If a table of the same name was already registered, returns "Table
/// already exists" error.
#[allow(unused_variables)]
fn register_table(
&self,
Expand All @@ -51,16 +59,16 @@ pub trait SchemaProvider: Sync + Send {
exec_err!("schema provider does not support registering tables")
}

/// If supported by the implementation, removes an existing table from this schema and returns it.
/// If no table of that name exists, returns Ok(None).
/// If supported by the implementation, removes the `name` table from this
/// schema and returns the previously registered [`TableProvider`], if any.
///
/// If no `name` table exists, returns Ok(None).
#[allow(unused_variables)]
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
exec_err!("schema provider does not support deregistering tables")
}

/// If supported by the implementation, checks the table exist in the schema provider or not.
/// If no matched table in the schema provider, return false.
/// Otherwise, return true.
/// Returns true if table exist in the schema provider, false otherwise.
fn table_exist(&self, name: &str) -> bool;
}

Expand Down