[Feature][Connector-V2] Sftp file sink supports multiple tables and save mode (#7668)
chl-wxp authored Sep 29, 2024
1 parent fb89033 commit dc4b989
Showing 10 changed files with 460 additions and 49 deletions.
44 changes: 44 additions & 0 deletions docs/en/connector-v2/sink/SftpFile.md
@@ -63,6 +63,8 @@ By default, we use 2PC commit to ensure `exactly-once`
| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. |
| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. |
| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. |
| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | How to handle an existing target dir |
| data_save_mode | string | no | APPEND_DATA | How to handle existing data files |

### host [string]

@@ -220,6 +222,19 @@ Support writing Parquet INT96 from a 12-byte field, only valid for parquet files
Only used when file_format_type is json,text,csv,xml.
The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`.

### schema_save_mode [string]
How to handle the target dir before writing:
- RECREATE_SCHEMA: create the dir when it does not exist; delete and recreate it when it already exists
- CREATE_SCHEMA_WHEN_NOT_EXIST: create the dir when it does not exist; skip it when it already exists
- ERROR_WHEN_SCHEMA_NOT_EXIST: report an error when the dir does not exist
- IGNORE: ignore the table and do nothing

### data_save_mode [string]
How to handle existing data files before writing:
- DROP_DATA: preserve the dir and delete its data files
- APPEND_DATA: preserve the dir and its data files
- ERROR_WHEN_DATA_EXISTS: report an error when data files already exist

## Example

For the text file format, with `have_partition`, `custom_filename` and `sink_columns`:
@@ -247,6 +262,35 @@ SftpFile {
is_enable_transaction = true
}

```

When the source produces multiple tables and each table should be written to its own directory, you can configure the sink this way:

```hocon
SftpFile {
host = "xxx.xxx.xxx.xxx"
port = 22
user = "username"
password = "password"
path = "/data/sftp/seatunnel/job1/${table_name}"
tmp_path = "/data/sftp/seatunnel/tmp"
file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
custom_filename = true
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyy.MM.dd"
sink_columns = ["name","age"]
is_enable_transaction = true
schema_save_mode = RECREATE_SCHEMA
data_save_mode = DROP_DATA
}
```

## Changelog
SftpFileCatalog.java
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.sftp.catalog;

import org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;

public class SftpFileCatalog extends AbstractFileCatalog {

public SftpFileCatalog(
HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) {
super(hadoopFileSystemProxy, filePath, catalogName);
}
}
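The catalog is what lets the save modes documented above act on the file system: for a file connector, "schema" operations are directory operations. Below is a minimal, self-contained sketch of that idea using plain `java.nio.file` — it is not the SeaTunnel `AbstractFileCatalog` API (whose behavior `SftpFileCatalog` inherits), just an illustration of what the two schema save modes imply; `FileCatalogSketch` and its method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

final class FileCatalogSketch {
    private final Path root; // base path, e.g. /data/sftp/seatunnel/job1

    FileCatalogSketch(Path root) {
        this.root = root;
    }

    // CREATE_SCHEMA_WHEN_NOT_EXIST: create the table dir only if it is missing.
    void createSchemaIfAbsent(String tableName) throws IOException {
        Files.createDirectories(root.resolve(tableName));
    }

    // RECREATE_SCHEMA: delete the table dir if present, then create it fresh.
    void recreateSchema(String tableName) throws IOException {
        Path dir = root.resolve(tableName);
        if (Files.exists(dir)) {
            try (Stream<Path> paths = Files.walk(dir)) {
                // delete children before parents
                for (Path p : paths.sorted(Comparator.reverseOrder()).toArray(Path[]::new)) {
                    Files.delete(p);
                }
            }
        }
        Files.createDirectories(dir);
    }
}
```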
SftpFileCatalogFactory.java
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.sftp.catalog;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class SftpFileCatalogFactory implements CatalogFactory {
@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
HadoopFileSystemProxy fileSystemUtils =
new HadoopFileSystemProxy(SftpConf.buildWithConfig(options));
return new SftpFileCatalog(
fileSystemUtils,
options.get(BaseSourceConfigOptions.FILE_PATH),
factoryIdentifier());
}

@Override
public String factoryIdentifier() {
return FileSystemType.SFTP.getFileSystemPluginName();
}

@Override
public OptionRule optionRule() {
return OptionRule.builder().build();
}
}
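The `@AutoService(Factory.class)` annotation generates the `META-INF/services` registration that makes this factory discoverable at runtime. A hedged sketch of how such factories can be enumerated via the standard `ServiceLoader` mechanism (assumes the SeaTunnel API and this connector jar are on the classpath; `ListFactories` is a hypothetical driver class):

```java
import java.util.ServiceLoader;

import org.apache.seatunnel.api.table.factory.Factory;

public class ListFactories {
    public static void main(String[] args) {
        // Iterates every Factory registered via META-INF/services,
        // including this catalog factory (identifier "SftpFile").
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.factoryIdentifier());
        }
    }
}
```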
SftpConf.java
@@ -17,8 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.file.sftp.config;

-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;

import java.util.HashMap;
@@ -42,20 +41,16 @@ public String getSchema() {
return SCHEMA;
}

-    public static HadoopConf buildWithConfig(Config config) {
-        String host = config.getString(SftpConfigOptions.SFTP_HOST.key());
-        int port = config.getInt(SftpConfigOptions.SFTP_PORT.key());
+    public static HadoopConf buildWithConfig(ReadonlyConfig config) {
+        String host = config.get(SftpConfigOptions.SFTP_HOST);
+        int port = config.get(SftpConfigOptions.SFTP_PORT);
        String defaultFS = String.format("sftp://%s:%s", host, port);
        HadoopConf hadoopConf = new SftpConf(defaultFS);
        HashMap<String, String> sftpOptions = new HashMap<>();
+        sftpOptions.put("fs.sftp.user." + host, config.get(SftpConfigOptions.SFTP_USER));
        sftpOptions.put(
-                "fs.sftp.user." + host, config.getString(SftpConfigOptions.SFTP_USER.key()));
-        sftpOptions.put(
-                "fs.sftp.password."
-                        + host
-                        + "."
-                        + config.getString(SftpConfigOptions.SFTP_USER.key()),
-                config.getString(SftpConfigOptions.SFTP_PASSWORD.key()));
+                "fs.sftp.password." + host + "." + config.get(SftpConfigOptions.SFTP_USER),
+                config.get(SftpConfigOptions.SFTP_PASSWORD));
hadoopConf.setExtraOptions(sftpOptions);
return hadoopConf;
}
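For illustration, here are the option keys this method produces for a hypothetical host `10.0.0.5` and user `seatunnel` (all values made up): the user key is scoped per host, the password key per host and user. A self-contained sketch mirroring what `buildWithConfig` puts into the extra options map:

```java
import java.util.HashMap;
import java.util.Map;

class SftpOptionKeysExample {
    public static void main(String[] args) {
        // Hypothetical host/user/password; mirrors the key layout built above.
        Map<String, String> sftpOptions = new HashMap<>();
        sftpOptions.put("fs.sftp.user.10.0.0.5", "seatunnel");
        sftpOptions.put("fs.sftp.password.10.0.0.5.seatunnel", "secret");
        sftpOptions.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```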
SftpFileSink.java
@@ -17,46 +17,19 @@

package org.apache.seatunnel.connectors.seatunnel.file.sftp.sink;

-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;
-import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfigOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;

-import com.google.auto.service.AutoService;
-
-@AutoService(SeaTunnelSink.class)
-public class SftpFileSink extends BaseFileSink {
+public class SftpFileSink extends BaseMultipleTableFileSink {
+    public SftpFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+        super(SftpConf.buildWithConfig(readonlyConfig), readonlyConfig, catalogTable);
+    }
+
    @Override
    public String getPluginName() {
        return FileSystemType.SFTP.getFileSystemPluginName();
    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        SftpConfigOptions.SFTP_HOST.key(),
-                        SftpConfigOptions.SFTP_PORT.key(),
-                        SftpConfigOptions.SFTP_USER.key(),
-                        SftpConfigOptions.SFTP_PASSWORD.key());
-        if (!result.isSuccess()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, result.getMsg()));
-        }
-        super.prepare(pluginConfig);
-        hadoopConf = SftpConf.buildWithConfig(pluginConfig);
-    }
}
SftpFileSinkFactory.java
@@ -17,18 +17,27 @@

package org.apache.seatunnel.connectors.seatunnel.file.sftp.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.factory.BaseMultipleTableFileSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
-public class SftpFileSinkFactory implements TableSinkFactory {
+public class SftpFileSinkFactory extends BaseMultipleTableFileSinkFactory {
@Override
public String factoryIdentifier() {
return FileSystemType.SFTP.getFileSystemPluginName();
@@ -43,6 +52,9 @@ public OptionRule optionRule() {
.required(SftpConfigOptions.SFTP_USER)
.required(SftpConfigOptions.SFTP_PASSWORD)
.optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.optional(BaseSinkConfig.SCHEMA_SAVE_MODE)
.optional(BaseSinkConfig.DATA_SAVE_MODE)
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.conditional(
BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
@@ -93,4 +105,12 @@ public OptionRule optionRule() {
.optional(BaseSinkConfig.TIME_FORMAT)
.build();
}

@Override
public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>
createSink(TableSinkFactoryContext context) {
ReadonlyConfig readonlyConfig = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
return () -> new SftpFileSink(readonlyConfig, catalogTable);
}
}
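`createSink` returns the `TableSink` as a lambda. Assuming `TableSink`'s single abstract method is `createSink()` (and that `org.apache.seatunnel.api.sink.SeaTunnelSink` is imported), the lambda is shorthand for an anonymous class like this sketch:

```java
// Inside createSink(...) above, the lambda
//     () -> new SftpFileSink(readonlyConfig, catalogTable)
// expands to roughly the following anonymous class (sketch, assuming
// TableSink's single abstract method is createSink()):
return new TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>() {
    @Override
    public SeaTunnelSink<SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>
            createSink() {
        return new SftpFileSink(readonlyConfig, catalogTable);
    }
};
```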
SftpFileSource.java
@@ -21,6 +21,7 @@

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
@@ -78,7 +79,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
"Sftp file source connector only support read [text, csv, json, xml] files");
}
String path = pluginConfig.getString(SftpConfigOptions.FILE_PATH.key());
-        hadoopConf = SftpConf.buildWithConfig(pluginConfig);
+        hadoopConf = SftpConf.buildWithConfig(ReadonlyConfig.fromConfig(pluginConfig));
readStrategy =
ReadStrategyFactory.of(
pluginConfig.getString(SftpConfigOptions.FILE_FORMAT_TYPE.key()));
