
Commit

[Feature][Connector-V2] Ftp file sink suport multiple table and save mode (#7665)

chl-wxp authored Sep 14, 2024
1 parent 6da7491 commit 4f812e1
Showing 10 changed files with 451 additions and 48 deletions.
42 changes: 42 additions & 0 deletions docs/en/connector-v2/sink/FtpFile.md
@@ -64,6 +64,8 @@ By default, we use 2PC commit to ensure `exactly-once`
| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. |
| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. |
| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. |
| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Existing dir processing method |
| data_save_mode | string | no | APPEND_DATA | Existing data processing method |

### host [string]

@@ -227,6 +229,18 @@ Support writing Parquet INT96 from a 12-byte field, only valid for parquet files
Only used when file_format_type is json,text,csv,xml.
The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`.

### schema_save_mode [string]

Existing dir processing method.

- RECREATE_SCHEMA: create the dir when it does not exist; delete and recreate it when it already exists
- CREATE_SCHEMA_WHEN_NOT_EXIST: create the dir when it does not exist; skip creation when it already exists
- ERROR_WHEN_SCHEMA_NOT_EXIST: report an error when the dir does not exist
- IGNORE: ignore the table and do not process the dir

### data_save_mode [string]

Existing data processing method (see the minimal sketch after this list).

- DROP_DATA: preserve the dir and delete its data files
- APPEND_DATA: preserve the dir and its existing data files
- ERROR_WHEN_DATA_EXISTS: report an error when data files already exist

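A minimal sketch combining the two options (host, credentials, and path below are placeholders for illustration, not values from this change):

```hocon
FtpFile {
    host = "xxx.xxx.xxx.xxx"
    port = 21
    user = "username"
    password = "password"
    path = "/data/ftp/seatunnel/job1"
    file_format_type = "text"
    # Recreate the target dir on every run, then write into empty data files.
    schema_save_mode = "RECREATE_SCHEMA"
    data_save_mode = "DROP_DATA"
}
```

With the defaults (`CREATE_SCHEMA_WHEN_NOT_EXIST` and `APPEND_DATA`), an existing dir and its data files are left untouched and new files are written alongside them.
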
## Example

A simple config for the text file format:
@@ -273,6 +287,34 @@ FtpFile {

```

When the source side has multiple tables and each table should be written to a different directory, you can configure it like this:

```hocon
FtpFile {
host = "xxx.xxx.xxx.xxx"
port = 21
user = "username"
password = "password"
path = "/data/ftp/seatunnel/job1/${table_name}"
tmp_path = "/data/ftp/seatunnel/tmp"
file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
custom_filename = true
file_name_expression = "${transactionId}_${now}"
sink_columns = ["name","age"]
filename_time_format = "yyyy.MM.dd"
schema_save_mode = "RECREATE_SCHEMA"
data_save_mode = "DROP_DATA"
}
```
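
For example, if the upstream job produces two tables named `orders` and `users` (hypothetical names for illustration), `${table_name}` is resolved per table and files are written under `/data/ftp/seatunnel/job1/orders` and `/data/ftp/seatunnel/job1/users`, so the configured save modes apply to each table's directory.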

## Changelog

### 2.2.0-beta 2022-09-26
FtpFileCatalog.java (new file, 29 additions)
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.ftp.catalog;

import org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;

public class FtpFileCatalog extends AbstractFileCatalog {

public FtpFileCatalog(
HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) {
super(hadoopFileSystemProxy, filePath, catalogName);
}
}
FtpFileCatalogFactory.java (new file, 53 additions)
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.ftp.catalog;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class FtpFileCatalogFactory implements CatalogFactory {
@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
HadoopFileSystemProxy fileSystemUtils =
new HadoopFileSystemProxy(FtpConf.buildWithConfig(options));
return new FtpFileCatalog(
fileSystemUtils,
options.get(BaseSourceConfigOptions.FILE_PATH),
FileSystemType.FTP.getFileSystemPluginName());
}

@Override
public String factoryIdentifier() {
return FileSystemType.FTP.getFileSystemPluginName();
}

@Override
public OptionRule optionRule() {
return OptionRule.builder().build();
}
}
FtpConf.java (modified)
@@ -17,18 +17,19 @@

package org.apache.seatunnel.connectors.seatunnel.file.ftp.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode;

import java.util.HashMap;
import java.util.Optional;

public class FtpConf extends HadoopConf {
private static final String HDFS_IMPL =
"org.apache.seatunnel.connectors.seatunnel.file.ftp.system.SeaTunnelFTPFileSystem";
private static final String SCHEMA = "ftp";

private FtpConf(String hdfsNameKey) {
public FtpConf(String hdfsNameKey) {
super(hdfsNameKey);
}

@@ -42,20 +43,20 @@ public String getSchema() {
return SCHEMA;
}

public static HadoopConf buildWithConfig(Config config) {
String host = config.getString(FtpConfigOptions.FTP_HOST.key());
int port = config.getInt(FtpConfigOptions.FTP_PORT.key());
public static HadoopConf buildWithConfig(ReadonlyConfig config) {
String host = config.get(FtpConfigOptions.FTP_HOST);
int port = config.get(FtpConfigOptions.FTP_PORT);
String defaultFS = String.format("ftp://%s:%s", host, port);
HadoopConf hadoopConf = new FtpConf(defaultFS);
HashMap<String, String> ftpOptions = new HashMap<>();
ftpOptions.put(
"fs.ftp.user." + host, config.getString(FtpConfigOptions.FTP_USERNAME.key()));
ftpOptions.put(
"fs.ftp.password." + host, config.getString(FtpConfigOptions.FTP_PASSWORD.key()));
if (config.hasPath(FtpConfigOptions.FTP_CONNECTION_MODE.key())) {
ftpOptions.put("fs.ftp.user." + host, config.get(FtpConfigOptions.FTP_USERNAME));
ftpOptions.put("fs.ftp.password." + host, config.get(FtpConfigOptions.FTP_PASSWORD));
Optional<FtpConnectionMode> optional =
config.getOptional(FtpConfigOptions.FTP_CONNECTION_MODE);
if (optional.isPresent()) {
ftpOptions.put(
"fs.ftp.connection.mode",
config.getString(FtpConfigOptions.FTP_CONNECTION_MODE.key()));
config.get(FtpConfigOptions.FTP_CONNECTION_MODE).toString());
}
hadoopConf.setExtraOptions(ftpOptions);
return hadoopConf;
FtpFileSink.java (modified)
@@ -17,46 +17,19 @@

package org.apache.seatunnel.connectors.seatunnel.file.ftp.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf;
import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;

import com.google.auto.service.AutoService;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;

@AutoService(SeaTunnelSink.class)
public class FtpFileSink extends BaseFileSink {
public class FtpFileSink extends BaseMultipleTableFileSink {
@Override
public String getPluginName() {
return FileSystemType.FTP.getFileSystemPluginName();
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result =
CheckConfigUtil.checkAllExists(
pluginConfig,
FtpConfigOptions.FTP_HOST.key(),
FtpConfigOptions.FTP_PORT.key(),
FtpConfigOptions.FTP_USERNAME.key(),
FtpConfigOptions.FTP_PASSWORD.key());
if (!result.isSuccess()) {
throw new FileConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, result.getMsg()));
}
super.prepare(pluginConfig);
hadoopConf = FtpConf.buildWithConfig(pluginConfig);
public FtpFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
super(FtpConf.buildWithConfig(readonlyConfig), readonlyConfig, catalogTable);
}
}
FtpFileSinkFactory.java (modified)
@@ -17,18 +17,27 @@

package org.apache.seatunnel.connectors.seatunnel.file.ftp.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.factory.BaseMultipleTableFileSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class FtpFileSinkFactory implements TableSinkFactory {
public class FtpFileSinkFactory extends BaseMultipleTableFileSinkFactory {
@Override
public String factoryIdentifier() {
return FileSystemType.FTP.getFileSystemPluginName();
@@ -42,7 +51,11 @@ public OptionRule optionRule() {
.required(FtpConfigOptions.FTP_PORT)
.required(FtpConfigOptions.FTP_USERNAME)
.required(FtpConfigOptions.FTP_PASSWORD)
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.optional(BaseSinkConfig.TMP_PATH)
.optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.optional(BaseSinkConfig.SCHEMA_SAVE_MODE)
.optional(BaseSinkConfig.DATA_SAVE_MODE)
.conditional(
BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
@@ -94,4 +107,12 @@ public OptionRule optionRule() {
.optional(FtpConfigOptions.FTP_CONNECTION_MODE)
.build();
}

@Override
public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>
createSink(TableSinkFactoryContext context) {
ReadonlyConfig readonlyConfig = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
return () -> new FtpFileSink(readonlyConfig, catalogTable);
}
}
FtpFileSource.java (modified)
@@ -21,6 +21,7 @@

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
@@ -78,7 +79,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
"Ftp file source connector only support read [text, csv, json] files");
}
String path = pluginConfig.getString(FtpConfigOptions.FILE_PATH.key());
hadoopConf = FtpConf.buildWithConfig(pluginConfig);
hadoopConf = FtpConf.buildWithConfig(ReadonlyConfig.fromConfig(pluginConfig));
readStrategy =
ReadStrategyFactory.of(
pluginConfig.getString(FtpConfigOptions.FILE_FORMAT_TYPE.key()));
(The remaining 3 changed files are not shown here.)
