Skip to content

Commit

Permalink
[Fix][Connector-V2] Fix connector support SPI but without no args con…
Browse files Browse the repository at this point in the history
…structor apache#6551;目的是解决s3File到hive报错问题
  • Loading branch information
LeonYoah committed Mar 21, 2024
1 parent 409b3f7 commit 220e674
Show file tree
Hide file tree
Showing 22 changed files with 145 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand All @@ -41,16 +40,13 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.dialect.MongodbDialect;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffsetFactory;

import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;

import javax.annotation.Nonnull;

import java.util.List;
import java.util.Optional;

@NoArgsConstructor
@AutoService(SeaTunnelSource.class)

public class MongodbIncrementalSource<T> extends IncrementalSource<T, MongodbSourceConfig>
implements SupportParallelism {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand All @@ -42,14 +41,11 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffsetFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;

import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;

import java.time.ZoneId;
import java.util.List;

@NoArgsConstructor
@AutoService(SeaTunnelSource.class)

public class MySqlIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig>
implements SupportParallelism {
static final String IDENTIFIER = "MySQL-CDC";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand All @@ -42,21 +41,17 @@

import org.apache.kafka.connect.data.Struct;

import com.google.auto.service.AutoService;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.ConnectTableChangeSerializer;
import io.debezium.relational.history.TableChanges;
import lombok.NoArgsConstructor;

import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

@NoArgsConstructor
@AutoService(SeaTunnelSource.class)
public class PostgresIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig>
implements SupportParallelism {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand All @@ -42,14 +41,10 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser;

import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;

import java.time.ZoneId;
import java.util.List;

@NoArgsConstructor
@AutoService(SeaTunnelSource.class)
public class SqlServerIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig>
implements SupportParallelism {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@
import org.apache.seatunnel.connectors.doris.source.split.DorisSourceSplit;
import org.apache.seatunnel.connectors.doris.source.split.DorisSourceSplitEnumerator;

import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.List;

@Slf4j
@AutoService(SeaTunnelSource.class)
public class DorisSource
implements SeaTunnelSource<SeaTunnelRow, DorisSourceSplit, DorisSourceState> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;

import com.google.auto.service.AutoService;

import java.util.Optional;

import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;

@AutoService(SeaTunnelSink.class)
public class ElasticsearchSink
implements SeaTunnelSink<
SeaTunnelRow,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.oss.config;
package org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.oss.config;
package org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.oss.exception;
package org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.exception;

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,22 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.oss.sink;
package org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf;
import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.oss.exception.OssJindoConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.config.OssConf;
import org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.config.OssConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.exception.OssJindoConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;

import com.google.auto.service.AutoService;

@AutoService(SeaTunnelSink.class)
public class OssFileSink extends BaseFileSink {
@Override
public String getPluginName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.oss.sink;
package org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.config.OssConfigOptions;

import com.google.auto.service.AutoService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.oss.source;
package org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

Expand All @@ -33,9 +33,9 @@
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf;
import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.oss.exception.OssJindoConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.config.OssConf;
import org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.config.OssConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.exception.OssJindoConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.oss.source;
package org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
Expand All @@ -25,7 +25,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.config.OssConfigOptions;

import com.google.auto.service.AutoService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.seatunnel.connectors.test;

import org.apache.seatunnel.connectors.seatunnel.file.oss.sink.OssFileSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.file.oss.source.OssFileSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.sink.OssFileSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.source.OssFileSourceFactory;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
Expand All @@ -39,13 +38,11 @@
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;

import com.google.auto.service.AutoService;

import java.util.Optional;

import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;

@AutoService(SeaTunnelSink.class)
public class S3FileSink extends BaseMultipleTableFileSink implements SupportSaveMode {

private CatalogTable catalogTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig;

import com.google.auto.service.AutoService;

@AutoService(SeaTunnelSource.class)
@AutoService(Factory.class)
public class HudiSourceFactory implements TableSourceFactory {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,13 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import com.google.auto.service.AutoService;
import lombok.SneakyThrows;

import java.util.ArrayList;
import java.util.List;

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;

@AutoService(SeaTunnelSource.class)
public class IcebergSource
implements SeaTunnelSource<
SeaTunnelRow, IcebergFileScanTaskSplit, IcebergSplitEnumeratorState>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,13 @@
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;

import com.google.auto.service.AutoService;

import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.NODE_URLS;
import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.TABLE;
import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig.USERNAME;

@AutoService(SeaTunnelSource.class)
public class StarRocksSource
implements SeaTunnelSource<SeaTunnelRow, StarRocksSourceSplit, StarRocksSourceState> {

Expand Down
Loading

0 comments on commit 220e674

Please sign in to comment.