Skip to content

Commit

Permalink
spotless
Browse files Browse the repository at this point in the history
  • Loading branch information
gnehil committed Dec 12, 2023
1 parent 60165de commit 645e9d6
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
try (Statement stmt = conn.createStatement()) {
stmt.execute(query);
} catch (SQLException e) {
throw new CatalogException(String.format("drop table [%s] failed", tablePath.getFullName()), e);
throw new CatalogException(
String.format("drop table [%s] failed", tablePath.getFullName()), e);
}
}

Expand Down Expand Up @@ -327,16 +328,17 @@ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
}

@Override
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
if (!tableExists(tablePath) && !ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
}
String query = DorisCatalogUtil.getTruncateTableQuery(tablePath);
try (Statement stmt = conn.createStatement()) {
stmt.executeUpdate(query);
} catch (SQLException e) {
throw new CatalogException(String.format("truncate table [%s] failed", tablePath.getFullName()),
e);
throw new CatalogException(
String.format("truncate table [%s] failed", tablePath.getFullName()), e);
}
}

Expand All @@ -350,8 +352,8 @@ public boolean isExistsData(TablePath tablePath) {
return size > 0;
}
} catch (SQLException e) {
throw new CatalogException(String.format("get table [%s] data size failed", tablePath.getFullName()),
e);
throw new CatalogException(
String.format("get table [%s] data size failed", tablePath.getFullName()), e);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.seatunnel.connectors.doris.config;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.io.Serializable;
import java.util.HashMap;
Expand Down Expand Up @@ -176,5 +178,4 @@ public ReadonlyConfig getCatalogConfig() {
map.put(SAVE_MODE_CREATE_TEMPLATE.key(), createTableTemplate);
return ReadonlyConfig.fromMap(map);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

package org.apache.seatunnel.connectors.doris.config;

import org.apache.seatunnel.shade.com.google.common.collect.ImmutableMap;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.shade.com.google.common.collect.ImmutableMap;

import java.util.Map;

Expand Down Expand Up @@ -235,10 +236,7 @@ public interface DorisOptions {
.enumType(SchemaSaveMode.class)
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST);

Option<String> CUSTOM_SQL =
Options.key("custom_sql")
.stringType()
.noDefaultValue();
Option<String> CUSTOM_SQL = Options.key("custom_sql").stringType().noDefaultValue();

OptionRule.Builder SINK_RULE =
OptionRule.builder().required(FENODES, USERNAME, PASSWORD, TABLE_IDENTIFIER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.seatunnel.connectors.doris.sink;

import com.google.auto.service.AutoService;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
Expand Down Expand Up @@ -49,7 +49,10 @@
import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkState;
import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkStateSerializer;
import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.commons.lang3.StringUtils;

import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -177,15 +180,16 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
}

DorisCatalogFactory factory = new DorisCatalogFactory();
DorisCatalog catalog = (DorisCatalog) factory.createCatalog("doris", dorisConfig.getCatalogConfig());

return Optional.of(new DorisSaveModeHandler(
dorisConfig.getSchemaSaveMode(),
dorisConfig.getDataSaveMode(),
catalog,
catalogTable.getTableId().toTablePath(),
catalogTable,
dorisConfig.getCustomSql())
);
DorisCatalog catalog =
(DorisCatalog) factory.createCatalog("doris", dorisConfig.getCatalogConfig());

return Optional.of(
new DorisSaveModeHandler(
dorisConfig.getSchemaSaveMode(),
dorisConfig.getDataSaveMode(),
catalog,
catalogTable.getTableId().toTablePath(),
catalogTable,
dorisConfig.getCustomSql()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ public class DorisSaveModeHandler implements SaveModeHandler {

private String customSql;

public DorisSaveModeHandler(SchemaSaveMode schemaSaveMode, DataSaveMode dataSaveMode, DorisCatalog catalog,
TablePath tablePath, CatalogTable catalogTable, String customSql) {
public DorisSaveModeHandler(
SchemaSaveMode schemaSaveMode,
DataSaveMode dataSaveMode,
DorisCatalog catalog,
TablePath tablePath,
CatalogTable catalogTable,
String customSql) {
this.schemaSaveMode = schemaSaveMode;
this.dataSaveMode = dataSaveMode;
this.catalog = catalog;
Expand All @@ -65,14 +70,16 @@ public void handleSchemaSaveMode() {
break;
case ERROR_WHEN_SCHEMA_NOT_EXIST:
if (!catalog.tableExists(tablePath)) {
String msg = String.format("Table [%s] is not exists.", tablePath.getFullName());
throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.SINK_TABLE_NOT_EXIST, msg);
String msg =
String.format("Table [%s] is not exists.", tablePath.getFullName());
throw new SeaTunnelRuntimeException(
SeaTunnelAPIErrorCode.SINK_TABLE_NOT_EXIST, msg);
}
break;
default:
throw new IllegalArgumentException(String.format("Unsupported schema save mode: %s", schemaSaveMode));
throw new IllegalArgumentException(
String.format("Unsupported schema save mode: %s", schemaSaveMode));
}

}

@Override
Expand All @@ -88,21 +95,19 @@ public void handleDataSaveMode() {
case ERROR_WHEN_DATA_EXISTS:
if (catalog.isExistsData(tablePath)) {
String msg = String.format("Table [%s] has data", tablePath.getFullName());
throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.SOURCE_ALREADY_HAS_DATA, msg);
throw new SeaTunnelRuntimeException(
SeaTunnelAPIErrorCode.SOURCE_ALREADY_HAS_DATA, msg);
}
break;
case CUSTOM_PROCESSING:
catalog.executeSql(tablePath, customSql);
break;
default:
throw new IllegalArgumentException(String.format("Unsupported data save mode: %s", dataSaveMode));
throw new IllegalArgumentException(
String.format("Unsupported data save mode: %s", dataSaveMode));
}

}

@Override
public void close() throws Exception {

}

public void close() throws Exception {}
}

0 comments on commit 645e9d6

Please sign in to comment.