From 6f49ec6ead82f842c6d93563ffce64853deb1bed Mon Sep 17 00:00:00 2001 From: lizhenglei <127465317+jackyyyyyssss@users.noreply.github.com> Date: Wed, 22 Nov 2023 10:05:58 +0800 Subject: [PATCH] [Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory on http (#5816) --------- Co-authored-by: 80597928 <673421862@qq.com> --- .../seatunnel/http/config/HttpConfig.java | 2 + .../seatunnel/http/sink/HttpSink.java | 18 +---- .../seatunnel/http/sink/HttpSinkFactory.java | 10 +++ .../seatunnel/http/source/HttpSource.java | 78 +++++++++++++------ .../http/source/HttpSourceFactory.java | 16 +++- .../seatunnel/feishu/sink/FeishuSink.java | 11 ++- .../feishu/sink/FeishuSinkFactory.java | 11 +++ .../seatunnel/github/source/GithubSource.java | 19 ++--- .../github/source/GithubSourceFactory.java | 14 ++++ .../seatunnel/gitlab/source/GitlabSource.java | 37 ++++----- .../gitlab/source/GitlabSourceFactory.java | 14 ++++ .../seatunnel/jira/source/JiraSource.java | 36 ++++----- .../jira/source/JiraSourceFactory.java | 14 ++++ .../klaviyo/source/KlaviyoSource.java | 19 ++--- .../klaviyo/source/KlaviyoSourceFactory.java | 14 ++++ .../lemlist/source/LemlistSource.java | 19 ++--- .../lemlist/source/LemlistSourceFactory.java | 14 ++++ .../myhours/source/MyHoursSource.java | 19 ++--- .../myhours/source/MyHoursSourceFactory.java | 14 ++++ .../seatunnel/notion/source/NotionSource.java | 19 ++--- .../notion/source/NotionSourceFactory.java | 14 ++++ .../onesignal/source/OneSignalSource.java | 19 ++--- .../source/OneSignalSourceFactory.java | 14 ++++ .../persistiq/source/PersistiqSource.java | 18 ++--- .../source/PersistiqSourceFactory.java | 14 ++++ .../seatunnel/wechat/sink/WeChatSink.java | 11 ++- .../connector-http-e2e/pom.xml | 6 ++ .../seatunnel/e2e/connector/http/HttpIT.java | 5 ++ .../http_jsonrequestbody_to_feishu.conf | 43 ++++++++++ .../src/test/resources/mockserver-config.json | 21 +++++ 30 files changed, 391 insertions(+), 172 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonrequestbody_to_feishu.conf diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java index 2a68249b86d..de21b73c7c1 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java @@ -24,6 +24,8 @@ public class HttpConfig { public static final String BASIC = "Basic"; + public static final String CONNECTOR_IDENTITY = "Http"; + public static final int DEFAULT_RETRY_BACKOFF_MULTIPLIER_MS = 100; public static final int DEFAULT_RETRY_BACKOFF_MAX_MS = 10000; public static final boolean DEFAULT_ENABLE_MULTI_LINES = false; diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java index d8a1c5fafe1..140e24a4f0e 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java @@ -19,9 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -34,25 +32,16 @@ import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException; -import com.google.auto.service.AutoService; - import java.io.IOException; import java.util.Map; import java.util.stream.Collectors; -@AutoService(SeaTunnelSink.class) public class HttpSink extends AbstractSimpleSink { protected final HttpParameter httpParameter = new HttpParameter(); protected SeaTunnelRowType seaTunnelRowType; protected Config pluginConfig; - @Override - public String getPluginName() { - return "Http"; - } - - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { + public HttpSink(Config pluginConfig, SeaTunnelRowType rowType) { this.pluginConfig = pluginConfig; CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HttpConfig.URL.key()); if (!result.isSuccess()) { @@ -81,11 +70,12 @@ public void prepare(Config pluginConfig) throws PrepareFailException { entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2))); } + this.seaTunnelRowType = rowType; } @Override - public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; + public String getPluginName() { + return "Http"; } @Override diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java index 8411001fff8..539563ecb62 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java @@ -18,8 +18,11 @@ package org.apache.seatunnel.connectors.seatunnel.http.sink; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig; import com.google.auto.service.AutoService; @@ -31,6 +34,13 @@ public String factoryIdentifier() { return "Http"; } + @Override + public TableSink createSink(TableSinkFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTable(); + return () -> + new HttpSink(context.getOptions().toConfig(), catalogTable.getSeaTunnelRowType()); + } + @Override public OptionRule optionRule() { return OptionRule.builder() diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java index 747b1ed4ab8..314deded060 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java @@ -21,13 +21,16 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions; import org.apache.seatunnel.api.common.JobContext; -import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.serialization.DeserializationSchema; import org.apache.seatunnel.api.source.Boundedness; -import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; +import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -47,34 +50,23 @@ import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException; import org.apache.seatunnel.format.json.JsonDeserializationSchema; -import com.google.auto.service.AutoService; +import com.google.common.collect.Lists; +import java.util.Collections; +import java.util.List; import java.util.Locale; -@AutoService(SeaTunnelSource.class) public class HttpSource extends AbstractSingleSplitSource { protected final HttpParameter httpParameter = new HttpParameter(); protected PageInfo pageInfo; - protected SeaTunnelRowType rowType; protected JsonField jsonField; protected String contentField; protected JobContext jobContext; protected DeserializationSchema deserializationSchema; - @Override - public String getPluginName() { - return "Http"; - } + protected CatalogTable catalogTable; - @Override - public Boundedness getBoundedness() { - return JobMode.BATCH.equals(jobContext.getJobMode()) - ? Boundedness.BOUNDED - : Boundedness.UNBOUNDED; - } - - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { + public HttpSource(Config pluginConfig) { CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HttpConfig.URL.key()); if (!result.isSuccess()) { throw new HttpConnectorException( @@ -88,6 +80,18 @@ public void prepare(Config pluginConfig) throws PrepareFailException { buildPagingWithConfig(pluginConfig); } + @Override + public String getPluginName() { + return HttpConfig.CONNECTOR_IDENTITY; + } + + @Override + public Boundedness getBoundedness() { + return JobMode.BATCH.equals(jobContext.getJobMode()) + ? Boundedness.BOUNDED + : Boundedness.UNBOUNDED; + } + private void buildPagingWithConfig(Config pluginConfig) { if (pluginConfig.hasPath(HttpConfig.PAGEING.key())) { pageInfo = new PageInfo(); @@ -114,7 +118,7 @@ private void buildPagingWithConfig(Config pluginConfig) { protected void buildSchemaWithConfig(Config pluginConfig) { if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) { - this.rowType = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); + this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig); // default use json format HttpConfig.ResponseFormat format = HttpConfig.FORMAT.defaultValue(); if (pluginConfig.hasPath(HttpConfig.FORMAT.key())) { @@ -127,7 +131,8 @@ protected void buildSchemaWithConfig(Config pluginConfig) { switch (format) { case JSON: this.deserializationSchema = - new JsonDeserializationSchema(false, false, rowType); + new JsonDeserializationSchema( + false, false, catalogTable.getSeaTunnelRowType()); if (pluginConfig.hasPath(HttpConfig.JSON_FIELD.key())) { jsonField = getJsonField(pluginConfig.getConfig(HttpConfig.JSON_FIELD.key())); @@ -145,8 +150,33 @@ protected void buildSchemaWithConfig(Config pluginConfig) { format)); } } else { - this.rowType = CatalogTableUtil.buildSimpleTextSchema(); - this.deserializationSchema = new SimpleTextDeserializationSchema(this.rowType); + TableIdentifier tableIdentifier = + TableIdentifier.of(HttpConfig.CONNECTOR_IDENTITY, null, null); + TableSchema tableSchema = + TableSchema.builder() + .column( + PhysicalColumn.of( + "content", + new SeaTunnelRowType( + new String[] {"content"}, + new SeaTunnelDataType[] { + BasicType.STRING_TYPE + }), + 0, + false, + null, + null)) + .build(); + + this.catalogTable = + CatalogTable.of( + tableIdentifier, + tableSchema, + Collections.emptyMap(), + Collections.emptyList(), + null); + this.deserializationSchema = + new SimpleTextDeserializationSchema(catalogTable.getSeaTunnelRowType()); } } @@ -156,8 +186,8 @@ public void setJobContext(JobContext jobContext) { } @Override - public SeaTunnelDataType getProducedType() { - return this.rowType; + public List getProducedCatalogTables() { + return Lists.newArrayList(catalogTable); } @Override diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java index 21bc3940e1f..c0a276d7237 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java @@ -19,14 +19,18 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig; -import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod; import com.google.auto.service.AutoService; +import java.io.Serializable; + @AutoService(Factory.class) public class HttpSourceFactory implements TableSourceFactory { @@ -40,6 +44,14 @@ public OptionRule optionRule() { return getHttpBuilder().build(); } + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> + (SeaTunnelSource) + new HttpSource(context.getOptions().toConfig()); + } + public OptionRule.Builder getHttpBuilder() { return OptionRule.builder() .required(HttpConfig.URL) @@ -47,9 +59,9 @@ public OptionRule.Builder getHttpBuilder() { .optional(HttpConfig.HEADERS) .optional(HttpConfig.PARAMS) .optional(HttpConfig.FORMAT) + .optional(HttpConfig.BODY) .optional(HttpConfig.JSON_FIELD) .optional(HttpConfig.CONTENT_FIELD) - .conditional(HttpConfig.METHOD, HttpRequestMethod.POST, HttpConfig.BODY) .conditional( HttpConfig.FORMAT, HttpConfig.ResponseFormat.JSON, diff --git a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java index 5656ad11b2b..b3fbaa6a5ba 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java @@ -17,13 +17,16 @@ package org.apache.seatunnel.connectors.seatunnel.feishu.sink; -import org.apache.seatunnel.api.sink.SeaTunnelSink; -import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink; +import org.apache.seatunnel.shade.com.typesafe.config.Config; -import com.google.auto.service.AutoService; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink; -@AutoService(SeaTunnelSink.class) public class FeishuSink extends HttpSink { + public FeishuSink(Config pluginConfig, SeaTunnelRowType rowType) { + super(pluginConfig, rowType); + } + @Override public String getPluginName() { return "Feishu"; diff --git a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java index 2ee37f048bc..f9cd6ee01ca 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java @@ -17,13 +17,24 @@ package org.apache.seatunnel.connectors.seatunnel.feishu.sink; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkFactory; import com.google.auto.service.AutoService; @AutoService(Factory.class) public class FeishuSinkFactory extends HttpSinkFactory { + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTable(); + return () -> + new FeishuSink(context.getOptions().toConfig(), catalogTable.getSeaTunnelRowType()); + } + @Override public String factoryIdentifier() { return "Feishu"; diff --git a/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSource.java b/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSource.java index e5bdec5744a..0c8315502a8 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSource.java @@ -19,9 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; @@ -34,24 +32,17 @@ import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource; import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader; -import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; @Slf4j -@AutoService(SeaTunnelSource.class) public class GithubSource extends HttpSource { public static final String PLUGIN_NAME = "Github"; private final GithubSourceParameter githubSourceParam = new GithubSourceParameter(); - @Override - public String getPluginName() { - return PLUGIN_NAME; - } - - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { + public GithubSource(Config pluginConfig) { + super(pluginConfig); CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, GithubSourceConfig.URL.key()); if (!result.isSuccess()) { @@ -62,7 +53,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { getPluginName(), PluginType.SOURCE, result.getMsg())); } githubSourceParam.buildWithConfig(pluginConfig); - buildSchemaWithConfig(pluginConfig); + } + + @Override + public String getPluginName() { + return PLUGIN_NAME; } @Override diff --git a/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSourceFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSourceFactory.java index fa1b9f9de88..ca5ad451088 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSourceFactory.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSourceFactory.java @@ -18,12 +18,18 @@ package org.apache.seatunnel.connectors.seatunnel.github.source; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.seatunnel.github.config.GithubSourceConfig; import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory; import com.google.auto.service.AutoService; +import java.io.Serializable; + @AutoService(Factory.class) public class GithubSourceFactory extends HttpSourceFactory { @@ -32,6 +38,14 @@ public String factoryIdentifier() { return GithubSource.PLUGIN_NAME; } + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> + (SeaTunnelSource) + new GithubSource(context.getOptions().toConfig()); + } + @Override public OptionRule optionRule() { return getHttpBuilder().required(GithubSourceConfig.ACCESS_TOKEN).build(); diff --git a/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSource.java b/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSource.java index 524bb066003..73c687b8593 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSource.java @@ -19,10 +19,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.source.Boundedness; -import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; @@ -36,30 +34,14 @@ import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource; import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader; -import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; @Slf4j -@AutoService(SeaTunnelSource.class) public class GitlabSource extends HttpSource { private final GitlabSourceParameter gitlabSourceParameter = new GitlabSourceParameter(); - @Override - public String getPluginName() { - return "Gitlab"; - } - - @Override - public Boundedness getBoundedness() { - if (JobMode.BATCH.equals(jobContext.getJobMode())) { - return Boundedness.BOUNDED; - } - throw new UnsupportedOperationException( - "Gitlab source connector not support unbounded operation"); - } - - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { + public GitlabSource(Config pluginConfig) { + super(pluginConfig); CheckResult result = CheckConfigUtil.checkAllExists( pluginConfig, @@ -73,7 +55,20 @@ public void prepare(Config pluginConfig) throws PrepareFailException { getPluginName(), PluginType.SOURCE, result.getMsg())); } this.gitlabSourceParameter.buildWithConfig(pluginConfig); - buildSchemaWithConfig(pluginConfig); + } + + @Override + public String getPluginName() { + return "Gitlab"; + } + + @Override + public Boundedness getBoundedness() { + if (JobMode.BATCH.equals(jobContext.getJobMode())) { + return Boundedness.BOUNDED; + } + throw new UnsupportedOperationException( + "Gitlab source connector not support unbounded operation"); } @Override diff --git a/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSourceFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSourceFactory.java index c4a0f3e9acd..f4009349670 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSourceFactory.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSourceFactory.java @@ -18,12 +18,18 @@ package org.apache.seatunnel.connectors.seatunnel.gitlab.source; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.seatunnel.gitlab.source.config.GitlabSourceConfig; import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory; import com.google.auto.service.AutoService; +import java.io.Serializable; + @AutoService(Factory.class) public class GitlabSourceFactory extends HttpSourceFactory { @Override @@ -31,6 +37,14 @@ public String factoryIdentifier() { return "Gitlab"; } + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> + (SeaTunnelSource) + new GitlabSource(context.getOptions().toConfig()); + } + @Override public OptionRule optionRule() { return getHttpBuilder().required(GitlabSourceConfig.ACCESS_TOKEN).build(); diff --git a/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSource.java b/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSource.java index bd4a57cdd71..4055563a943 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSource.java @@ -21,7 +21,6 @@ import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.source.Boundedness; -import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; @@ -34,32 +33,16 @@ import org.apache.seatunnel.connectors.seatunnel.jira.source.config.JiraSourceConfig; import org.apache.seatunnel.connectors.seatunnel.jira.source.config.JiraSourceParameter; -import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; import static org.apache.seatunnel.connectors.seatunnel.http.util.AuthorizationUtil.getTokenByBasicAuth; @Slf4j -@AutoService(SeaTunnelSource.class) public class JiraSource extends HttpSource { private final JiraSourceParameter jiraSourceParameter = new JiraSourceParameter(); - @Override - public String getPluginName() { - return "Jira"; - } - - @Override - public Boundedness getBoundedness() { - if (JobMode.BATCH.equals(jobContext.getJobMode())) { - return Boundedness.BOUNDED; - } - throw new UnsupportedOperationException( - "Jira source connector not support unbounded operation"); - } - - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { + protected JiraSource(Config pluginConfig) { + super(pluginConfig); CheckResult result = CheckConfigUtil.checkAllExists( pluginConfig, @@ -75,7 +58,20 @@ public void prepare(Config pluginConfig) throws PrepareFailException { pluginConfig.getString(JiraSourceConfig.EMAIL.key()), pluginConfig.getString(JiraSourceConfig.API_TOKEN.key())); jiraSourceParameter.buildWithConfig(pluginConfig, accessToken); - buildSchemaWithConfig(pluginConfig); + } + + @Override + public String getPluginName() { + return "Jira"; + } + + @Override + public Boundedness getBoundedness() { + if (JobMode.BATCH.equals(jobContext.getJobMode())) { + return Boundedness.BOUNDED; + } + throw new UnsupportedOperationException( + "Jira source connector not support unbounded operation"); } @Override diff --git a/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSourceFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSourceFactory.java index 7c809ea421f..c06cf98ff10 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSourceFactory.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSourceFactory.java @@ -18,12 +18,18 @@ package org.apache.seatunnel.connectors.seatunnel.jira.source; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory; import org.apache.seatunnel.connectors.seatunnel.jira.source.config.JiraSourceConfig; import com.google.auto.service.AutoService; +import java.io.Serializable; + @AutoService(Factory.class) public class JiraSourceFactory extends HttpSourceFactory { @Override @@ -31,6 +37,14 @@ public String factoryIdentifier() { return "Jira"; } + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> + (SeaTunnelSource) + new JiraSource(context.getOptions().toConfig()); + } + @Override public OptionRule optionRule() { return getHttpBuilder() diff --git a/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSource.java b/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSource.java index 158507bfcac..408cf5299a8 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSource.java @@ -19,9 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; @@ -34,21 +32,14 @@ import org.apache.seatunnel.connectors.seatunnel.klaviyo.source.config.KlaviyoSourceParameter; import org.apache.seatunnel.connectors.seatunnel.klaviyo.source.config.exception.KlaviyoConnectorException; -import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; @Slf4j -@AutoService(SeaTunnelSource.class) public class KlaviyoSource extends HttpSource { private final KlaviyoSourceParameter klaviyoSourceParameter = new KlaviyoSourceParameter(); - @Override - public String getPluginName() { - return "Klaviyo"; - } - - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { + public KlaviyoSource(Config pluginConfig) { + super(pluginConfig); CheckResult result = CheckConfigUtil.checkAllExists( pluginConfig, @@ -63,7 +54,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { getPluginName(), PluginType.SOURCE, result.getMsg())); } this.klaviyoSourceParameter.buildWithConfig(pluginConfig); - buildSchemaWithConfig(pluginConfig); + } + + @Override + public String getPluginName() { + return "Klaviyo"; } @Override diff --git a/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSourceFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSourceFactory.java index 4b2817122d3..44517f3524a 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSourceFactory.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSourceFactory.java @@ -18,12 +18,18 @@ package org.apache.seatunnel.connectors.seatunnel.klaviyo.source; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory; import org.apache.seatunnel.connectors.seatunnel.klaviyo.source.config.KlaviyoSourceConfig; import com.google.auto.service.AutoService; +import java.io.Serializable; + @AutoService(Factory.class) public class KlaviyoSourceFactory extends HttpSourceFactory { @Override @@ -31,6 +37,14 @@ public String factoryIdentifier() { return "Klaviyo"; } + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> + (SeaTunnelSource) + new KlaviyoSource(context.getOptions().toConfig()); + } + @Override public OptionRule optionRule() { return getHttpBuilder() diff --git a/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSource.java b/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSource.java index 35a6404678c..123a758635e 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSource.java @@ -19,9 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; @@ -34,23 +32,16 @@ import org.apache.seatunnel.connectors.seatunnel.lemlist.source.config.LemlistSourceParameter; import org.apache.seatunnel.connectors.seatunnel.lemlist.source.exception.LemlistConnectorException; -import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; import static org.apache.seatunnel.connectors.seatunnel.http.util.AuthorizationUtil.getTokenByBasicAuth; @Slf4j -@AutoService(SeaTunnelSource.class) public class LemlistSource extends HttpSource { private final LemlistSourceParameter lemlistSourceParameter = new LemlistSourceParameter(); - @Override - public String getPluginName() { - return "Lemlist"; - } - - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { + public LemlistSource(Config pluginConfig) { + super(pluginConfig); CheckResult result = CheckConfigUtil.checkAllExists( pluginConfig, @@ -67,7 +58,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { String accessToken = getTokenByBasicAuth("", pluginConfig.getString(LemlistSourceConfig.PASSWORD.key())); lemlistSourceParameter.buildWithConfig(pluginConfig, accessToken); - buildSchemaWithConfig(pluginConfig); + } + + @Override + public String getPluginName() { + return "Lemlist"; } @Override diff --git a/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSourceFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSourceFactory.java index d6ebff02f2a..8581961b38d 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSourceFactory.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSourceFactory.java @@ -18,12 +18,18 @@ package org.apache.seatunnel.connectors.seatunnel.lemlist.source; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory; import org.apache.seatunnel.connectors.seatunnel.lemlist.source.config.LemlistSourceConfig; import com.google.auto.service.AutoService; +import java.io.Serializable; + @AutoService(Factory.class) public class LemlistSourceFactory extends HttpSourceFactory { @Override @@ -31,6 +37,14 @@ public String factoryIdentifier() { return "Lemlist"; } + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> + (SeaTunnelSource) + new LemlistSource(context.getOptions().toConfig()); + } + @Override public OptionRule optionRule() { return getHttpBuilder().required(LemlistSourceConfig.PASSWORD).build(); diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java index 2ac6ad2b15e..31f606afdd3 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java @@ -19,9 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; @@ -38,7 +36,6 @@ import org.apache.seatunnel.connectors.seatunnel.myhours.source.exception.MyHoursConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.myhours.source.exception.MyHoursConnectorException; -import com.google.auto.service.AutoService; import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; @@ -46,17 +43,11 @@ import java.util.Map; @Slf4j -@AutoService(SeaTunnelSource.class) public class MyHoursSource extends HttpSource { private final MyHoursSourceParameter myHoursSourceParameter = new MyHoursSourceParameter(); - @Override - public String getPluginName() { - return "MyHours"; - } - - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { + protected MyHoursSource(Config pluginConfig) { + super(pluginConfig); CheckResult result = CheckConfigUtil.checkAllExists( pluginConfig, @@ -73,7 +64,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { // Login to get accessToken String accessToken = getAccessToken(pluginConfig); this.myHoursSourceParameter.buildWithConfig(pluginConfig, accessToken); - buildSchemaWithConfig(pluginConfig); + } + + @Override + public String getPluginName() { + return "MyHours"; } @Override diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSourceFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSourceFactory.java index 24daeba1be8..7e080e19b77 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSourceFactory.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSourceFactory.java @@ -18,12 +18,18 @@ package org.apache.seatunnel.connectors.seatunnel.myhours.source; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory; import org.apache.seatunnel.connectors.seatunnel.myhours.source.config.MyHoursSourceConfig; import com.google.auto.service.AutoService; +import java.io.Serializable; + @AutoService(Factory.class) public class MyHoursSourceFactory extends HttpSourceFactory { @Override @@ -31,6 +37,14 @@ public String factoryIdentifier() { return "MyHours"; } + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> + (SeaTunnelSource) + new MyHoursSource(context.getOptions().toConfig()); + } + @Override public OptionRule optionRule() { return getHttpBuilder() diff --git a/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSource.java b/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSource.java index b21c94324fc..eaddf987b86 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSource.java @@ -19,9 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; @@ -34,21 +32,14 @@ import org.apache.seatunnel.connectors.seatunnel.notion.source.config.NotionSourceParameter; import org.apache.seatunnel.connectors.seatunnel.notion.source.exception.NotionConnectorException; -import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; @Slf4j -@AutoService(SeaTunnelSource.class) public class NotionSource extends HttpSource { private final NotionSourceParameter notionSourceParameter = new NotionSourceParameter(); - @Override - public String getPluginName() { - return "Notion"; - } - - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { + protected NotionSource(Config pluginConfig) { + super(pluginConfig); CheckResult result = CheckConfigUtil.checkAllExists( pluginConfig, @@ -63,7 +54,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { getPluginName(), PluginType.SOURCE, result.getMsg())); } notionSourceParameter.buildWithConfig(pluginConfig); - buildSchemaWithConfig(pluginConfig); + } + + @Override + public String getPluginName() { + return "Notion"; } @Override diff --git a/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSourceFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSourceFactory.java index 0d7503ec777..3811799deb0 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSourceFactory.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSourceFactory.java @@ -18,12 +18,18 @@ package org.apache.seatunnel.connectors.seatunnel.notion.source; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory; import org.apache.seatunnel.connectors.seatunnel.notion.source.config.NotionSourceConfig; import com.google.auto.service.AutoService; +import java.io.Serializable; + @AutoService(Factory.class) public class NotionSourceFactory extends HttpSourceFactory { @Override @@ -31,6 +37,14 @@ public String factoryIdentifier() { return "Notion"; } + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> + (SeaTunnelSource) + new NotionSource(context.getOptions().toConfig()); + } + @Override public OptionRule optionRule() { return getHttpBuilder() diff --git a/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSource.java b/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSource.java index 2e0a4e2e534..0ec750bb970 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSource.java @@ -19,9 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; @@ -34,22 +32,15 @@ import org.apache.seatunnel.connectors.seatunnel.onesignal.source.config.OneSignalSourceParameter; import org.apache.seatunnel.connectors.seatunnel.onesignal.source.config.exception.OneSignalConnectorException; -import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; @Slf4j -@AutoService(SeaTunnelSource.class) public class OneSignalSource extends HttpSource { private final OneSignalSourceParameter oneSignalSourceParameter = new OneSignalSourceParameter(); - @Override - public String getPluginName() { - return "OneSignal"; - } - - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { + protected OneSignalSource(Config pluginConfig) { + super(pluginConfig); CheckResult result = CheckConfigUtil.checkAllExists( pluginConfig, @@ -63,7 +54,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { getPluginName(), PluginType.SOURCE, result.getMsg())); } oneSignalSourceParameter.buildWithConfig(pluginConfig); - buildSchemaWithConfig(pluginConfig); + } + + @Override + public String getPluginName() { + return "OneSignal"; } @Override diff --git a/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSourceFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSourceFactory.java index 283f6c1c343..c37e44ddafd 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSourceFactory.java @@ -18,12 +18,18 @@ package org.apache.seatunnel.connectors.seatunnel.onesignal.source; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory; import org.apache.seatunnel.connectors.seatunnel.onesignal.source.config.OneSignalSourceConfig; import com.google.auto.service.AutoService; +import java.io.Serializable; + @AutoService(Factory.class) public class OneSignalSourceFactory extends HttpSourceFactory { @Override @@ -31,6 +37,14 @@ public String factoryIdentifier() { return "OneSignal"; } + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> + (SeaTunnelSource) + new OneSignalSource(context.getOptions().toConfig()); + } + @Override public OptionRule optionRule() { return getHttpBuilder().required(OneSignalSourceConfig.PASSWORD).build(); diff --git a/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSource.java b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSource.java index 11fd142898e..1af30d1e412 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSource.java @@ -20,7 +20,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; @@ -32,22 +31,15 @@ import org.apache.seatunnel.connectors.seatunnel.persistiq.source.config.PersistiqSourceConfig; import org.apache.seatunnel.connectors.seatunnel.persistiq.source.config.PersistiqSourceParameter; -import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; @Slf4j -@AutoService(SeaTunnelSource.class) public class PersistiqSource extends HttpSource { private final PersistiqSourceParameter persistiqSourceParameter = new PersistiqSourceParameter(); - @Override - public String getPluginName() { - return "Persistiq"; - } - - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { + public PersistiqSource(Config pluginConfig) { + super(pluginConfig); CheckResult result = CheckConfigUtil.checkAllExists( pluginConfig, @@ -57,7 +49,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); } persistiqSourceParameter.buildWithConfig(pluginConfig); - buildSchemaWithConfig(pluginConfig); + } + + @Override + public String getPluginName() { + return "Persistiq"; } @Override diff --git a/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSourceFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSourceFactory.java index 4fd1e0522a8..fa19c99789a 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSourceFactory.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSourceFactory.java @@ -18,12 +18,18 @@ package org.apache.seatunnel.connectors.seatunnel.persistiq.source; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory; import org.apache.seatunnel.connectors.seatunnel.persistiq.source.config.PersistiqSourceConfig; import com.google.auto.service.AutoService; +import java.io.Serializable; + @AutoService(Factory.class) public class PersistiqSourceFactory extends HttpSourceFactory { @Override @@ -31,6 +37,14 @@ public String factoryIdentifier() { return "Persistiq"; } + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> + (SeaTunnelSource) + new PersistiqSource(context.getOptions().toConfig()); + } + @Override public OptionRule optionRule() { return getHttpBuilder().required(PersistiqSourceConfig.PASSWORD).build(); diff --git a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java index 7e0cb99548b..ca6459bee15 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java @@ -17,19 +17,22 @@ package org.apache.seatunnel.connectors.seatunnel.wechat.sink; -import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink; import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkWriter; import org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig; -import com.google.auto.service.AutoService; - -@AutoService(SeaTunnelSink.class) public class WeChatSink extends HttpSink { + public WeChatSink(Config pluginConfig, SeaTunnelRowType rowType) { + super(pluginConfig, rowType); + } + @Override public String getPluginName() { return "WeChat"; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml index 05428e78d21..84b73a5998b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml @@ -92,6 +92,12 @@ ${project.version} test + + org.apache.seatunnel + connector-http-feishu + ${project.version} + test + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java index bd85ed876e5..6266905a7bf 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java @@ -149,9 +149,14 @@ public void testSourceToAssertSink(TestContainer container) Container.ExecResult execResult15 = container.executeJob("/http_page_increase_page_num.conf"); Assertions.assertEquals(0, execResult15.getExitCode()); + Container.ExecResult execResult16 = container.executeJob("/http_page_increase_no_page_num.conf"); Assertions.assertEquals(0, execResult16.getExitCode()); + + Container.ExecResult execResult17 = + container.executeJob("/http_jsonrequestbody_to_feishu.conf"); + Assertions.assertEquals(0, execResult17.getExitCode()); } public String getMockServerConfig() { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonrequestbody_to_feishu.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonrequestbody_to_feishu.conf new file mode 100644 index 00000000000..5027863ce6a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonrequestbody_to_feishu.conf @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Http { + result_table_name = "http" + url = "http://mockserver:1080/example/jsonBody" + method = "POST" + body="{"id":1}" + format = "json" + schema = { + fields { + name = string + age = int + } + } + } +} + +sink { + Feishu { + url = "http://mockserver:1080/example/feishu/108bb8f208d9b2378c8c7aedad715c19" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json index 9cb561225d6..d41975d1339 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json @@ -4670,5 +4670,26 @@ "hasNext": false } } + }, + { + "httpRequest": { + "path": "/example/feishu/108bb8f208d9b2378c8c7aedad715c19", + "method": "POST" + }, + "httpResponse": { + "body": [ + { + "name": "lzl", + "age": 18 + }, + { + "name": "pizz", + "age": 19 + } + ], + "headers": { + "Content-Type": "application/json" + } + } } ]