Skip to content

Commit

Permalink
[Bug] [Connector-V2] Clickhouse File Connector failed to sink to tabl…
Browse files Browse the repository at this point in the history
…e with settings like storage_policy (#4172)
  • Loading branch information
wineternity authored Mar 6, 2023
1 parent df82b77 commit e120dc4
Showing 1 changed file with 31 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -62,6 +63,8 @@ public class ClickhouseFileSinkWriter
private static final String CK_LOCAL_CONFIG_TEMPLATE =
"<yandex><path> %s </path> <users><default><password/> <profile>default</profile> <quota>default</quota>"
+ "<access_management>1</access_management></default></users><profiles><default/></profiles><quotas><default/></quotas></yandex>";
private static final String CLICKHOUSE_SETTINGS_KEY = "SETTINGS";
private static final String CLICKHOUSE_DDL_SETTING_FILTER = "storage_policy";
private static final String CLICKHOUSE_LOCAL_FILE_SUFFIX = "/local_data.log";
private static final int UUID_LENGTH = 10;
private final FileReaderOption readerOption;
Expand All @@ -74,6 +77,7 @@ public class ClickhouseFileSinkWriter
private final Map<Shard, String> shardTempFile;

private final SinkWriter.Context context;
private final ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();

public ClickhouseFileSinkWriter(FileReaderOption readerOption, SinkWriter.Context context) {
this.readerOption = readerOption;
Expand Down Expand Up @@ -263,10 +267,7 @@ private List<String> generateClickhouseLocalFiles(String clickhouseLocalFileTmpF
command.add(
String.format(
"\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"",
clickhouseTable
.getCreateTableDDL()
.replace(clickhouseTable.getDatabase() + ".", "")
.replaceAll("`", ""),
adjustClickhouseDDL(),
clickhouseTable.getLocalTableName(),
readerOption.getTableSchema().keySet().stream()
.map(
Expand Down Expand Up @@ -364,8 +365,9 @@ private void moveClickhouseLocalFileToServer(Shard shard, List<String> clickhous
FileTransferFactory.createFileTransfer(
this.readerOption.getCopyMethod(), hostAddress, user, password);
fileTransfer.init();
int randomPath = threadLocalRandom.nextInt(shardLocalDataPaths.get(shard).size());
fileTransfer.transferAndChown(
clickhouseLocalFiles, shardLocalDataPaths.get(shard).get(0) + "detached/");
clickhouseLocalFiles, shardLocalDataPaths.get(shard).get(randomPath) + "detached/");
fileTransfer.close();
}

Expand All @@ -386,4 +388,28 @@ private void clearLocalFileDirectory(List<String> clickhouseLocalFiles) {
e);
}
}

private String adjustClickhouseDDL() {
String createTableDDL =
clickhouseTable
.getCreateTableDDL()
.replace(clickhouseTable.getDatabase() + ".", "")
.replaceAll("`", "");
if (createTableDDL.contains(CLICKHOUSE_SETTINGS_KEY)) {
List<String> filters =
Arrays.stream(CLICKHOUSE_DDL_SETTING_FILTER.split(","))
.collect(Collectors.toList());
int p = createTableDDL.indexOf(CLICKHOUSE_SETTINGS_KEY);
String filteredSetting =
Arrays.stream(
createTableDDL
.substring(p + CLICKHOUSE_SETTINGS_KEY.length())
.split(","))
.filter(e -> !filters.contains(e.split("=")[0].trim()))
.collect(Collectors.joining(","));
createTableDDL =
createTableDDL.substring(0, p) + CLICKHOUSE_SETTINGS_KEY + filteredSetting;
}
return createTableDDL;
}
}

0 comments on commit e120dc4

Please sign in to comment.