[Feature][Connector-V2] SftpFile sink support multiple table and save mode #7668

Merged · 6 commits · Sep 29, 2024
Changed file: docs/en/connector-v2/sink/SftpFile.md (44 additions, 0 deletions)

@@ -63,6 +63,8 @@ By default, we use 2PC commit to ensure `exactly-once`
| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. |
| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. |
| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. |
| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | How to handle an existing directory before writing |
| data_save_mode | string | no | APPEND_DATA | How to handle existing data files before writing |

### host [string]

@@ -220,6 +222,19 @@ Support writing Parquet INT96 from a 12-byte field, only valid for parquet files
Only used when file_format_type is json,text,csv,xml.
The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`.

### schema_save_mode [string]
How to handle the target directory before writing:
- RECREATE_SCHEMA: create the directory when it does not exist; delete and recreate it when it already exists
- CREATE_SCHEMA_WHEN_NOT_EXIST: create the directory when it does not exist; skip it when it already exists
- ERROR_WHEN_SCHEMA_NOT_EXIST: report an error when the directory does not exist
- IGNORE: ignore the table and do nothing

### data_save_mode [string]
How to handle data files that already exist in the target directory:
- DROP_DATA: keep the directory but delete the data files inside it
- APPEND_DATA: keep the directory and keep the existing data files
- ERROR_WHEN_DATA_EXISTS: report an error when data files already exist

## Example

For text file format with `have_partition` and `custom_filename` and `sink_columns`
@@ -247,6 +262,35 @@ SftpFile {
    is_enable_transaction = true
}

```

When the upstream source produces multiple tables and each table should be written to its own directory, the sink can be configured like this:

```hocon
SftpFile {
    host = "xxx.xxx.xxx.xxx"
    port = 22
    user = "username"
    password = "password"
    path = "/data/sftp/seatunnel/job1/${table_name}"
    tmp_path = "/data/sftp/seatunnel/tmp"
    file_format_type = "text"
    field_delimiter = "\t"
    row_delimiter = "\n"
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
    sink_columns = ["name","age"]
    is_enable_transaction = true
    schema_save_mode = RECREATE_SCHEMA
    data_save_mode = DROP_DATA
}
```
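With `path = "/data/sftp/seatunnel/job1/${table_name}"`, the placeholder is resolved once per upstream table. If the source emitted, say, two tables named `orders` and `users` (hypothetical names, for illustration only), each table's files would land under its own directory:

```
/data/sftp/seatunnel/job1/orders/...
/data/sftp/seatunnel/job1/users/...
```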

## Changelog
New file: SftpFileCatalog.java (+29 lines)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.file.sftp.catalog;

import org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;

public class SftpFileCatalog extends AbstractFileCatalog {

    public SftpFileCatalog(
            HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) {
        super(hadoopFileSystemProxy, filePath, catalogName);
    }
}
```
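The catalog adds no SFTP-specific behavior: everything comes from `AbstractFileCatalog`, which maps catalog operations onto directory operations through `HadoopFileSystemProxy`. Below is a minimal sketch of that idea; the proxy method names (`fileExist`, `createDir`, `deleteFile`) are assumptions made for illustration, not signatures confirmed by this diff:

```java
import java.io.IOException;

import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;

// Sketch only: the real logic lives in the shared AbstractFileCatalog base class.
class FileCatalogSketch {
    private final HadoopFileSystemProxy proxy; // wraps the sftp:// Hadoop FileSystem
    private final String filePath; // the sink's target directory

    FileCatalogSketch(HadoopFileSystemProxy proxy, String filePath) {
        this.proxy = proxy;
        this.filePath = filePath;
    }

    // schema_save_mode reduces to directory checks like this one:
    boolean tableExists() throws IOException {
        return proxy.fileExist(filePath); // an existing directory == an existing "table"
    }

    void createTable() throws IOException {
        proxy.createDir(filePath); // CREATE_SCHEMA_WHEN_NOT_EXIST / RECREATE_SCHEMA
    }

    void dropTable() throws IOException {
        proxy.deleteFile(filePath); // RECREATE_SCHEMA deletes first, then recreates
    }
}
```

This directory-as-table mapping is what lets `schema_save_mode` and `data_save_mode` treat the target dir as the "schema" and the files inside it as the "data".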
New file: SftpFileCatalogFactory.java (+53 lines)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.file.sftp.catalog;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class SftpFileCatalogFactory implements CatalogFactory {
    @Override
    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
        HadoopFileSystemProxy fileSystemUtils =
                new HadoopFileSystemProxy(SftpConf.buildWithConfig(options));
        return new SftpFileCatalog(
                fileSystemUtils,
                options.get(BaseSourceConfigOptions.FILE_PATH),
                factoryIdentifier());
    }

    @Override
    public String factoryIdentifier() {
        return FileSystemType.SFTP.getFileSystemPluginName();
    }

    @Override
    public OptionRule optionRule() {
        return OptionRule.builder().build();
    }
}
```
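Because the factory is annotated with `@AutoService(Factory.class)`, the engine discovers it on the classpath by its identifier, the plugin name `SftpFile` returned by `factoryIdentifier()`. For illustration, a hedged sketch of wiring the catalog up by hand, assuming a `ReadonlyConfig.fromMap` builder of this shape and reusing the option keys documented above:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.connectors.seatunnel.file.sftp.catalog.SftpFileCatalogFactory;

public class SftpCatalogWiringSketch {
    public static void main(String[] args) {
        // Values mirror the docs example above; replace with real connection details.
        Map<String, Object> options = new HashMap<>();
        options.put("host", "xxx.xxx.xxx.xxx");
        options.put("port", 22);
        options.put("user", "username");
        options.put("password", "password");
        options.put("path", "/data/sftp/seatunnel/job1");

        // createCatalog builds the HadoopFileSystemProxy via the same
        // SftpConf.buildWithConfig shown later in this PR.
        Catalog catalog =
                new SftpFileCatalogFactory()
                        .createCatalog("SftpFile", ReadonlyConfig.fromMap(options));
    }
}
```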
Changed file: SftpConf.java

```diff
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sftp.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 
 import java.util.HashMap;
@@ -42,20 +41,16 @@ public String getSchema() {
         return SCHEMA;
     }
 
-    public static HadoopConf buildWithConfig(Config config) {
-        String host = config.getString(SftpConfigOptions.SFTP_HOST.key());
-        int port = config.getInt(SftpConfigOptions.SFTP_PORT.key());
+    public static HadoopConf buildWithConfig(ReadonlyConfig config) {
+        String host = config.get(SftpConfigOptions.SFTP_HOST);
+        int port = config.get(SftpConfigOptions.SFTP_PORT);
         String defaultFS = String.format("sftp://%s:%s", host, port);
         HadoopConf hadoopConf = new SftpConf(defaultFS);
         HashMap<String, String> sftpOptions = new HashMap<>();
-        sftpOptions.put(
-                "fs.sftp.user." + host, config.getString(SftpConfigOptions.SFTP_USER.key()));
+        sftpOptions.put("fs.sftp.user." + host, config.get(SftpConfigOptions.SFTP_USER));
         sftpOptions.put(
-                "fs.sftp.password."
-                        + host
-                        + "."
-                        + config.getString(SftpConfigOptions.SFTP_USER.key()),
-                config.getString(SftpConfigOptions.SFTP_PASSWORD.key()));
+                "fs.sftp.password." + host + "." + config.get(SftpConfigOptions.SFTP_USER),
+                config.get(SftpConfigOptions.SFTP_PASSWORD));
         hadoopConf.setExtraOptions(sftpOptions);
         return hadoopConf;
     }
```
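Concretely, for host `example.com`, port `22`, and user `username` (illustrative values, not taken from the PR), the rewritten method produces a default filesystem of `sftp://example.com:22` plus the following extra Hadoop options; note how the password key embeds both the host and the user:

```
fs.sftp.user.example.com              = username
fs.sftp.password.example.com.username = <password>
```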
Changed file: SftpFileSink.java

```diff
@@ -17,46 +17,19 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sftp.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;
-import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfigOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
 
-@AutoService(SeaTunnelSink.class)
-public class SftpFileSink extends BaseFileSink {
+public class SftpFileSink extends BaseMultipleTableFileSink {
+    public SftpFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+        super(SftpConf.buildWithConfig(readonlyConfig), readonlyConfig, catalogTable);
+    }
+
     @Override
     public String getPluginName() {
         return FileSystemType.SFTP.getFileSystemPluginName();
     }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        SftpConfigOptions.SFTP_HOST.key(),
-                        SftpConfigOptions.SFTP_PORT.key(),
-                        SftpConfigOptions.SFTP_USER.key(),
-                        SftpConfigOptions.SFTP_PASSWORD.key());
-        if (!result.isSuccess()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, result.getMsg()));
-        }
-        super.prepare(pluginConfig);
-        hadoopConf = SftpConf.buildWithConfig(pluginConfig);
-    }
 }
```
Changed file: SftpFileSinkFactory.java

```diff
@@ -17,18 +17,27 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sftp.sink;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.factory.BaseMultipleTableFileSinkFactory;
 import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
 
 import com.google.auto.service.AutoService;
 
 @AutoService(Factory.class)
-public class SftpFileSinkFactory implements TableSinkFactory {
+public class SftpFileSinkFactory extends BaseMultipleTableFileSinkFactory {
     @Override
     public String factoryIdentifier() {
         return FileSystemType.SFTP.getFileSystemPluginName();
@@ -43,6 +52,9 @@ public OptionRule optionRule() {
                 .required(SftpConfigOptions.SFTP_USER)
                 .required(SftpConfigOptions.SFTP_PASSWORD)
                 .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+                .optional(BaseSinkConfig.SCHEMA_SAVE_MODE)
+                .optional(BaseSinkConfig.DATA_SAVE_MODE)
+                .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .conditional(
                         BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
@@ -93,4 +105,12 @@
                 .optional(BaseSinkConfig.TIME_FORMAT)
                 .build();
     }
+
+    @Override
+    public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>
+            createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () -> new SftpFileSink(readonlyConfig, catalogTable);
+    }
 }
```
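The lambda returned from `createSink` works because `TableSink` acts as a deferred supplier: the engine calls the factory once per upstream `CatalogTable`, so each table gets its own `SftpFileSink` and, through `${table_name}` in `path`, its own directory. An illustrative desugaring of the lambda, assuming `TableSink`'s single abstract method is named `createSink()`:

```java
// Equivalent to `() -> new SftpFileSink(readonlyConfig, catalogTable)` above;
// shown only to make the deferred construction explicit.
return new TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>() {
    @Override
    public SeaTunnelSink<SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>
            createSink() {
        return new SftpFileSink(readonlyConfig, catalogTable);
    }
};
```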
Changed file: SftpFileSource.java

```diff
@@ -21,6 +21,7 @@
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
@@ -78,7 +79,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                     "Sftp file source connector only support read [text, csv, json, xml] files");
         }
         String path = pluginConfig.getString(SftpConfigOptions.FILE_PATH.key());
-        hadoopConf = SftpConf.buildWithConfig(pluginConfig);
+        hadoopConf = SftpConf.buildWithConfig(ReadonlyConfig.fromConfig(pluginConfig));
         readStrategy =
                 ReadStrategyFactory.of(
                         pluginConfig.getString(SftpConfigOptions.FILE_FORMAT_TYPE.key()));
```