[Feature][Connector-v2] Support S3 filesystem of paimon connector #8036

Merged: 1 commit, Nov 14, 2024
54 changes: 54 additions & 0 deletions docs/en/connector-v2/sink/Paimon.md
@@ -61,6 +61,13 @@ All `changelog-producer` modes are currently supported. The default is `none`.
> note:
> When you use streaming mode to read a paimon table, different modes will produce [different results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog).

## Filesystems
The Paimon connector supports writing data to multiple file systems. Currently, hdfs and s3 are supported.
If you use the s3 filesystem, you can configure the `fs.s3a.access-key`, `fs.s3a.secret-key`, `fs.s3a.endpoint`, `fs.s3a.path.style.access`, and `fs.s3a.aws.credentials.provider` properties in the `paimon.hadoop.conf` option.
In addition, the warehouse path should start with `s3a://`.



## Examples

### Single table
@@ -94,6 +101,53 @@ sink {
}
```

### Single table with s3 filesystem

```hocon
env {
execution.parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
}
}
}
}

sink {
Paimon {
warehouse = "s3a://test/"
database = "seatunnel_namespace11"
table = "st_test"
paimon.hadoop.conf = {
fs.s3a.access-key=G52pnxg67819khOZ9ezX
fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF
fs.s3a.endpoint="http://minio4:9000"
fs.s3a.path.style.access=true
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
}
}
}
```
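
The example above embeds static access keys in the job file. Hadoop's S3A filesystem also accepts alternative credential providers through the same `fs.s3a.aws.credentials.provider` property; the sketch below is a variation of the sink config above (the environment-variable provider is a standard AWS SDK v1 class name — verify it is available in your Hadoop/AWS SDK version before relying on it):

```hocon
sink {
  Paimon {
    warehouse = "s3a://test/"
    database = "seatunnel_namespace11"
    table = "st_test"
    paimon.hadoop.conf = {
      fs.s3a.endpoint="http://minio4:9000"
      fs.s3a.path.style.access=true
      # read AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the environment
      # instead of embedding keys in the config (assumes the provider class
      # is on the classpath)
      fs.s3a.aws.credentials.provider=com.amazonaws.auth.EnvironmentVariableCredentialsProvider
    }
  }
}
```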

### Single table (specify hadoop HA config and kerberos config)

```hocon
32 changes: 32 additions & 0 deletions docs/en/connector-v2/source/Paimon.md
@@ -82,6 +82,11 @@ Properties in hadoop conf

The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files

## Filesystems
The Paimon connector supports reading data from multiple file systems. Currently, hdfs and s3 are supported.
If you use the s3 filesystem, you can configure the `fs.s3a.access-key`, `fs.s3a.secret-key`, `fs.s3a.endpoint`, `fs.s3a.path.style.access`, and `fs.s3a.aws.credentials.provider` properties in the `paimon.hadoop.conf` option.
In addition, the warehouse path should start with `s3a://`.

## Examples

### Simple example
@@ -109,6 +114,33 @@ source {
}
```

### S3 example
```hocon
env {
execution.parallelism = 1
job.mode = "BATCH"
}

source {
Paimon {
warehouse = "s3a://test/"
database = "seatunnel_namespace11"
table = "st_test"
paimon.hadoop.conf = {
fs.s3a.access-key=G52pnxg67819khOZ9ezX
fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF
fs.s3a.endpoint="http://minio4:9000"
fs.s3a.path.style.access=true
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
}
}
}

sink {
Console{}
}
```

### Hadoop conf example

```hocon
54 changes: 53 additions & 1 deletion docs/zh/connector-v2/sink/Paimon.md
@@ -58,7 +58,12 @@ Paimon tables have [four](https://paimon.apache.org/docs/mast
* [`lookup`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#lookup)
* [`full-compaction`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#full-compaction)
> Note:
> When you use streaming mode to read a paimon table, different modes will produce [different results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog).

## Filesystems
The Paimon connector supports writing data to multiple file systems. Currently, hdfs and s3 are supported.
If you use the s3 filesystem, you can configure the `fs.s3a.access-key`, `fs.s3a.secret-key`, `fs.s3a.endpoint`, `fs.s3a.path.style.access`, and `fs.s3a.aws.credentials.provider` properties in the `paimon.hadoop.conf` option.
In addition, the warehouse path should start with `s3a://`.

## 示例

@@ -93,6 +98,53 @@ sink {
}
```

### Single table (with S3 filesystem)

```hocon
env {
execution.parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
}
}
}
}

sink {
Paimon {
warehouse = "s3a://test/"
database = "seatunnel_namespace11"
table = "st_test"
paimon.hadoop.conf = {
fs.s3a.access-key=G52pnxg67819khOZ9ezX
fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF
fs.s3a.endpoint="http://minio4:9000"
fs.s3a.path.style.access=true
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
}
}
}
```

### Single table (specify hadoop HA config and kerberos config)

```hocon
34 changes: 34 additions & 0 deletions seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -32,6 +32,7 @@
<properties>
<paimon.version>0.7.0-incubating</paimon.version>
<hive.version>2.3.9</hive.version>
<connector.name>connector.paimon</connector.name>
</properties>

<dependencies>
@@ -47,6 +48,12 @@
<version>${paimon.version}</version>
</dependency>

<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-s3-impl</artifactId>
<version>${paimon.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-guava</artifactId>
@@ -98,4 +105,31 @@

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<filters>
<filter>
<artifact>org.apache.paimon:paimon-s3-impl</artifact>
<excludes>
<exclude>org/apache/hadoop/**</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
@@ -44,6 +44,7 @@ public class PaimonCatalogLoader implements Serializable {
private static final String HDFS_DEF_FS_NAME = "fs.defaultFS";

private static final String HDFS_PREFIX = "hdfs://";
private static final String S3A_PREFIX = "s3a://";
/** ********* Hdfs constants ************* */
private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";

@@ -63,20 +64,20 @@ public PaimonCatalogLoader(PaimonConfig paimonConfig) {
}

public Catalog loadCatalog() {
// When using the seatunel engine, set the current class loader to prevent loading failures
// When using the seatunnel engine, set the current class loader to prevent loading failures
Thread.currentThread().setContextClassLoader(PaimonCatalogLoader.class.getClassLoader());
final Map<String, String> optionsMap = new HashMap<>(1);
optionsMap.put(CatalogOptions.WAREHOUSE.key(), warehouse);
optionsMap.put(CatalogOptions.METASTORE.key(), catalogType.getType());
if (warehouse.startsWith(HDFS_PREFIX)) {
checkConfiguration(paimonHadoopConfiguration, HDFS_DEF_FS_NAME);
paimonHadoopConfiguration.set(HDFS_IMPL_KEY, HDFS_IMPL);
} else if (warehouse.startsWith(S3A_PREFIX)) {
optionsMap.putAll(paimonHadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY));
}
if (PaimonCatalogEnum.HIVE.getType().equals(catalogType.getType())) {
optionsMap.put(CatalogOptions.URI.key(), catalogUri);
paimonHadoopConfiguration
.getPropsWithPrefix(StringUtils.EMPTY)
.forEach((k, v) -> optionsMap.put(k, v));
optionsMap.putAll(paimonHadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY));
}
final Options options = Options.fromMap(optionsMap);
PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
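
The warehouse-prefix dispatch in `loadCatalog()` can be sketched in isolation — a simplified illustration of the branch logic above, not the connector's actual class (the option keys here are plain strings standing in for Paimon's `CatalogOptions`):

```java
import java.util.HashMap;
import java.util.Map;

public class WarehouseDispatchSketch {
    static final String HDFS_PREFIX = "hdfs://";
    static final String S3A_PREFIX = "s3a://";

    // Decide which extra options the catalog needs for a given warehouse path:
    // hdfs warehouses pin the filesystem implementation class, while s3a
    // warehouses forward every configured fs.s3a.* property to the catalog.
    static Map<String, String> catalogOptions(String warehouse, Map<String, String> hadoopProps) {
        Map<String, String> options = new HashMap<>();
        options.put("warehouse", warehouse);
        if (warehouse.startsWith(HDFS_PREFIX)) {
            options.put("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        } else if (warehouse.startsWith(S3A_PREFIX)) {
            options.putAll(hadoopProps);
        }
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("fs.s3a.endpoint", "http://minio4:9000");
        System.out.println(catalogOptions("s3a://test/", props).get("fs.s3a.endpoint"));
    }
}
```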
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.paimon.filesystem;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOLoader;
import org.apache.paimon.fs.Path;
import org.apache.paimon.s3.S3FileIO;

import java.util.ArrayList;
import java.util.List;

public class S3Loader implements FileIOLoader {
@Override
public String getScheme() {
return "s3a";
}

@Override
public List<String[]> requiredOptions() {
List<String[]> options = new ArrayList<>();
options.add(new String[] {"fs.s3a.access-key", "fs.s3a.access.key"});
options.add(new String[] {"fs.s3a.secret-key", "fs.s3a.secret.key"});
options.add(new String[] {"fs.s3a.endpoint", "fs.s3a.endpoint"});
return options;
}

@Override
public FileIO load(Path path) {
return new S3FileIO();
}
}
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.seatunnel.connectors.seatunnel.paimon.filesystem.S3Loader
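
The services file above registers `S3Loader` for discovery via Java's `ServiceLoader`. The toy below re-creates that mechanism end to end with a hypothetical interface and provider (the real registration needs Paimon's `FileIOLoader` on the classpath): it writes a services file naming an implementation class and lets `ServiceLoader` instantiate it.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ServiceLoader;

public class SpiDemo {
    // stand-in for org.apache.paimon.fs.FileIOLoader
    public interface FileIOLoader {
        String getScheme();
    }

    // stand-in for the connector's S3Loader
    public static class S3LoaderToy implements FileIOLoader {
        public String getScheme() { return "s3a"; }
    }

    public static String discoverScheme() throws IOException {
        // write a services file equivalent to
        // META-INF/services/org.apache.paimon.fs.FileIOLoader
        Path dir = Files.createTempDirectory("spi-demo");
        Path services = dir.resolve("META-INF/services/" + FileIOLoader.class.getName());
        Files.createDirectories(services.getParent());
        Files.write(services, S3LoaderToy.class.getName().getBytes());

        // a class loader that sees the services file; ServiceLoader reads it,
        // loads the named class, and instantiates the provider
        try (URLClassLoader cl = new URLClassLoader(
                new URL[] {dir.toUri().toURL()}, SpiDemo.class.getClassLoader())) {
            for (FileIOLoader loader : ServiceLoader.load(FileIOLoader.class, cl)) {
                return loader.getScheme();
            }
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(discoverScheme()); // s3a
    }
}
```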
@@ -25,17 +25,32 @@
<artifactId>connector-paimon-e2e</artifactId>
<name>SeaTunnel : E2E : Connector V2 : Paimon</name>

<properties>
<testcontainer.version>1.19.1</testcontainer.version>
<minio.version>8.5.6</minio.version>
</properties>

<dependencies>
<!-- minio containers -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-fake</artifactId>
<version>${project.version}</version>
<groupId>org.testcontainers</groupId>
<artifactId>minio</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>${minio.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-paimon</artifactId>
<artifactId>connector-seatunnel-e2e-base</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
@@ -44,6 +59,18 @@
<classifier>optional</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-fake</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-paimon</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-assert</artifactId>