diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java new file mode 100644 index 000000000000..cb718c5f2d65 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class OssJindoConnectorException extends SeaTunnelRuntimeException { + public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java index dc7dc1bdc43c..22e78bc59eda 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.oss.sink; import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; @@ -25,6 +26,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf; import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; +import org.apache.seatunnel.connectors.seatunnel.file.oss.exception.OssJindoConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -35,7 +37,7 @@ public class OssFileSink extends BaseFileSink { @Override public String getPluginName() { - return FileSystemType.OSS.getFileSystemPluginName(); + return FileSystemType.OSS_JINDO.getFileSystemPluginName(); } @Override @@ -46,7 +48,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException { OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(), OssConfig.ACCESS_SECRET.key(), OssConfig.BUCKET.key()); if (!result.isSuccess()) { - throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg()); + throw new OssJindoConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format("PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SOURCE, result.getMsg())); } hadoopConf = OssConf.buildWithConfig(pluginConfig); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java index d8a98cd0cdfe..456be0a810d3 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.oss.source; import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.config.CheckConfigUtil; @@ -29,6 +30,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException; import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf; import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; +import org.apache.seatunnel.connectors.seatunnel.file.oss.exception.OssJindoConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource; import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory; @@ -42,7 +44,7 @@ public class OssFileSource extends BaseFileSource { @Override public String getPluginName() { - return FileSystemType.OSS.getFileSystemPluginName(); + return FileSystemType.OSS_JINDO.getFileSystemPluginName(); } @Override @@ -52,7 +54,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException { OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(), OssConfig.ACCESS_SECRET.key(), OssConfig.BUCKET.key()); if (!result.isSuccess()) { - throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + throw new OssJindoConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format("PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SOURCE, result.getMsg())); } readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE.key())); readStrategy.setPluginConfig(pluginConfig);