Depending on the version of the connector you can use it with the following Apache Flink versions:
| Connector's version | Flink's version       |
| :-----------------: | :-------------------: |
| 0.4.x (Sink Only)   | 1.12.0 <= X <= 1.14.5 |
| 0.5.0               | 1.13.0 <= X <= 1.13.6 |
| 0.6.0               | X >= 1.15.3           |
| 0.7.0               | X >= 1.16.1           |

### APIs

See the [Java API docs](https://delta-io.github.io/connectors/latest/delta-flink/api/java/index.html) here.

### Known limitations

- The current version only supports the Flink `DataStream` API. Support for the Flink Table API / SQL, along with a Flink Catalog implementation for storing Delta table metadata in an external metastore, is planned to be added in a future release.
- For GCP Object Storage, the current version only supports reading. Writing to GCP Object Storage is not supported, because Flink support for recoverable writes to GCS was only added in Flink [1.15](https://issues.apache.org/jira/browse/FLINK-11838).
- For Azure Blob Storage, the current version only supports reading. Writing to Azure Blob Storage is not supported by Flink due to an [issue](https://issues.apache.org/jira/browse/FLINK-17444) with class shading,
and will probably be added along with [Azure Data Lake Store Gen 2 support](https://issues.apache.org/jira/browse/FLINK-18568).
- For AWS S3 storage, in order to ensure concurrent transactional writes from different clusters, use the [multi-cluster configuration guidelines](https://docs.delta.io/latest/delta-storage.html#multi-cluster-setup). Please see the [example](#3-sink-creation-with-multi-cluster-support-for-delta-standalone) for how to use this configuration in the Flink Delta Sink.
The following properties can be set:
+ `in-memory` - the default value, used if none is specified. Uses Flink's In-Memory catalog as the decorated catalog.
+ `hive` - uses Flink's Hive catalog as the decorated catalog.

Any extra defined property will be passed to the decorated catalog.
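
For example, a sketch of passing an extra option through to the decorated catalog is shown below; `default-database` is assumed here to be an option understood by the decorated In-Memory catalog, and is not consumed by the Delta Catalog itself:

```sql
-- 'type' and 'catalog-type' configure the Delta Catalog; any other option
-- (here the assumed 'default-database') is forwarded as-is to the
-- decorated catalog.
CREATE CATALOG my_delta_catalog WITH (
    'type' = 'delta-catalog',
    'catalog-type' = 'in-memory',
    'default-database' = 'my_database'
);
```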

#### Hive Metastore
To create a Delta Catalog backed by the Hive catalog, using the Hive catalog's `hadoop-conf-dir` option, run the query below:
```sql
CREATE CATALOG <catalog_name> WITH (
    'type' = 'delta-catalog',
    'catalog-type' = 'hive',
    'hadoop-conf-dir' = '<path-to-hadoop-conf-dir>'
);
USE CATALOG <catalog_name>;
```
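
With the catalog created and selected, Delta tables can be registered and queried through Flink SQL. Below is a minimal, hypothetical sketch; the table name, columns, and `table-path` value are placeholders, and the exact set of supported table options may differ between connector versions:

```sql
-- Hypothetical table definition; adjust columns and path to your setup.
CREATE TABLE example_table (
    id BIGINT,
    data STRING
) WITH (
    'connector' = 'delta',
    'table-path' = 's3a://<bucket>/<path-to-table>'
);

SELECT * FROM example_table;
```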

The logic for resolving configuration from `hadoop-conf-dir` depends on the [Flink Hive Catalog implementation](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hive/hive_catalog/).
The Flink Hive catalog expects `hadoop-conf-dir` to contain at least one of the following files:
- core-site.xml
- hdfs-site.xml
- yarn-site.xml
- mapred-site.xml

The exact list of properties that have to be included in the configuration files depends on your
Hive metastore endpoint/server. The minimum configuration that can be stored in the `core-site.xml` file is presented below:
```xml
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://hive-metastore:9083</value>
    <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
  </property>
</configuration>
```
The `hive-metastore` hostname should resolve to the IP address of the Hive metastore service.

In order to use the Hive Catalog with a Flink cluster, additional [Flink cluster configuration](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hive/overview/#dependencies)
is required. In a nutshell, you need to add the `flink-sql-connector-hive-x-x-x.jar` file to Flink's `lib` folder and
provide the Hadoop dependencies by setting the `HADOOP_CLASSPATH` environment variable:
`export HADOOP_CLASSPATH=$(hadoop classpath)`.

Our connector was tested with:
- `flink-sql-connector-hive-2.3.9_2.12-1.16.0.jar`
- Hive 2.3.2 metastore
- Hadoop 3.3.2

It is recommended to use Hadoop version 3.3.2. When using versions prior to 3.3.2, we encountered many issues with incompatible
class definitions between the Flink file system, the Flink Hive connector, and Delta Standalone.
At the moment, no tests have been conducted for Hadoop versions > 3.3.2.

Examples of issues caused by an incompatible Hadoop version (set via the `HADOOP_CLASSPATH` environment variable or a Hadoop dependency in the Flink job's pom.xml)
while deploying Flink Delta SQL jobs:

```
Caused by: java.lang.IllegalArgumentException:
Cannot invoke org.apache.commons.configuration2.AbstractConfiguration.setListDelimiterHandler on bean class
'class org.apache.commons.configuration2.PropertiesConfiguration' - argument type mismatch -
had objects of type "org.apache.commons.configuration2.convert.DefaultListDelimiterHandler"
but expected signature "org.apache.commons.configuration2.convert.ListDelimiterHandler"
```

```
Caused by: java.lang.LinkageError: loader constraint violation: when resolving method 'void org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(com.google.common.util.concurrent.ListeningExecutorService, int, boolean)' the class loader org.apache.flink.util.ChildFirstClassLoader @2486925f of the current class, org/apache/hadoop/fs/s3a/S3AFileSystem, and the class loader 'app' for the method's defining class, org/apache/hadoop/util/SemaphoredDelegatingExecutor, have different Class objects for the type com/google/common/util/concurrent/ListeningExecutorService used in the signature (org.apache.hadoop.fs.s3a.S3AFileSystem is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @2486925f, parent loader 'app'; org.apache.hadoop.util.SemaphoredDelegatingExecutor is in unnamed module of loader 'app')
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
at io.delta.storage.S3SingleDriverLogStore.write(S3SingleDriverLogStore.java:299)
```

```
java.lang.NoSuchMethodError: org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
at io.delta.storage.S3SingleDriverLogStore.write(S3SingleDriverLogStore.java:299)
at io.delta.standalone.internal.storage.DelegatingLogStore.write(DelegatingLogStore.scala:91)
```

#### Delta Catalog Table Cache
As a performance optimization, the Delta Catalog automatically caches Delta tables,
since these tables can be expensive to recompute.
Additional libraries for AWS/S3 support:
- enabling the `flink-s3-fs-hadoop` plugin on the Flink cluster ([details here](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins))
- `hadoop-aws`

### Maven
Please see the following build files for more details.

#### Scala 2.12:

```xml
<project>
<properties>
<scala.main.version>2.12</scala.main.version>
<delta-connectors-version>0.7.0</delta-connectors-version>
<flink-version>1.16.1</flink-version>
<hadoop-version>3.1.0</hadoop-version>
</properties>
  <!-- dependency and build sections omitted -->
</project>
```

#### SBT

Please replace the versions of the dependencies with the ones you are using.
