From 5bbcf7f1ed6cc4240b4c31fb7d3c8ebf7eda2f6d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 23 Jan 2024 10:54:44 -0500 Subject: [PATCH] Minor: improve CatalogProvider documentation with rationale --- datafusion/core/src/catalog/mod.rs | 87 ++++++++++++++++++++++++++- datafusion/core/src/catalog/schema.rs | 28 ++++++--- 2 files changed, 104 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/catalog/mod.rs b/datafusion/core/src/catalog/mod.rs index ce27d57da00d..da7e1f5e2193 100644 --- a/datafusion/core/src/catalog/mod.rs +++ b/datafusion/core/src/catalog/mod.rs @@ -29,7 +29,10 @@ use datafusion_common::{exec_err, not_impl_err, DataFusionError, Result}; use std::any::Any; use std::sync::Arc; -/// Represent a list of named catalogs +/// Represent a list of named [`CatalogProvider`]s. +/// +/// Please see the documentation on `CatalogProvider` for details of +/// implementing a custom catalog. pub trait CatalogList: Sync + Send { /// Returns the catalog list as [`Any`] /// so that it can be downcast to a specific implementation. @@ -94,6 +97,88 @@ impl CatalogList for MemoryCatalogList { } /// Represents a catalog, comprising a number of named schemas. +/// +/// # Catalog Overview +/// +/// To plan and execute queries, DataFusion needs a "Catalog" that provides +/// metadata such as which schemas and tables exist, their columns and data +/// types, and how to access the data. 
+/// +/// The Catalog API consists of: +/// * [`CatalogList`]: a collection of `CatalogProvider`s +/// * [`CatalogProvider`]: a collection of `SchemaProvider`s (sometimes called a "database" in other systems) +/// * [`SchemaProvider`]: a collection of `TableProvider`s (often called a "schema" in other systems) +/// * [`TableProvider`]: individual tables +/// +/// # Implementing Catalogs +/// +/// To implement a catalog, you implement at least one of the [`CatalogList`], +/// [`CatalogProvider`] and [`SchemaProvider`] traits and register them +/// appropriately with the [`SessionContext`]. +/// +/// [`SessionContext`]: crate::execution::context::SessionContext +/// +/// DataFusion comes with a simple in-memory catalog implementation, +/// [`MemoryCatalogProvider`], that is used by default and has no persistence. +/// DataFusion does not include more complex Catalog implementations because +/// catalog management is a key design choice for most data systems, and thus +/// it is unlikely that any general-purpose catalog implementation will work +/// well across many use cases. +/// +/// # Implementing "Remote" catalogs +/// +/// Sometimes catalog information is stored remotely and requires a network call +/// to retrieve. For example, the [Delta Lake] table format stores table +/// metadata in files on S3 that must be first downloaded to discover what +/// schemas and tables exist. +/// +/// [Delta Lake]: https://delta.io/ +/// +/// The [`CatalogProvider`] can support this use case, but it takes some care. +/// The planning APIs in DataFusion are not `async` and thus network IO can not +/// be performed "lazily" / "on demand" during query planning. The rationale for +/// this design is that using remote procedure calls for all catalog accesses +/// required for query planning would likely result in multiple network calls +/// per plan, resulting in very poor planning performance.
+/// +/// To implement [`CatalogProvider`] and [`SchemaProvider`] for remote catalogs, +/// you need to provide an in memory snapshot of the required metadata. Most +/// systems typically either already have this information cached locally or can +/// batch access to the remote catalog to retrieve multiple schemas and tables +/// in a single network call. +/// +/// Note that [`SchemaProvider::table`] is an `async` function in order to +/// simplify implementing simple [`SchemaProvider`]s. For many table formats it +/// is easy to list all available tables but there is additional non trivial +/// access required to read table details (e.g. statistics). +/// +/// The pattern that DataFusion itself uses to plan SQL queries is to walk over +/// the query to [find all schema / table references in an `async` function], +/// performing the required remote catalog accesses in parallel, and then +/// planning the query using that snapshot. +/// +/// [find all schema / table references in an `async` function]: crate::execution::context::SessionState::resolve_table_references +/// +/// # Example Catalog Implementations +/// +/// Here are some examples of how to implement custom catalogs: +/// +/// * [`datafusion-cli`]: [`DynamicFileCatalogProvider`] catalog provider +/// that treats files and directories on a filesystem as tables. +/// +/// * [`catalog.rs`]: a simple directory based catalog.
+/// +/// * [delta-rs]: [`UnityCatalogProvider`] implementation that can +/// read from Delta Lake tables +/// +/// [`datafusion-cli`]: https://arrow.apache.org/datafusion/user-guide/cli.html +/// [`DynamicFileCatalogProvider`]: https://github.com/apache/arrow-datafusion/blob/31b9b48b08592b7d293f46e75707aad7dadd7cbc/datafusion-cli/src/catalog.rs#L75 +/// [`catalog.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/external_dependency/catalog.rs +/// [delta-rs]: https://github.com/delta-io/delta-rs +/// [`UnityCatalogProvider`]: https://github.com/delta-io/delta-rs/blob/951436ecec476ce65b5ed3b58b50fb0846ca7b91/crates/deltalake-core/src/data_catalog/unity/datafusion.rs#L111-L123 +/// +/// [`TableProvider`]: crate::datasource::TableProvider + pub trait CatalogProvider: Sync + Send { /// Returns the catalog provider as [`Any`] /// so that it can be downcast to a specific implementation. diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs index 1bb2df914ab2..2cebad717249 100644 --- a/datafusion/core/src/catalog/schema.rs +++ b/datafusion/core/src/catalog/schema.rs @@ -28,20 +28,28 @@ use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; /// Represents a schema, comprising a number of named tables. +/// +/// Please see [`CatalogProvider`] for details of implementing a custom catalog. +/// +/// [`CatalogProvider`]: super::CatalogProvider #[async_trait] pub trait SchemaProvider: Sync + Send { - /// Returns the schema provider as [`Any`](std::any::Any) - /// so that it can be downcast to a specific implementation. + /// Returns this `SchemaProvider` as [`Any`] so that it can be downcast to a + /// specific implementation. fn as_any(&self) -> &dyn Any; /// Retrieves the list of available table names in this schema. fn table_names(&self) -> Vec<String>; - /// Retrieves a specific table from the schema by name, provided it exists.
+ /// Retrieves a specific table from the schema by name, if it exists, + /// otherwise returns `None`. async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>; - /// If supported by the implementation, adds a new table to this schema. - /// If a table of the same name existed before, it returns "Table already exists" error. + /// If supported by the implementation, adds a new table named `name` to + /// this schema. + /// + /// If a table of the same name was already registered, returns "Table + /// already exists" error. #[allow(unused_variables)] fn register_table( &self, @@ -51,16 +59,16 @@ pub trait SchemaProvider: Sync + Send { exec_err!("schema provider does not support registering tables") } - /// If supported by the implementation, removes an existing table from this schema and returns it. - /// If no table of that name exists, returns Ok(None). + /// If supported by the implementation, removes the `name` table from this + /// schema and returns the previously registered [`TableProvider`], if any. + /// + /// If no `name` table exists, returns Ok(None). #[allow(unused_variables)] fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> { exec_err!("schema provider does not support deregistering tables") } - /// If supported by the implementation, checks the table exist in the schema provider or not. - /// If no matched table in the schema provider, return false. - /// Otherwise, return true. + /// Returns true if the table exists in the schema provider, false otherwise. fn table_exist(&self, name: &str) -> bool; }