- adding SQL doc.
Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>
kristoffSC authored and scottsand-db committed Jun 26, 2023
1 parent 2ce6643 commit f382819
Showing 1 changed file with 305 additions and 13 deletions.
318 changes: 305 additions & 13 deletions connectors/flink/README.md
@@ -16,6 +16,7 @@ Official Delta Lake connector for [Apache Flink](https://flink.apache.org/).
- [Bounded Mode](#bounded-mode)
- [Continuous Mode](#continuous-mode)
- [Examples](#delta-source-examples)
- [SQL Support](#sql-support)
- [Usage](#usage)
- [Maven](#maven)
- [SBT](#sbt)
@@ -311,16 +312,296 @@ public DataStream<RowData> createContinuousDeltaSourceUserColumns(
}
```

## SQL Support
Starting from version 0.7.0, the Delta connector can be used for Flink SQL jobs.
Both the Delta Source and the Delta Sink can be used as Flink tables for SELECT and INSERT queries.

The Flink/Delta SQL connector **must** be used with the Delta Catalog. Executing SQL queries on a Delta table
through the Flink API without a properly configured Delta Catalog will cause the SQL job to fail.

| Feature support | Notes |
|------------------------------------------------|-----------------------------------------------------------------------------------------|
| [CREATE CATALOG](#delta-catalog-configuration) | A Delta Catalog is required for Delta Flink SQL support. |
| [CREATE DATABASE](#create-database) | |
| [CREATE TABLE](#create-table) | |
| [CREATE TABLE LIKE](#create-table-like) | |
| [ALTER TABLE](#alter-table)                    | Supports only altering table properties; column and partition changes are not supported.           |
| [DROP TABLE](#drop-table)                      | Removes the table entry from the metastore, leaving Delta table files on the filesystem untouched. |
| [SQL SELECT](#select-query)                    | Supports both batch (default) and streaming modes.                                                 |
| [SQL INSERT](#insert-query)                    | Supports both batch and streaming modes.                                                            |

### Delta Catalog
The Delta Catalog is meant to be the source of truth for Delta tables in Flink's SQL API.
That is why users are required to use the Delta Catalog for every interaction with a Delta table through a Flink SQL query.
Such queries will fail if the Delta Catalog is not properly configured for the given SQL session.

At the same time, any other Flink connector (Kafka, Filesystem, etc.) can be used with the Delta Catalog, unless it has restrictions of its own.
This is achieved by the Delta Catalog acting as a proxy for non-Delta tables.
For Delta tables, however, the Delta Catalog ensures that every DDL operation is reflected in the underlying Delta table log.
In other words, the Delta Catalog ensures that only valid Delta tables can be created and used by a Flink job.
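
For illustration, here is a minimal sketch of a Delta table and a regular Flink table registered side by side in the same Delta Catalog; the non-Delta table is simply proxied to the decorated catalog. The table names, paths, and connector options are hypothetical; it assumes a Delta Catalog has already been created and selected as shown in [Delta Catalog Configuration](#delta-catalog-configuration) and that Flink's built-in `datagen` connector is available:
```sql
-- Delta table managed by the Delta Catalog.
CREATE TABLE deltaEvents (
    id BIGINT,
    data STRING
) WITH (
    'connector' = 'delta',
    'table-path' = '<path-to-table>'
);

-- Non-Delta table; the Delta Catalog only proxies this definition to the decorated catalog.
CREATE TABLE sampleSource (
    id BIGINT,
    data STRING
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '10'
);

-- Both tables can be combined in a single query.
INSERT INTO deltaEvents SELECT id, data FROM sampleSource;
```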

#### Decorated catalog
The Delta Catalog is implemented using a decorator pattern. It decorates/wraps another [Catalog](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/)
implementation.

For Delta tables, only minimal information such as the database/table name, connector type, and Delta table file path will be stored in the metastore.
No table properties or schema information for Delta tables will be stored in the metastore;
the Delta Catalog stores those in `_delta_log`.

For non-Delta tables, the Delta Catalog acts as a simple proxy and fully redirects every method call to the decorated catalog.

#### Delta Catalog Configuration
A catalog is created and named by executing the following query:
```sql
CREATE CATALOG <catalog_name> WITH (
    'type' = 'delta-catalog',
    'catalog-type' = '<decorated-catalog>',
    '<config_key_1>' = '<config_value_1>',
    '<config_key_2>' = '<config_value_2>'
);
USE CATALOG <catalog_name>;
```
Replace `<catalog_name>` with your catalog's name.
Replace `<decorated-catalog>` with the Catalog implementation type that you want to use as the decorated catalog.
Currently, only `in-memory` (default) and `hive` decorated catalogs are supported.

The following properties can be set:
+ `type` - must be `delta-catalog`. This option is required by Flink.
+ `catalog-type` - an optional property that specifies the type of the decorated catalog. Allowed values are:
  + `in-memory` - the default if none is specified. Uses Flink's In-Memory catalog as the decorated catalog.
  + `hive` - uses Flink's Hive catalog as the decorated catalog.

Any additional property will be passed to the decorated catalog. For example, to create a
Delta Catalog backed by the Hive catalog and use the Hive catalog's `hadoop-conf-dir` option, run the query below:
```sql
CREATE CATALOG <catalog_name> WITH (
    'type' = 'delta-catalog',
    'catalog-type' = 'hive',
    'hadoop-conf-dir' = '<some-path>'
);
USE CATALOG <catalog_name>;
```

#### Delta Catalog Table Cache
As a performance optimization, the Delta Catalog automatically caches Delta tables,
since these tables can be expensive to recompute.

This cache has a default size of 100 tables and uses an LRU policy to evict old cached entries.
You can change this value by adding `deltaCatalogTableCacheSize` to your Flink cluster's
Hadoop configuration. Please note that this configuration will have a global effect on every
Delta Catalog instance running on your cluster. See the [Hadoop Configuration](#hadoop-configuration) section for details.

### DDL commands
#### CREATE DATABASE
By default, Delta Catalog will use the `default` database.
Use the following example to create a separate database:

```sql
CREATE DATABASE custom_DB;
USE custom_DB;
```

#### CREATE TABLE
To create a non-partitioned table, use the `CREATE TABLE` statement:
```sql
CREATE TABLE testTable (
    id BIGINT,
    data STRING
) WITH (
    'connector' = 'delta',
    'table-path' = '<path-to-table>',
    '<arbitrary-user-defined-table-property>' = '<value>',
    '<delta.*-properties>' = '<value>'
);
```

To create a partitioned table, use `PARTITIONED BY`:
```sql
CREATE TABLE testTable (
    id BIGINT,
    data STRING,
    part_a STRING,
    part_b STRING
)
PARTITIONED BY (part_a, part_b)
WITH (
    'connector' = 'delta',
    'table-path' = '<path-to-table>',
    '<arbitrary-user-defined-table-property>' = '<value>',
    '<delta.*-properties>' = '<value>'
);
```

The Delta connector supports all of Flink's table schema [types](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#list-of-data-types).

Currently, computed and metadata columns, primary keys, and watermark definitions are not supported in the
`CREATE TABLE` statement.

The mandatory DDL options are:
+ `connector` - must be set to `'delta'`.
+ `table-path` - the path (filesystem, S3, etc.) of your Delta table.
  If the table doesn't exist under this path, it will be created for you.

In addition to the mandatory options, the DDL for a Delta table can accept other table properties.
These properties will be persisted to `_delta_log` for the created table; however, they will not be used
by the Delta connector during processing (see the sketch after the list below).

Properties NOT allowed as table properties defined in DDL:
+ job-specific properties such as:
  + versionAsOf
  + timestampAsOf
  + startingTimestamp
  + mode
  + ignoreChanges
  + ignoreDeletes
+ Delta Standalone log store configurations, such as `delta.logStore.*` properties
+ Parquet format options, such as `parquet.*`
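
For illustration, a minimal sketch of a DDL that sets both an arbitrary user-defined property and a standard Delta table property (`delta.appendOnly`); the table name, path, and property values are illustrative only:
```sql
CREATE TABLE propertiesExample (
    id BIGINT,
    data STRING
) WITH (
    'connector' = 'delta',
    'table-path' = '<path-to-table>',
    -- arbitrary user-defined property; persisted to _delta_log but not used by the connector
    'created-by' = 'data-engineering-team',
    -- standard Delta table property; also persisted to _delta_log
    'delta.appendOnly' = 'true'
);
```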

##### Creating the Delta table
When executing `CREATE TABLE` with the Delta connector, there are two possible situations:
+ the Delta table does not exist under `table-path`,
+ the Delta table already exists under `table-path`.

In the first case, the Delta Catalog will create the Delta table folder and initialize
an empty (zero-row) Delta table with the schema defined in the DDL. Additionally, all table properties defined in the DDL,
except `connector` and `table-path`, will be added to the Delta table metadata. On top of that, a metastore entry
for the new table will be created.

In the second case, where `_delta_log` already exists under the specified `table-path`, the Delta Catalog will throw an exception when:
+ the DDL schema does not match the `_delta_log` schema,
+ the DDL partition definition does not match the partition definition from `_delta_log`,
+ table properties from the DDL would override existing table properties in `_delta_log`.

If all of the above checks pass, the Delta Catalog will add a metastore entry for the new table and will
add the new table properties to the existing `_delta_log`.

#### CREATE TABLE LIKE
To create a table with the same schema, partitioning, and table properties as another table, use `CREATE TABLE LIKE`.

```sql
CREATE TABLE testTable (
    id BIGINT,
    data STRING
) WITH (
    'connector' = 'delta',
    'table-path' = '<path-to-table>'
);

CREATE TABLE likeTestTable
WITH (
    'connector' = 'delta',
    'table-path' = '<path-to-another-table>'
) LIKE testTable;
```

#### ALTER TABLE
The Delta connector only supports:
+ altering the table name,
+ altering a table property value,
+ adding a new table property.

```sql
ALTER TABLE sourceTable SET ('userCustomProp' = 'myVal1');
ALTER TABLE sourceTable RENAME TO newSourceTable;
```

#### DROP TABLE
To delete a table, run:
```sql
DROP TABLE sample;
```

This operation will remove ONLY the metastore entry. No Delta table files will be removed.

### Querying with SQL
#### SELECT query
The Delta connector supports both batch (default) and streaming reads for Flink jobs.
To run a `SELECT` query in batch mode, run:
```sql
SELECT * FROM testTable;
```
The above query will read all records from `testTable` and then stop. It is suitable for `BATCH` Flink jobs.

To run a `SELECT` query in streaming mode, run:
```sql
SELECT * FROM testTable /*+ OPTIONS('mode' = 'streaming') */;
```
The above query will read all records from `testTable` and keep monitoring the underlying Delta table
for new data (appends).

Both queries above read all columns from the Delta table. To read only a subset of columns,
list those columns explicitly in the `SELECT` statement instead of using `*`:
```sql
SELECT col1, col2, col3 FROM testTable;
```
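
The DDL section above lists `versionAsOf` and `timestampAsOf` among the job-specific options. Assuming they can be passed as query hints in the same way as `mode` (an assumption of this sketch, not confirmed by this document), a time-travel batch read might look like:
```sql
-- Hypothetical: read snapshot version 10 of the table, assuming 'versionAsOf'
-- is accepted as a query hint by this connector version.
SELECT * FROM testTable /*+ OPTIONS('versionAsOf' = '10') */;
```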

For more details about the Flink `SELECT` statement, please see the [Flink SELECT documentation](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/select/).

#### INSERT query
To append new data to a Delta table with a Flink job, use the `INSERT INTO` statement.

The following query inserts three rows into the Delta table `sinkTable` and then stops:
```sql
INSERT INTO sinkTable VALUES
    ('a', 'b', 1),
    ('c', 'd', 2),
    ('e', 'f', 3);
```

For the examples below, it is assumed that `sourceTable` refers to a Delta table (Delta connector).

The following query inserts the entire content of `sourceTable` into the Delta table `sinkTable` and then stops. The table schemas must match:
```sql
INSERT INTO sinkTable SELECT * FROM sourceTable;
```

The following query inserts all data from `sourceTable` into the Delta table `sinkTable` under the static partition `region = 'europe'` and then stops:
```sql
INSERT INTO sinkTable PARTITION (region='europe') SELECT * FROM sourceTable;
```

The following creates a continuous query that inserts the entire content of `sourceTable` into the Delta table `sinkTable` and continuously monitors `sourceTable` for new data:
```sql
INSERT INTO sinkTable SELECT * FROM sourceTable /*+ OPTIONS('mode' = 'streaming') */;
```
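
For a complete end-to-end sketch, the pipeline below streams continuously generated rows into a Delta table. The table names are hypothetical; it assumes the current catalog is a Delta Catalog and uses Flink's built-in `datagen` connector as an unbounded source, so the `INSERT` runs as a continuous streaming job without needing the `'mode' = 'streaming'` hint:
```sql
-- Unbounded non-Delta source table, proxied by the Delta Catalog to the decorated catalog.
CREATE TABLE eventSource (
    col1 STRING,
    col2 STRING,
    col3 INT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5'
);

-- Delta sink table.
CREATE TABLE deltaSink (
    col1 STRING,
    col2 STRING,
    col3 INT
) WITH (
    'connector' = 'delta',
    'table-path' = '<path-to-table>'
);

-- Runs continuously because the datagen source is unbounded.
INSERT INTO deltaSink SELECT col1, col2, col3 FROM eventSource;
```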

### Hadoop Configuration
The Delta connector resolves the Flink cluster's Hadoop configuration in order to use properties such as
Delta log store properties or the Delta Catalog cache size.

For SQL jobs, the Delta connector resolves the Flink cluster's Hadoop configuration from the following sources, listed from lowest to highest precedence:
+ the `HADOOP_HOME` environment variable,
+ the `hdfs-default.xml` file pointed to by the deprecated Flink config option `fs.hdfs.hdfsdefault`,
+ the `HADOOP_CONF_DIR` environment variable,
+ properties from the Flink cluster configuration prefixed with `flink.hadoop`.
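
For example, a minimal sketch (the value is illustrative) of how the table cache size from the [Delta Catalog Table Cache](#delta-catalog-table-cache) section could be set through the Flink cluster configuration using the `flink.hadoop` prefix described above:
```yaml
# flink-conf.yaml
# Keys with the flink.hadoop prefix are added to the Hadoop configuration seen by the connector.
flink.hadoop.deltaCatalogTableCacheSize: 200
```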

### SQL API Limitations
The Delta connector currently supports only physical columns. Metadata and computed columns
are currently not supported. For details, please see the [Flink documentation on columns](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#columns).

Other unsupported features:
+ Watermark definitions in the `CREATE TABLE` statement.
+ Primary key definitions in the `CREATE TABLE` statement.
+ Schema `ALTER` queries (adding or dropping columns), including partition columns.
+ Table and column comments.

## Usage

You can add the Flink/Delta Connector library as a dependency using your favorite build tool. Please note
that it expects the following packages to be provided:

DataStream API Only:
- `delta-standalone`
- `flink-parquet`
- `flink-table-common`
- `hadoop-client`

Additional libraries for Table/SQL API:
- `flink-clients`
- `flink-table-planner_2.12`

Additional libraries for AWS/S3 support:
- enabling the `flink-s3-fs-hadoop` plugin on the Flink cluster ([details here](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins))
- `hadoop-aws`

Please see the following build files for more details.

### Maven
@@ -331,7 +612,7 @@ Scala 2.12:
<project>
<properties>
<scala.main.version>2.12</scala.main.version>
<delta-connectors-version>0.6.0</delta-connectors-version>
<delta-connectors-version>0.6.1-SNAPSHOT</delta-connectors-version>
<flink-version>1.16.1</flink-version>
<hadoop-version>3.1.0</hadoop-version>
</properties>
Expand All @@ -357,24 +638,35 @@ Scala 2.12:
<artifactId>flink-parquet_${scala.main.version}</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_${scala.main.version}</artifactId>
<version>${flink-version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<!-- Needed for Flink Table/SQL API -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_${scala.main.version}</artifactId>
<version>${flink-version}</version>
<scope>provided</scope>
</dependency>
<!-- End needed for Flink Table/SQL API -->

<!-- Needed for AWS S3 support -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop-version}</version>
</dependency>
<!-- End needed for AWS S3 support -->
</dependencies>
</project>
```

