Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector] add get source method to all source connector #3846

Merged
merged 24 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
37004a3
add transform doc
EricJoy2048 Dec 22, 2022
854e4ba
add transform v2 document
EricJoy2048 Dec 22, 2022
16376cd
remove transform v1 from document
EricJoy2048 Dec 22, 2022
470a272
improve document
EricJoy2048 Dec 22, 2022
63b1647
fix dead link
EricJoy2048 Dec 22, 2022
1251b04
fix dead link
EricJoy2048 Dec 22, 2022
5fc6760
fix dead link
EricJoy2048 Dec 23, 2022
3d1a7a6
Merge remote-tracking branch 'apache/dev' into 47_add_transform-v2_doc
EricJoy2048 Dec 23, 2022
b733169
update supported connnector num
EricJoy2048 Dec 23, 2022
8de1d78
Update docs/en/transform-v2/replace.md
EricJoy2048 Dec 26, 2022
fe87ac0
Merge remote-tracking branch 'apache/dev' into 47_add_transform-v2_doc
EricJoy2048 Dec 26, 2022
c7661aa
Merge remote-tracking branch 'apache/dev' into 47_add_transform-v2_doc
EricJoy2048 Dec 26, 2022
c2414a0
Merge remote-tracking branch 'apache/dev' into 47_add_transform-v2_doc
EricJoy2048 Dec 28, 2022
af41489
Merge remote-tracking branch 'apache/dev' into 47_add_transform-v2_doc
EricJoy2048 Dec 28, 2022
879f14c
fix ci
EricJoy2048 Dec 28, 2022
dae1d0d
fix ci error
EricJoy2048 Dec 29, 2022
bb5a180
add Parallelism and SchemaProjection inteface to Source Connector
EricJoy2048 Dec 29, 2022
a9926eb
update schemaprojection to columnprojection
EricJoy2048 Dec 29, 2022
fdb0af1
fix code style
EricJoy2048 Dec 29, 2022
fb0dc95
tmp
EricJoy2048 Dec 30, 2022
657c869
all connector add getSourceClass method
EricJoy2048 Dec 30, 2022
3d7da96
merge from dev
EricJoy2048 Jan 2, 2023
2461acd
fix ci error
EricJoy2048 Jan 2, 2023
a1b24ac
fix ci error
EricJoy2048 Jan 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSource;
Expand Down Expand Up @@ -163,15 +164,19 @@ public static List<Factory> discoverFactories(ClassLoader classLoader) {
* This method is called by SeaTunnel Web to get the full option rule of a source.
* @return
*/
public static OptionRule sourceFullOptionRule(@NonNull Factory factory) {
public static OptionRule sourceFullOptionRule(@NonNull TableSourceFactory factory) {
OptionRule sourceOptionRule = factory.optionRule();
if (sourceOptionRule == null) {
throw new FactoryException("sourceOptionRule can not be null");
}

OptionRule sourceCommonOptionRule =
OptionRule.builder().optional(SourceCommonOptions.PARALLELISM).build();
sourceOptionRule.getOptionalOptions().addAll(sourceCommonOptionRule.getOptionalOptions());
Class<? extends SeaTunnelSource> sourceClass = factory.getSourceClass();
if (sourceClass.isAssignableFrom(SupportParallelism.class)) {
OptionRule sourceCommonOptionRule =
OptionRule.builder().optional(SourceCommonOptions.PARALLELISM).build();
sourceOptionRule.getOptionalOptions().addAll(sourceCommonOptionRule.getOptionalOptions());
}

return sourceOptionRule;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.api.table.factory;

import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.connector.TableSource;

Expand All @@ -36,4 +37,6 @@ default <T, SplitT extends SourceSplit, StateT extends Serializable> TableSource
TableFactoryContext context) {
throw new UnsupportedOperationException("unsupported now");
}

Class<? extends SeaTunnelSource> getSourceClass();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand All @@ -42,4 +43,9 @@ public OptionRule optionRule() {
return OptionRule.builder()
.required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE, SeaTunnelSchema.SCHEMA).build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return AmazonDynamoDBSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
Expand Down Expand Up @@ -51,4 +52,9 @@ public OptionRule optionRule() {
JdbcSourceOptions.CONNECTION_POOL_SIZE)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return MySqlIncrementalSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;

Expand All @@ -40,4 +41,9 @@ public String factoryIdentifier() {
public OptionRule optionRule() {
return OptionRule.builder().required(HOST, DATABASE, SQL, USERNAME, PASSWORD).build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return ClickhouseSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SOURCE;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand All @@ -44,4 +45,9 @@ public OptionRule optionRule() {
return OptionRule.builder().required(HOSTS, INDEX).optional(USERNAME, PASSWORD, SCROLL_TIME, SCROLL_SIZE)
.exclusive(SOURCE, SeaTunnelSchema.SCHEMA).build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return ElasticsearchSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_LENGTH;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand All @@ -44,4 +45,9 @@ public OptionRule optionRule() {
return OptionRule.builder().required(SeaTunnelSchema.SCHEMA).optional(ROW_NUM, SPLIT_NUM, SPLIT_READ_INTERVAL, MAP_SIZE,
ARRAY_SIZE, BYTES_LENGTH, STRING_LENGTH).build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return FakeSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.ftp.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -55,4 +56,9 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.TIME_FORMAT)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return FtpFileSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -52,4 +53,9 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.TIME_FORMAT)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return HdfsFileSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.local.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -51,4 +52,9 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.TIME_FORMAT)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return LocalFileSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.oss.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -55,4 +56,9 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.TIME_FORMAT)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return OssFileSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.oss.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -55,4 +56,9 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.TIME_FORMAT)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return OssFileSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.s3.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -58,4 +59,9 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.TIME_FORMAT)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return S3FileSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.sftp.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -55,4 +56,9 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.TIME_FORMAT)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return SftpFileSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.google.sheets.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand All @@ -42,4 +43,9 @@ public OptionRule optionRule() {
.optional(SeaTunnelSchema.SCHEMA)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return SheetsSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.hive.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
Expand All @@ -38,4 +39,9 @@ public OptionRule optionRule() {
.required(HiveConfig.METASTORE_URI)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return HiveSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.http.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -55,4 +56,9 @@ public OptionRule.Builder getHttpBuilder() {
.optional(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)
.optional(HttpConfig.RETRY_BACKOFF_MAX_MS);
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return HttpSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ public OptionRule optionRule() {
.conditional(HudiSourceConfig.USE_KERBEROS, true, HudiSourceConfig.KERBEROS_PRINCIPAL, HudiSourceConfig.KERBEROS_PRINCIPAL_FILE)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return HudiSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_USE_SNAPSHOT_TIMESTAMP;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;

Expand Down Expand Up @@ -70,4 +71,9 @@ public OptionRule optionRule() {
)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return IcebergSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class InfluxDBSinkFactory implements TableSourceFactory {
public class InfluxDBSinkFactory implements TableSinkFactory {

@Override
public String factoryIdentifier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.UPPER_BOUND;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;

Expand Down Expand Up @@ -60,4 +61,9 @@ public OptionRule optionRule() {
)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return InfluxDBSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.VERSION;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;

Expand All @@ -53,4 +54,9 @@ public OptionRule optionRule() {
ENABLE_CACHE_LEADER, VERSION, LOWER_BOUND, UPPER_BOUND, NUM_PARTITIONS)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return IoTDBSource.class;
}
}
Loading