From daa61d0eb62543e0b051f500f6d53cbb72ee1695 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Fri, 6 Sep 2024 22:57:18 +0800 Subject: [PATCH 01/10] [feature] ftp file support save-mode --- .../seatunnel/file/ftp/sink/FtpFileSink.java | 35 ++++--------------- .../file/ftp/sink/FtpFileSinkFactory.java | 20 +++++++++-- 2 files changed, 24 insertions(+), 31 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java index 031d442f207..34b2b7b88fd 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java @@ -17,46 +17,23 @@ package org.apache.seatunnel.connectors.seatunnel.file.ftp.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.SeaTunnelSink; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; -import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf; -import 
org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfigOptions; -import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink; +import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink; import com.google.auto.service.AutoService; @AutoService(SeaTunnelSink.class) -public class FtpFileSink extends BaseFileSink { +public class FtpFileSink extends BaseMultipleTableFileSink { @Override public String getPluginName() { return FileSystemType.FTP.getFileSystemPluginName(); } - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - CheckResult result = - CheckConfigUtil.checkAllExists( - pluginConfig, - FtpConfigOptions.FTP_HOST.key(), - FtpConfigOptions.FTP_PORT.key(), - FtpConfigOptions.FTP_USERNAME.key(), - FtpConfigOptions.FTP_PASSWORD.key()); - if (!result.isSuccess()) { - throw new FileConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SINK, result.getMsg())); - } - super.prepare(pluginConfig); - hadoopConf = FtpConf.buildWithConfig(pluginConfig); + public FtpFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { + super(FtpConf.buildWithConfig(), readonlyConfig, catalogTable); } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java index 24a9ed48f87..67633d971e7 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java @@ -17,18 +17,26 @@ 
package org.apache.seatunnel.connectors.seatunnel.file.ftp.sink; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.factory.BaseMultipleTableFileSinkFactory; import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfigOptions; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; import com.google.auto.service.AutoService; @AutoService(Factory.class) -public class FtpFileSinkFactory implements TableSinkFactory { +public class FtpFileSinkFactory extends BaseMultipleTableFileSinkFactory { @Override public String factoryIdentifier() { return FileSystemType.FTP.getFileSystemPluginName(); @@ -94,4 +102,12 @@ public OptionRule optionRule() { .optional(FtpConfigOptions.FTP_CONNECTION_MODE) .build(); } + + @Override + public TableSink + createSink(TableSinkFactoryContext context) { + ReadonlyConfig readonlyConfig = context.getOptions(); + CatalogTable catalogTable = context.getCatalogTable(); + return () -> new FtpFileSink(readonlyConfig, catalogTable); + } } From 714d8c99544b13169d1830b771af5aafa446cdca Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Tue, 10 Sep 2024 18:41:10 +0800 Subject: [PATCH 02/10] [feature] ftp file support save-mode 2 --- .../file/ftp/catalog/FtpFileCatalog.java | 29 +++ .../ftp/catalog/FtpFileCatalogFactory.java | 53 +++++ .../seatunnel/file/ftp/config/FtpConf.java | 25 +-- .../seatunnel/file/ftp/sink/FtpFileSink.java | 2 +- .../file/ftp/sink/FtpFileSinkFactory.java | 2 + .../file/ftp/source/FtpFileSource.java | 3 +- .../file/ftp/FtpFileMultipleSinkIT.java | 187 ++++++++++++++++++ .../multiple_table_fake_to_ftp_file_text.conf | 105 ++++++++++ .../seatunnel/jdbc/AbstractJdbcIT.java | 7 + .../resources/examples/fake_to_console.conf | 84 ++++++-- 10 files changed, 470 insertions(+), 27 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalog.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalogFactory.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileMultipleSinkIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalog.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalog.java new file mode 100644 index 00000000000..2bf0bf49e5e --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalog.java @@ -0,0 +1,29 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.ftp.catalog; + +import org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog; +import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy; + +public class FtpFileCatalog extends AbstractFileCatalog { + + public FtpFileCatalog( + HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) { + super(hadoopFileSystemProxy, filePath, catalogName); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalogFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalogFactory.java new file mode 100644 index 00000000000..74f05c12d7d --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalogFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.ftp.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf; +import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class FtpFileCatalogFactory implements CatalogFactory { + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig options) { + HadoopFileSystemProxy fileSystemUtils = + new HadoopFileSystemProxy(FtpConf.buildWithConfig(options)); + return new FtpFileCatalog( + fileSystemUtils, + options.get(BaseSourceConfigOptions.FILE_PATH), + FileSystemType.FTP.getFileSystemPluginName()); + } + + @Override + public String factoryIdentifier() { + return FileSystemType.FTP.getFileSystemPluginName(); + } 
+ + @Override + public OptionRule optionRule() { + return OptionRule.builder().build(); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java index 9186e1d8ee9..bd98800c540 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java @@ -17,18 +17,19 @@ package org.apache.seatunnel.connectors.seatunnel.file.ftp.config; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; +import org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode; import java.util.HashMap; +import java.util.Optional; public class FtpConf extends HadoopConf { private static final String HDFS_IMPL = "org.apache.seatunnel.connectors.seatunnel.file.ftp.system.SeaTunnelFTPFileSystem"; private static final String SCHEMA = "ftp"; - private FtpConf(String hdfsNameKey) { + public FtpConf(String hdfsNameKey) { super(hdfsNameKey); } @@ -42,20 +43,20 @@ public String getSchema() { return SCHEMA; } - public static HadoopConf buildWithConfig(Config config) { - String host = config.getString(FtpConfigOptions.FTP_HOST.key()); - int port = config.getInt(FtpConfigOptions.FTP_PORT.key()); + public static HadoopConf buildWithConfig(ReadonlyConfig config) { + String host = config.get(FtpConfigOptions.FTP_HOST); + int port = config.get(FtpConfigOptions.FTP_PORT); String defaultFS = String.format("ftp://%s:%s", host, port); HadoopConf hadoopConf = new 
FtpConf(defaultFS); HashMap ftpOptions = new HashMap<>(); - ftpOptions.put( - "fs.ftp.user." + host, config.getString(FtpConfigOptions.FTP_USERNAME.key())); - ftpOptions.put( - "fs.ftp.password." + host, config.getString(FtpConfigOptions.FTP_PASSWORD.key())); - if (config.hasPath(FtpConfigOptions.FTP_CONNECTION_MODE.key())) { + ftpOptions.put("fs.ftp.user." + host, config.get(FtpConfigOptions.FTP_USERNAME)); + ftpOptions.put("fs.ftp.password." + host, config.get(FtpConfigOptions.FTP_PASSWORD)); + Optional optional = + config.getOptional(FtpConfigOptions.FTP_CONNECTION_MODE); + if (optional.isPresent()) { ftpOptions.put( "fs.ftp.connection.mode", - config.getString(FtpConfigOptions.FTP_CONNECTION_MODE.key())); + config.get(FtpConfigOptions.FTP_CONNECTION_MODE).toString()); } hadoopConf.setExtraOptions(ftpOptions); return hadoopConf; diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java index 34b2b7b88fd..afc5978f49b 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java @@ -34,6 +34,6 @@ public String getPluginName() { } public FtpFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { - super(FtpConf.buildWithConfig(), readonlyConfig, catalogTable); + super(FtpConf.buildWithConfig(readonlyConfig), readonlyConfig, catalogTable); } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java index 67633d971e7..8ced39feb28 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java @@ -51,6 +51,8 @@ public OptionRule optionRule() { .required(FtpConfigOptions.FTP_USERNAME) .required(FtpConfigOptions.FTP_PASSWORD) .optional(BaseSinkConfig.FILE_FORMAT_TYPE) + .optional(BaseSinkConfig.SCHEMA_SAVE_MODE) + .optional(BaseSinkConfig.DATA_SAVE_MODE) .conditional( BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java index b032717cabe..d6f0f64abb6 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; @@ -78,7 +79,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException { "Ftp file source 
connector only support read [text, csv, json] files"); } String path = pluginConfig.getString(FtpConfigOptions.FILE_PATH.key()); - hadoopConf = FtpConf.buildWithConfig(pluginConfig); + hadoopConf = FtpConf.buildWithConfig(ReadonlyConfig.fromConfig(pluginConfig)); readStrategy = ReadStrategyFactory.of( pluginConfig.getString(FtpConfigOptions.FILE_FORMAT_TYPE.key())); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileMultipleSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileMultipleSinkIT.java new file mode 100644 index 00000000000..9628a69a0c8 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileMultipleSinkIT.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.connector.file.ftp; + +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.ftp.catalog.FtpFileCatalog; +import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf; +import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.container.TestHelper; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; + +import org.apache.commons.lang3.StringUtils; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback; + +import com.github.dockerjava.api.command.ExecCreateCmdResponse; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK}, + disabledReason = + "1.The apache-compress version is not compatible with apache-poi. 
2.Spark Engine is not compatible with commons-net") +@Slf4j +public class FtpFileMultipleSinkIT extends TestSuiteBase implements TestResource { + + private static final String FTP_IMAGE = "fauria/vsftpd:latest"; + + private static final String ftp_CONTAINER_HOST = "ftp"; + + private static final int FTP_PORT = 21; + + private static final String USERNAME = "seatunnel"; + + private static final String PASSWORD = "pass"; + + private GenericContainer ftpContainer; + + @BeforeAll + @Override + public void startUp() throws Exception { + ftpContainer = + new GenericContainer<>(FTP_IMAGE) + .withExposedPorts(FTP_PORT) + .withNetwork(NETWORK) + .withExposedPorts(FTP_PORT) + .withNetworkAliases(ftp_CONTAINER_HOST) + .withEnv("FILE_OPEN_MODE", "0666") + .withEnv("WRITE_ENABLE", "YES") + .withEnv("ALLOW_WRITEABLE_CHROOT", "YES") + .withEnv("ANONYMOUS_ENABLE", "YES") + .withEnv("LOCAL_ENABLE", "YES") + .withEnv("LOCAL_UMASK", "000") + .withEnv("FTP_USER", USERNAME) + .withEnv("FTP_PASS", PASSWORD) + .withEnv("PASV_ADDRESS", "0.0.0.0") + .withLogConsumer(new Slf4jLogConsumer(log)) + .withPrivilegedMode(true); + + ftpContainer.setPortBindings(Collections.singletonList("21:21")); + ftpContainer.start(); + Startables.deepStart(Stream.of(ftpContainer)).join(); + log.info("ftp container started"); + + ftpContainer.execInContainer("sh", "-c", "chmod -R 777 /home/vsftpd/seatunnel/"); + ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp /home/vsftpd/seatunnel/"); + } + + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.FLINK, EngineType.SPARK}, + disabledReason = + "Fink test is multi-node, LocalFile connector will use different containers for obtaining files") + public void testFtpFileMultipleWriteWithSaveMode(TestContainer container) + throws IOException, InterruptedException { + TestHelper helper = new TestHelper(container); + // test save_mode + String path1 = "/tmp/seatunnel/text/source_1"; + String path2 = "/tmp/seatunnel/text/source_2"; + 
Assertions.assertEquals(getFileListFromContainer(path1).size(), 0); + Assertions.assertEquals(getFileListFromContainer(path2).size(), 0); + helper.execute("/text/multiple_table_fake_to_ftp_file_text.conf"); + Assertions.assertEquals(getFileListFromContainer(path1).size(), 1); + Assertions.assertEquals(getFileListFromContainer(path2).size(), 1); + helper.execute("/text/multiple_table_fake_to_ftp_file_text.conf"); + Assertions.assertEquals(getFileListFromContainer(path1).size(), 1); + Assertions.assertEquals(getFileListFromContainer(path2).size(), 1); + } + + @SneakyThrows + private List getFileListFromContainer(String path) { + String command = "ls -1 " + path; + ExecCreateCmdResponse execCreateCmdResponse = + dockerClient + .execCreateCmd(ftpContainer.getContainerId()) + .withCmd("sh", "-c", command) + .withAttachStdout(true) + .withAttachStderr(true) + .exec(); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + dockerClient + .execStartCmd(execCreateCmdResponse.getId()) + .exec(new ExecStartResultCallback(outputStream, System.err)) + .awaitCompletion(); + + String output = new String(outputStream.toByteArray(), StandardCharsets.UTF_8).trim(); + List fileList = new ArrayList<>(); + log.info("container path file list is :{}", output); + String[] files = output.split("\n"); + for (String file : files) { + if (StringUtils.isNotEmpty(file)) { + log.info("container path file name is :{}", file); + fileList.add(file); + } + } + return fileList; + } + + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.FLINK, EngineType.SPARK}, + disabledReason = + "Fink test is multi-node, LocalFile connector will use different containers for obtaining files") + public void testLocalFileCatalog(TestContainer container) + throws IOException, InterruptedException { + String defaultFS = String.format("ftp://%s:%s", ftp_CONTAINER_HOST, FTP_PORT); + final FtpFileCatalog ftpFileCatalog = + new FtpFileCatalog( + new HadoopFileSystemProxy(new 
FtpConf(defaultFS)), + "/tmp/seatunnel/json/test1", + FileSystemType.FTP.getFileSystemPluginName()); + final TablePath tablePath = TablePath.DEFAULT; + Assertions.assertFalse(ftpFileCatalog.tableExists(tablePath)); + ftpFileCatalog.createTable(null, null, false); + Assertions.assertTrue(ftpFileCatalog.tableExists(tablePath)); + Assertions.assertFalse(ftpFileCatalog.isExistsData(tablePath)); + ftpFileCatalog.dropTable(tablePath, false); + Assertions.assertFalse(ftpFileCatalog.tableExists(tablePath)); + } + + @AfterAll + @Override + public void tearDown() { + if (ftpContainer != null) { + ftpContainer.close(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf new file mode 100644 index 00000000000..77319bfd53a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + result_table_name = "ftp" + tables_configs = [ + { + schema = { + table = "source_1" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"] + } + ] + }, + { + schema = { + table = "source_2" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3] + } + ] + } + ] + } +} + +transform { +} + +sink { + FtpFile { + host = "ftp" + port = 21 + user = seatunnel + password = pass + path = "/tmp/seatunnel/text/${table_name}" + source_table_name = "ftp" + row_delimiter = "\n" + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "text" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + compress_codec = "lzo" + "schema_save_mode"="RECREATE_SCHEMA" + "data_save_mode"="DROP_DATA" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java index 920b26e1793..406b496bcbb 100644 --- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java @@ -38,7 +38,9 @@ import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; import org.apache.commons.lang3.StringUtils; @@ -344,6 +346,11 @@ public void tearDown() throws SQLException { } @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.FLINK, EngineType.SPARK}, + disabledReason = + "Fink test is multi-node, LocalFile connector will use different containers for obtaining files") public void testJdbcDb(TestContainer container) throws IOException, InterruptedException, SQLException { List configFiles = jdbcCase.getConfigFile(); diff --git a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf index ec7871359d4..db9393bc170 100644 --- a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf +++ b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf @@ -14,26 +14,69 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -###### -###### This config file is a demonstration of streaming processing in seatunnel config -###### env { parallelism = 1 job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local } source { - # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { - result_table_name = "fake" - parallelism = 1 - schema = { - fields { - name = "string" - age = "int" + result_table_name = "ftp" + tables_configs = [ + { + schema = { + table = "source_1" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"] + } + ] + }, + { + schema = { + table = "source_2" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3] + } + ] } - } + ] } } @@ -41,7 +84,22 @@ transform { } sink { - console { - source_table_name="fake" + FtpFile { + host = "172.20.123.145" + port = 21 + user = "wenjun_ruan" + password = "QWer12#$" + path = "/home/wenjun_ruan/laowang/0908/${table_name}" + source_table_name = "ftp" + row_delimiter = "\n" + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "text" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + compress_codec = "lzo" + "schema_save_mode"="RECREATE_SCHEMA" + "data_save_mode"="DROP_DATA" } } \ No newline at end of file From 
d8341a62ead09e00830439cf355b76e6a27df375 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Sat, 14 Sep 2024 14:49:02 +0800 Subject: [PATCH 03/10] [feature] ftp file support save-mode 3 --- docs/en/connector-v2/sink/FtpFile.md | 43 +++- .../file/ftp/sink/FtpFileSinkFactory.java | 1 + .../e2e/connector/file/ftp/FtpFileIT.java | 69 +++++++ .../file/ftp/FtpFileMultipleSinkIT.java | 187 ------------------ .../multiple_table_fake_to_ftp_file_text.conf | 2 +- ...ultiple_table_fake_to_ftp_file_text_2.conf | 105 ++++++++++ 6 files changed, 218 insertions(+), 189 deletions(-) delete mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileMultipleSinkIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text_2.conf diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md index 9305aa7e990..248f40c5fe5 100644 --- a/docs/en/connector-v2/sink/FtpFile.md +++ b/docs/en/connector-v2/sink/FtpFile.md @@ -64,6 +64,8 @@ By default, we use 2PC commit to ensure `exactly-once` | parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | | parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | | encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | +| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Existing dir processing method | +| data_save_mode | string | no | APPEND_DATA | Existing data processing method | ### host [string] @@ -83,7 +85,7 @@ The target ftp password is required ### path [string] -The target dir path is required. +The target dir path is required, you can inject the upstream CatalogTable into the path by using: `${database_name}`, `${table_name}` and `${schema_name}`. 
### connection_mode [string] @@ -227,6 +229,18 @@ Support writing Parquet INT96 from a 12-byte field, only valid for parquet files Only used when file_format_type is json,text,csv,xml. The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`. +### schema_save_mode [string] +Existing dir processing method. +- RECREATE_SCHEMA: will create when the dir does not exist, delete and recreate when the dir exists +- CREATE_SCHEMA_WHEN_NOT_EXIST: will create when the dir does not exist, skip when the dir exists +- ERROR_WHEN_SCHEMA_NOT_EXIST: error will be reported when the dir does not exist +- IGNORE: Ignore the treatment of the table + +### data_save_mode [string] +Existing data processing method. +- DROP_DATA: preserve dir and delete data files +- APPEND_DATA: preserve dir, preserve data files +- ERROR_WHEN_DATA_EXISTS: when there are data files, an error is reported ## Example For text file format simple config @@ -273,6 +287,33 @@ FtpFile { ``` +When our source end is multiple tables, and wants different expressions to different directory, we can configure this way +```bash + +FtpFile { + host = "xxx.xxx.xxx.xxx" + port = 21 + user = "username" + password = "password" + path = "/data/ftp/seatunnel/job1/${table_name}" + tmp_path = "/data/ftp/seatunnel/tmp" + file_format_type = "text" + field_delimiter = "\t" + row_delimiter = "\n" + have_partition = true + partition_by = ["age"] + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + custom_filename = true + file_name_expression = "${transactionId}_${now}" + sink_columns = ["name","age"] + filename_time_format = "yyyy.MM.dd" + schema_save_mode=RECREATE_SCHEMA + data_save_mode=DROP_DATA +} + +``` + ## Changelog ### 2.2.0-beta 2022-09-26 diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java index 8ced39feb28..4c0de969704 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java @@ -50,6 +50,7 @@ public OptionRule optionRule() { .required(FtpConfigOptions.FTP_PORT) .required(FtpConfigOptions.FTP_USERNAME) .required(FtpConfigOptions.FTP_PASSWORD) + .optional(BaseSinkConfig.TMP_PATH) .optional(BaseSinkConfig.FILE_FORMAT_TYPE) .optional(BaseSinkConfig.SCHEMA_SAVE_MODE) .optional(BaseSinkConfig.DATA_SAVE_MODE) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java index 2a1598bf32a..d4287e1007d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java @@ -25,17 +25,27 @@ import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.util.ContainerUtil; +import org.apache.commons.lang3.StringUtils; + import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; +import 
org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback; +import com.github.dockerjava.api.command.ExecCreateCmdResponse; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.stream.Stream; @DisabledOnContainer( @@ -141,6 +151,65 @@ public void testFtpFileReadAndWrite(TestContainer container) helper.execute("/orc/fake_to_ftp_file_orc.conf"); // test write ftp root path excel file helper.execute("/excel/fake_source_to_ftp_root_path_excel.conf"); + // test mult table and save_mode + testMultipleTableAndSaveMode(helper); + } + + @SneakyThrows + private void testMultipleTableAndSaveMode(TestHelper helper) { + // test mult table and save_mode:RECREATE_SCHEMA DROP_DATA + String homePath = "/home/vsftpd/seatunnel"; + String path1 = "/tmp/seatunnel_mult/text/source_1"; + String path2 = "/tmp/seatunnel_mult/text/source_2"; + Assertions.assertEquals(getFileListFromContainer(homePath + path1).size(), 0); + Assertions.assertEquals(getFileListFromContainer(homePath + path2).size(), 0); + helper.execute("/text/multiple_table_fake_to_ftp_file_text.conf"); + Assertions.assertEquals(getFileListFromContainer(homePath + path1).size(), 1); + Assertions.assertEquals(getFileListFromContainer(homePath + path2).size(), 1); + helper.execute("/text/multiple_table_fake_to_ftp_file_text.conf"); + Assertions.assertEquals(getFileListFromContainer(homePath + path1).size(), 1); + Assertions.assertEquals(getFileListFromContainer(homePath + path2).size(), 1); + // test mult table and save_mode:CREATE_SCHEMA_WHEN_NOT_EXIST APPEND_DATA + String path3 = "/tmp/seatunnel_mult2/text/source_1"; + String path4 = "/tmp/seatunnel_mult2/text/source_2"; + Assertions.assertEquals(getFileListFromContainer(homePath + path3).size(), 0); + 
Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 0); + helper.execute("/text/multiple_table_fake_to_ftp_file_text_2.conf"); + Assertions.assertEquals(getFileListFromContainer(homePath + path3).size(), 1); + Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 1); + helper.execute("/text/multiple_table_fake_to_ftp_file_text_2.conf"); + Assertions.assertEquals(getFileListFromContainer(homePath + path3).size(), 2); + Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 2); + } + + @SneakyThrows + private List getFileListFromContainer(String path) { + String command = "ls -1 " + path; + ExecCreateCmdResponse execCreateCmdResponse = + dockerClient + .execCreateCmd(ftpContainer.getContainerId()) + .withCmd("sh", "-c", command) + .withAttachStdout(true) + .withAttachStderr(true) + .exec(); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + dockerClient + .execStartCmd(execCreateCmdResponse.getId()) + .exec(new ExecStartResultCallback(outputStream, System.err)) + .awaitCompletion(); + + String output = new String(outputStream.toByteArray(), StandardCharsets.UTF_8).trim(); + List fileList = new ArrayList<>(); + log.info("container path file list is :{}", output); + String[] files = output.split("\n"); + for (String file : files) { + if (StringUtils.isNotEmpty(file)) { + log.info("container path file name is :{}", file); + fileList.add(file); + } + } + return fileList; } @AfterAll diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileMultipleSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileMultipleSinkIT.java deleted file mode 100644 index 9628a69a0c8..00000000000 --- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileMultipleSinkIT.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.e2e.connector.file.ftp; - -import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; -import org.apache.seatunnel.connectors.seatunnel.file.ftp.catalog.FtpFileCatalog; -import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf; -import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy; -import org.apache.seatunnel.e2e.common.TestResource; -import org.apache.seatunnel.e2e.common.TestSuiteBase; -import org.apache.seatunnel.e2e.common.container.EngineType; -import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.container.TestHelper; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; - -import org.apache.commons.lang3.StringUtils; - -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import 
org.junit.jupiter.api.TestTemplate; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback; - -import com.github.dockerjava.api.command.ExecCreateCmdResponse; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.stream.Stream; - -@DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = - "1.The apache-compress version is not compatible with apache-poi. 2.Spark Engine is not compatible with commons-net") -@Slf4j -public class FtpFileMultipleSinkIT extends TestSuiteBase implements TestResource { - - private static final String FTP_IMAGE = "fauria/vsftpd:latest"; - - private static final String ftp_CONTAINER_HOST = "ftp"; - - private static final int FTP_PORT = 21; - - private static final String USERNAME = "seatunnel"; - - private static final String PASSWORD = "pass"; - - private GenericContainer ftpContainer; - - @BeforeAll - @Override - public void startUp() throws Exception { - ftpContainer = - new GenericContainer<>(FTP_IMAGE) - .withExposedPorts(FTP_PORT) - .withNetwork(NETWORK) - .withExposedPorts(FTP_PORT) - .withNetworkAliases(ftp_CONTAINER_HOST) - .withEnv("FILE_OPEN_MODE", "0666") - .withEnv("WRITE_ENABLE", "YES") - .withEnv("ALLOW_WRITEABLE_CHROOT", "YES") - .withEnv("ANONYMOUS_ENABLE", "YES") - .withEnv("LOCAL_ENABLE", "YES") - .withEnv("LOCAL_UMASK", "000") - .withEnv("FTP_USER", USERNAME) - .withEnv("FTP_PASS", PASSWORD) - .withEnv("PASV_ADDRESS", "0.0.0.0") - .withLogConsumer(new Slf4jLogConsumer(log)) - .withPrivilegedMode(true); - - ftpContainer.setPortBindings(Collections.singletonList("21:21")); - 
ftpContainer.start(); - Startables.deepStart(Stream.of(ftpContainer)).join(); - log.info("ftp container started"); - - ftpContainer.execInContainer("sh", "-c", "chmod -R 777 /home/vsftpd/seatunnel/"); - ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp /home/vsftpd/seatunnel/"); - } - - @TestTemplate - @DisabledOnContainer( - value = {}, - type = {EngineType.FLINK, EngineType.SPARK}, - disabledReason = - "Fink test is multi-node, LocalFile connector will use different containers for obtaining files") - public void testFtpFileMultipleWriteWithSaveMode(TestContainer container) - throws IOException, InterruptedException { - TestHelper helper = new TestHelper(container); - // test save_mode - String path1 = "/tmp/seatunnel/text/source_1"; - String path2 = "/tmp/seatunnel/text/source_2"; - Assertions.assertEquals(getFileListFromContainer(path1).size(), 0); - Assertions.assertEquals(getFileListFromContainer(path2).size(), 0); - helper.execute("/text/multiple_table_fake_to_ftp_file_text.conf"); - Assertions.assertEquals(getFileListFromContainer(path1).size(), 1); - Assertions.assertEquals(getFileListFromContainer(path2).size(), 1); - helper.execute("/text/multiple_table_fake_to_ftp_file_text.conf"); - Assertions.assertEquals(getFileListFromContainer(path1).size(), 1); - Assertions.assertEquals(getFileListFromContainer(path2).size(), 1); - } - - @SneakyThrows - private List getFileListFromContainer(String path) { - String command = "ls -1 " + path; - ExecCreateCmdResponse execCreateCmdResponse = - dockerClient - .execCreateCmd(ftpContainer.getContainerId()) - .withCmd("sh", "-c", command) - .withAttachStdout(true) - .withAttachStderr(true) - .exec(); - - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - dockerClient - .execStartCmd(execCreateCmdResponse.getId()) - .exec(new ExecStartResultCallback(outputStream, System.err)) - .awaitCompletion(); - - String output = new String(outputStream.toByteArray(), StandardCharsets.UTF_8).trim(); - List 
fileList = new ArrayList<>(); - log.info("container path file list is :{}", output); - String[] files = output.split("\n"); - for (String file : files) { - if (StringUtils.isNotEmpty(file)) { - log.info("container path file name is :{}", file); - fileList.add(file); - } - } - return fileList; - } - - @TestTemplate - @DisabledOnContainer( - value = {}, - type = {EngineType.FLINK, EngineType.SPARK}, - disabledReason = - "Fink test is multi-node, LocalFile connector will use different containers for obtaining files") - public void testLocalFileCatalog(TestContainer container) - throws IOException, InterruptedException { - String defaultFS = String.format("ftp://%s:%s", ftp_CONTAINER_HOST, FTP_PORT); - final FtpFileCatalog ftpFileCatalog = - new FtpFileCatalog( - new HadoopFileSystemProxy(new FtpConf(defaultFS)), - "/tmp/seatunnel/json/test1", - FileSystemType.FTP.getFileSystemPluginName()); - final TablePath tablePath = TablePath.DEFAULT; - Assertions.assertFalse(ftpFileCatalog.tableExists(tablePath)); - ftpFileCatalog.createTable(null, null, false); - Assertions.assertTrue(ftpFileCatalog.tableExists(tablePath)); - Assertions.assertFalse(ftpFileCatalog.isExistsData(tablePath)); - ftpFileCatalog.dropTable(tablePath, false); - Assertions.assertFalse(ftpFileCatalog.tableExists(tablePath)); - } - - @AfterAll - @Override - public void tearDown() { - if (ftpContainer != null) { - ftpContainer.close(); - } - } -} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf index 77319bfd53a..cd28e543990 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf @@ -89,7 +89,7 @@ sink { port = 21 user = seatunnel password = pass - path = "/tmp/seatunnel/text/${table_name}" + path = "/tmp/seatunnel_mult/text/${table_name}" source_table_name = "ftp" row_delimiter = "\n" partition_dir_expression = "${k0}=${v0}" diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text_2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text_2.conf new file mode 100644 index 00000000000..e05a14ab86e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text_2.conf @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + result_table_name = "ftp" + tables_configs = [ + { + schema = { + table = "source_1" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"] + } + ] + }, + { + schema = { + table = "source_2" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3] + } + ] + } + ] + } +} + +transform { +} + +sink { + FtpFile { + host = "ftp" + port = 21 + user = seatunnel + password = pass + path = "/tmp/seatunnel_mult2/text/${table_name}" + source_table_name = "ftp" + row_delimiter = "\n" + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "text" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + compress_codec = "lzo" + "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST" + "data_save_mode"="APPEND_DATA" + } +} \ No newline at end of file From c4fdbe1c1a6e16d4eb65415135f783f036abf646 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Sat, 14 Sep 2024 14:58:10 +0800 Subject: [PATCH 04/10] [feature] ftp file support save-mode 4 --- .../resources/examples/fake_to_console.conf | 84 +++---------------- 1 file changed, 13 insertions(+), 71 deletions(-) 
diff --git a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf index db9393bc170..ec7871359d4 100644 --- a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf +++ b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf @@ -14,69 +14,26 @@ # See the License for the specific language governing permissions and # limitations under the License. # +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### env { parallelism = 1 job.mode = "BATCH" - - # You can set spark configuration here - spark.app.name = "SeaTunnel" - spark.executor.instances = 1 - spark.executor.cores = 1 - spark.executor.memory = "1g" - spark.master = local } source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { - result_table_name = "ftp" - tables_configs = [ - { - schema = { - table = "source_1" - fields { - id = int - val_bool = boolean - val_tinyint = tinyint - val_smallint = smallint - val_int = int - val_bigint = bigint - val_float = float - val_double = double - val_decimal = "decimal(16, 1)" - val_string = string - } - } - rows = [ - { - kind = INSERT - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"] - } - ] - }, - { - schema = { - table = "source_2" - fields { - id = int - val_bool = boolean - val_tinyint = tinyint - val_smallint = smallint - val_int = int - val_bigint = bigint - val_float = float - val_double = double - val_decimal = "decimal(16, 1)" - } - } - rows = [ - { - kind = INSERT - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3] - } - ] + result_table_name = "fake" + parallelism = 1 + schema = { + fields { + name = "string" + age = "int" } - ] + } } } @@ -84,22 +41,7 @@ transform { } sink { - FtpFile { - host = "172.20.123.145" - port = 21 - user = 
"wenjun_ruan" - password = "QWer12#$" - path = "/home/wenjun_ruan/laowang/0908/${table_name}" - source_table_name = "ftp" - row_delimiter = "\n" - partition_dir_expression = "${k0}=${v0}" - is_partition_field_write_in_file = true - file_name_expression = "${transactionId}_${now}" - file_format_type = "text" - filename_time_format = "yyyy.MM.dd" - is_enable_transaction = true - compress_codec = "lzo" - "schema_save_mode"="RECREATE_SCHEMA" - "data_save_mode"="DROP_DATA" + console { + source_table_name="fake" } } \ No newline at end of file From 4d7fc2b664ae55610791034dbc257e0eea942145 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Sat, 14 Sep 2024 15:00:47 +0800 Subject: [PATCH 05/10] [feature] ftp file support save-mode 5 --- .../connectors/seatunnel/jdbc/AbstractJdbcIT.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java index 317459875f5..75b0ac484d6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java @@ -38,9 +38,7 @@ import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import 
org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; import org.apache.commons.lang3.StringUtils; @@ -174,7 +172,7 @@ protected void initializeJdbcConnection(String jdbcUrl) protected void insertTestData() { try (PreparedStatement preparedStatement = - connection.prepareStatement(jdbcCase.getInsertSql())) { + connection.prepareStatement(jdbcCase.getInsertSql())) { List rows = jdbcCase.getTestData().getValue(); @@ -346,11 +344,6 @@ public void tearDown() throws SQLException { } @TestTemplate - @DisabledOnContainer( - value = {}, - type = {EngineType.FLINK, EngineType.SPARK}, - disabledReason = - "Fink test is multi-node, LocalFile connector will use different containers for obtaining files") public void testJdbcDb(TestContainer container) throws IOException, InterruptedException, SQLException { List configFiles = jdbcCase.getConfigFile(); From b8b725925f2d835df122cc5cebd3cffdda2a97a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Sat, 14 Sep 2024 15:02:30 +0800 Subject: [PATCH 06/10] [feature] ftp file support save-mode 6 --- .../seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java index 75b0ac484d6..6bc135dd464 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java @@ -172,7 +172,7 @@ protected void initializeJdbcConnection(String jdbcUrl) protected void insertTestData() { try (PreparedStatement preparedStatement = - connection.prepareStatement(jdbcCase.getInsertSql())) { + connection.prepareStatement(jdbcCase.getInsertSql())) { List rows = jdbcCase.getTestData().getValue(); From bd251083c1812c37107c62d6e97488ad8ca3e7d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Sat, 14 Sep 2024 16:05:30 +0800 Subject: [PATCH 07/10] [feature] ftp file support save-mode 7 --- .../seatunnel/e2e/connector/file/ftp/FtpFileIT.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java index d4287e1007d..7e155d428c1 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java @@ -151,12 +151,17 @@ public void testFtpFileReadAndWrite(TestContainer container) helper.execute("/orc/fake_to_ftp_file_orc.conf"); // test write ftp root path excel file helper.execute("/excel/fake_source_to_ftp_root_path_excel.conf"); - // test mult table and save_mode - testMultipleTableAndSaveMode(helper); } - @SneakyThrows - private void testMultipleTableAndSaveMode(TestHelper helper) { + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.FLINK}, + disabledReason = + "Fink test is multi-node, 
FtpFile connector will use different containers for obtaining files") + public void testMultipleTableAndSaveMode(TestContainer container) + throws IOException, InterruptedException { + TestHelper helper = new TestHelper(container); // test mult table and save_mode:RECREATE_SCHEMA DROP_DATA String homePath = "/home/vsftpd/seatunnel"; String path1 = "/tmp/seatunnel_mult/text/source_1"; From 1dfa392b47bf45e5be63feba70614f39ff0f3b8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Sat, 14 Sep 2024 16:32:14 +0800 Subject: [PATCH 08/10] [feature] ftp file support save-mode 8 --- .../connectors/seatunnel/file/ftp/sink/FtpFileSink.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java index afc5978f49b..f4b271e0356 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java @@ -18,15 +18,11 @@ package org.apache.seatunnel.connectors.seatunnel.file.ftp.sink; import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf; import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink; -import com.google.auto.service.AutoService; - -@AutoService(SeaTunnelSink.class) public class FtpFileSink extends 
BaseMultipleTableFileSink { @Override public String getPluginName() { From 89474f9aa7fd84a1429632e21d1c7404c5e7d501 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Sat, 14 Sep 2024 17:00:40 +0800 Subject: [PATCH 09/10] [feature] ftp file support save-mode 8 --- .../connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java index 4c0de969704..cfd2351a5c7 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; @@ -50,6 +51,7 @@ public OptionRule optionRule() { .required(FtpConfigOptions.FTP_PORT) .required(FtpConfigOptions.FTP_USERNAME) .required(FtpConfigOptions.FTP_PASSWORD) + .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .optional(BaseSinkConfig.TMP_PATH) .optional(BaseSinkConfig.FILE_FORMAT_TYPE) .optional(BaseSinkConfig.SCHEMA_SAVE_MODE) From 00f91f54c6a6028aee857cb8e06277680d7d8667 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Sat, 14 Sep 2024 
19:32:58 +0800 Subject: [PATCH 10/10] [feature] ftp file support save-mode 9 --- docs/en/connector-v2/sink/FtpFile.md | 5 +++-- .../apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java | 3 +-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md index 248f40c5fe5..5b927bda126 100644 --- a/docs/en/connector-v2/sink/FtpFile.md +++ b/docs/en/connector-v2/sink/FtpFile.md @@ -85,7 +85,7 @@ The target ftp password is required ### path [string] -The target dir path is required, you can inject the upstream CatalogTable into the path by using: `${database_name}`, `${table_name}` and `${schema_name}`. +The target dir path is required. ### connection_mode [string] @@ -288,7 +288,8 @@ FtpFile { ``` When our source end is multiple tables, and wants different expressions to different directory, we can configure this way -```bash + +```hocon FtpFile { host = "xxx.xxx.xxx.xxx" diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java index 7e155d428c1..1b89a0bcc7c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java @@ -157,8 +157,7 @@ public void testFtpFileReadAndWrite(TestContainer container) @DisabledOnContainer( value = {}, type = {EngineType.FLINK}, - disabledReason = - "Fink test is multi-node, FtpFile connector will use different containers for obtaining files") + disabledReason = "Flink dosen't support multi-table at now") public void testMultipleTableAndSaveMode(TestContainer container) throws IOException, 
InterruptedException { TestHelper helper = new TestHelper(container);