Skip to content

Commit

Permalink
[Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory o…
Browse files Browse the repository at this point in the history
…n http (#5816)



---------

Co-authored-by: 80597928 <673421862@qq.com>
  • Loading branch information
jackyyyyyssss and q3356564 authored Nov 22, 2023
1 parent 8d9a0b7 commit 6f49ec6
Show file tree
Hide file tree
Showing 30 changed files with 391 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

public class HttpConfig {
public static final String BASIC = "Basic";
public static final String CONNECTOR_IDENTITY = "Http";

public static final int DEFAULT_RETRY_BACKOFF_MULTIPLIER_MS = 100;
public static final int DEFAULT_RETRY_BACKOFF_MAX_MS = 10000;
public static final boolean DEFAULT_ENABLE_MULTI_LINES = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
Expand All @@ -34,25 +32,16 @@
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;

import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.Map;
import java.util.stream.Collectors;

@AutoService(SeaTunnelSink.class)
public class HttpSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
protected final HttpParameter httpParameter = new HttpParameter();
protected SeaTunnelRowType seaTunnelRowType;
protected Config pluginConfig;

@Override
public String getPluginName() {
return "Http";
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
public HttpSink(Config pluginConfig, SeaTunnelRowType rowType) {
this.pluginConfig = pluginConfig;
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HttpConfig.URL.key());
if (!result.isSuccess()) {
Expand Down Expand Up @@ -81,11 +70,12 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
entry -> String.valueOf(entry.getValue().unwrapped()),
(v1, v2) -> v2)));
}
this.seaTunnelRowType = rowType;
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
public String getPluginName() {
return "Http";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.seatunnel.connectors.seatunnel.http.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;

import com.google.auto.service.AutoService;
Expand All @@ -31,6 +34,13 @@ public String factoryIdentifier() {
return "Http";
}

@Override
public TableSink createSink(TableSinkFactoryContext context) {
CatalogTable catalogTable = context.getCatalogTable();
return () ->
new HttpSink(context.getOptions().toConfig(), catalogTable.getSeaTunnelRowType());
}

@Override
public OptionRule optionRule() {
return OptionRule.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
Expand All @@ -47,34 +50,23 @@
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;

import com.google.auto.service.AutoService;
import com.google.common.collect.Lists;

import java.util.Collections;
import java.util.List;
import java.util.Locale;

@AutoService(SeaTunnelSource.class)
public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
protected final HttpParameter httpParameter = new HttpParameter();
protected PageInfo pageInfo;
protected SeaTunnelRowType rowType;
protected JsonField jsonField;
protected String contentField;
protected JobContext jobContext;
protected DeserializationSchema<SeaTunnelRow> deserializationSchema;

@Override
public String getPluginName() {
return "Http";
}
protected CatalogTable catalogTable;

@Override
public Boundedness getBoundedness() {
return JobMode.BATCH.equals(jobContext.getJobMode())
? Boundedness.BOUNDED
: Boundedness.UNBOUNDED;
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
public HttpSource(Config pluginConfig) {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HttpConfig.URL.key());
if (!result.isSuccess()) {
throw new HttpConnectorException(
Expand All @@ -88,6 +80,18 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
buildPagingWithConfig(pluginConfig);
}

@Override
public String getPluginName() {
return HttpConfig.CONNECTOR_IDENTITY;
}

@Override
public Boundedness getBoundedness() {
return JobMode.BATCH.equals(jobContext.getJobMode())
? Boundedness.BOUNDED
: Boundedness.UNBOUNDED;
}

private void buildPagingWithConfig(Config pluginConfig) {
if (pluginConfig.hasPath(HttpConfig.PAGEING.key())) {
pageInfo = new PageInfo();
Expand All @@ -114,7 +118,7 @@ private void buildPagingWithConfig(Config pluginConfig) {

protected void buildSchemaWithConfig(Config pluginConfig) {
if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
this.rowType = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
// default use json format
HttpConfig.ResponseFormat format = HttpConfig.FORMAT.defaultValue();
if (pluginConfig.hasPath(HttpConfig.FORMAT.key())) {
Expand All @@ -127,7 +131,8 @@ protected void buildSchemaWithConfig(Config pluginConfig) {
switch (format) {
case JSON:
this.deserializationSchema =
new JsonDeserializationSchema(false, false, rowType);
new JsonDeserializationSchema(
false, false, catalogTable.getSeaTunnelRowType());
if (pluginConfig.hasPath(HttpConfig.JSON_FIELD.key())) {
jsonField =
getJsonField(pluginConfig.getConfig(HttpConfig.JSON_FIELD.key()));
Expand All @@ -145,8 +150,33 @@ protected void buildSchemaWithConfig(Config pluginConfig) {
format));
}
} else {
this.rowType = CatalogTableUtil.buildSimpleTextSchema();
this.deserializationSchema = new SimpleTextDeserializationSchema(this.rowType);
TableIdentifier tableIdentifier =
TableIdentifier.of(HttpConfig.CONNECTOR_IDENTITY, null, null);
TableSchema tableSchema =
TableSchema.builder()
.column(
PhysicalColumn.of(
"content",
new SeaTunnelRowType(
new String[] {"content"},
new SeaTunnelDataType<?>[] {
BasicType.STRING_TYPE
}),
0,
false,
null,
null))
.build();

this.catalogTable =
CatalogTable.of(
tableIdentifier,
tableSchema,
Collections.emptyMap(),
Collections.emptyList(),
null);
this.deserializationSchema =
new SimpleTextDeserializationSchema(catalogTable.getSeaTunnelRowType());
}
}

Expand All @@ -156,8 +186,8 @@ public void setJobContext(JobContext jobContext) {
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return this.rowType;
public List<CatalogTable> getProducedCatalogTables() {
return Lists.newArrayList(catalogTable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod;

import com.google.auto.service.AutoService;

import java.io.Serializable;

@AutoService(Factory.class)
public class HttpSourceFactory implements TableSourceFactory {

Expand All @@ -40,16 +44,24 @@ public OptionRule optionRule() {
return getHttpBuilder().build();
}

@Override
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
return () ->
(SeaTunnelSource<T, SplitT, StateT>)
new HttpSource(context.getOptions().toConfig());
}

public OptionRule.Builder getHttpBuilder() {
return OptionRule.builder()
.required(HttpConfig.URL)
.optional(HttpConfig.METHOD)
.optional(HttpConfig.HEADERS)
.optional(HttpConfig.PARAMS)
.optional(HttpConfig.FORMAT)
.optional(HttpConfig.BODY)
.optional(HttpConfig.JSON_FIELD)
.optional(HttpConfig.CONTENT_FIELD)
.conditional(HttpConfig.METHOD, HttpRequestMethod.POST, HttpConfig.BODY)
.conditional(
HttpConfig.FORMAT,
HttpConfig.ResponseFormat.JSON,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.seatunnel.connectors.seatunnel.feishu.sink;

import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import com.google.auto.service.AutoService;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;

@AutoService(SeaTunnelSink.class)
public class FeishuSink extends HttpSink {
public FeishuSink(Config pluginConfig, SeaTunnelRowType rowType) {
super(pluginConfig, rowType);
}

@Override
public String getPluginName() {
return "Feishu";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,24 @@

package org.apache.seatunnel.connectors.seatunnel.feishu.sink;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkFactory;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class FeishuSinkFactory extends HttpSinkFactory {

@Override
public TableSink createSink(TableSinkFactoryContext context) {
CatalogTable catalogTable = context.getCatalogTable();
return () ->
new FeishuSink(context.getOptions().toConfig(), catalogTable.getSeaTunnelRowType());
}

@Override
public String factoryIdentifier() {
return "Feishu";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
Expand All @@ -34,24 +32,17 @@
import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource;
import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader;

import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@AutoService(SeaTunnelSource.class)
public class GithubSource extends HttpSource {

public static final String PLUGIN_NAME = "Github";

private final GithubSourceParameter githubSourceParam = new GithubSourceParameter();

@Override
public String getPluginName() {
return PLUGIN_NAME;
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
public GithubSource(Config pluginConfig) {
super(pluginConfig);
CheckResult result =
CheckConfigUtil.checkAllExists(pluginConfig, GithubSourceConfig.URL.key());
if (!result.isSuccess()) {
Expand All @@ -62,7 +53,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
githubSourceParam.buildWithConfig(pluginConfig);
buildSchemaWithConfig(pluginConfig);
}

@Override
public String getPluginName() {
return PLUGIN_NAME;
}

@Override
Expand Down
Loading

0 comments on commit 6f49ec6

Please sign in to comment.