Skip to content

Commit

Permalink
[Feature][Connector] add get source method to all source connector (#…
Browse files Browse the repository at this point in the history
…3846)

* add transform doc

* add transform v2 document

* remove transform v1 from document

* improve document

* fix dead link

* fix dead link

* fix dead link

* update supported connnector num

* Update docs/en/transform-v2/replace.md

Co-authored-by: Zongwen Li <zongwen.li.tech@gmail.com>

* fix ci

* fix ci error

* add Parallelism and SchemaProjection inteface to Source Connector

* update schemaprojection to columnprojection

* fix code style

* tmp

* all connector add getSourceClass method

* fix ci error

* fix ci error

Co-authored-by: Zongwen Li <zongwen.li.tech@gmail.com>
  • Loading branch information
EricJoy2048 and ashulin authored Jan 3, 2023
1 parent 1118c83 commit 417178f
Show file tree
Hide file tree
Showing 35 changed files with 197 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSource;
Expand Down Expand Up @@ -163,15 +164,19 @@ public static List<Factory> discoverFactories(ClassLoader classLoader) {
* This method is called by SeaTunnel Web to get the full option rule of a source.
* @return
*/
public static OptionRule sourceFullOptionRule(@NonNull Factory factory) {
public static OptionRule sourceFullOptionRule(@NonNull TableSourceFactory factory) {
OptionRule sourceOptionRule = factory.optionRule();
if (sourceOptionRule == null) {
throw new FactoryException("sourceOptionRule can not be null");
}

OptionRule sourceCommonOptionRule =
OptionRule.builder().optional(SourceCommonOptions.PARALLELISM).build();
sourceOptionRule.getOptionalOptions().addAll(sourceCommonOptionRule.getOptionalOptions());
Class<? extends SeaTunnelSource> sourceClass = factory.getSourceClass();
if (sourceClass.isAssignableFrom(SupportParallelism.class)) {
OptionRule sourceCommonOptionRule =
OptionRule.builder().optional(SourceCommonOptions.PARALLELISM).build();
sourceOptionRule.getOptionalOptions().addAll(sourceCommonOptionRule.getOptionalOptions());
}

return sourceOptionRule;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.api.table.factory;

import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.connector.TableSource;

Expand All @@ -36,4 +37,6 @@ default <T, SplitT extends SourceSplit, StateT extends Serializable> TableSource
TableFactoryContext context) {
throw new UnsupportedOperationException("unsupported now");
}

Class<? extends SeaTunnelSource> getSourceClass();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand All @@ -42,4 +43,9 @@ public OptionRule optionRule() {
return OptionRule.builder()
.required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE, SeaTunnelSchema.SCHEMA).build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return AmazonDynamoDBSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
Expand Down Expand Up @@ -51,4 +52,9 @@ public OptionRule optionRule() {
JdbcSourceOptions.CONNECTION_POOL_SIZE)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return MySqlIncrementalSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;

Expand All @@ -40,4 +41,9 @@ public String factoryIdentifier() {
public OptionRule optionRule() {
return OptionRule.builder().required(HOST, DATABASE, SQL, USERNAME, PASSWORD).build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return ClickhouseSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SOURCE;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand All @@ -44,4 +45,9 @@ public OptionRule optionRule() {
return OptionRule.builder().required(HOSTS, INDEX).optional(USERNAME, PASSWORD, SCROLL_TIME, SCROLL_SIZE)
.exclusive(SOURCE, SeaTunnelSchema.SCHEMA).build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return ElasticsearchSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_LENGTH;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand All @@ -44,4 +45,9 @@ public OptionRule optionRule() {
return OptionRule.builder().required(SeaTunnelSchema.SCHEMA).optional(ROW_NUM, SPLIT_NUM, SPLIT_READ_INTERVAL, MAP_SIZE,
ARRAY_SIZE, BYTES_LENGTH, STRING_LENGTH).build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return FakeSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.ftp.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -55,4 +56,9 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.TIME_FORMAT)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return FtpFileSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -52,4 +53,9 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.TIME_FORMAT)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return HdfsFileSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.local.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -51,4 +52,9 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.TIME_FORMAT)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return LocalFileSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.oss.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -55,4 +56,9 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.TIME_FORMAT)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return OssFileSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.oss.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -55,4 +56,9 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.TIME_FORMAT)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return OssFileSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.s3.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -58,4 +59,9 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.TIME_FORMAT)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return S3FileSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.sftp.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -55,4 +56,9 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.TIME_FORMAT)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return SftpFileSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.google.sheets.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand All @@ -42,4 +43,9 @@ public OptionRule optionRule() {
.optional(SeaTunnelSchema.SCHEMA)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return SheetsSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.hive.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
Expand All @@ -38,4 +39,9 @@ public OptionRule optionRule() {
.required(HiveConfig.METASTORE_URI)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return HiveSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.http.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
Expand Down Expand Up @@ -55,4 +56,9 @@ public OptionRule.Builder getHttpBuilder() {
.optional(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)
.optional(HttpConfig.RETRY_BACKOFF_MAX_MS);
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return HttpSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ public OptionRule optionRule() {
.conditional(HudiSourceConfig.USE_KERBEROS, true, HudiSourceConfig.KERBEROS_PRINCIPAL, HudiSourceConfig.KERBEROS_PRINCIPAL_FILE)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return HudiSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_USE_SNAPSHOT_TIMESTAMP;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;

Expand Down Expand Up @@ -70,4 +71,9 @@ public OptionRule optionRule() {
)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return IcebergSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class InfluxDBSinkFactory implements TableSourceFactory {
public class InfluxDBSinkFactory implements TableSinkFactory {

@Override
public String factoryIdentifier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.UPPER_BOUND;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;

Expand Down Expand Up @@ -60,4 +61,9 @@ public OptionRule optionRule() {
)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return InfluxDBSource.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.VERSION;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;

Expand All @@ -53,4 +54,9 @@ public OptionRule optionRule() {
ENABLE_CACHE_LEADER, VERSION, LOWER_BOUND, UPPER_BOUND, NUM_PARTITIONS)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return IoTDBSource.class;
}
}
Loading

0 comments on commit 417178f

Please sign in to comment.