Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector][PulsarSource]Improve pulsar deserialization #3990

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/en/connector-v2/source/pulsar.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ Source connector for Apache Pulsar.
| cursor.stop.mode | Enum | No | NEVER |
| cursor.stop.timestamp | Long | No | - |
| schema | config | No | - |
| format | String | no | json | |
| field_delimiter | String | no | , | |
| common-options | | no | - |

### topic [String]
Expand Down Expand Up @@ -126,6 +128,15 @@ Stop from the specified epoch timestamp (in milliseconds).

### schema [Config]

## format

Data format. The default format is json. Optional text format. The default field separator is ", ".
If you customize the delimiter, add the "field_delimiter" option.

## field_delimiter

Customize the field delimiter for data format.

#### fields [Config]

the schema fields of upstream data.
Expand All @@ -152,3 +163,4 @@ source {

### 2.3.0-beta 2022-10-20
- Add Pulsar Source Connector
- Improve pulsar deserialization([3990](https://github.com/apache/incubator-seatunnel/pull/3990))
Hisoka-X marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 6 additions & 0 deletions seatunnel-connectors-v2/connector-pulsar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-text</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ public class SourceProperties {
private static final Integer DEFAULT_POLL_TIMEOUT = 100;
private static final Long DEFAULT_POLL_INTERVAL = 50L;
private static final Integer DEFAULT_POLL_BATCH_SIZE = 500;
/**
* The default data format is JSON
*/
public static final String DEFAULT_FORMAT = "json";

public static final String TEXT_FORMAT = "text";

/**
* The default field delimiter is “,”
*/
public static final String DEFAULT_FIELD_DELIMITER = ",";

// --------------------------------------------------------------------------------------------
// The configuration for ClientConfigurationData part.
Expand Down Expand Up @@ -167,7 +178,16 @@ public class SourceProperties {
.longType()
.noDefaultValue()
.withDescription("Stop from the specified epoch timestamp (in milliseconds)");
public static final Option<String> FORMAT = Options.key("format")
.stringType()
.noDefaultValue()
.withDescription("Data format. The default format is json. Optional text format. The default field separator is \", \". " +
"If you customize the delimiter, add the \"field_delimiter\" option.");

public static final Option<String> FIELD_DELIMITER = Options.key("field_delimiter")
.stringType()
.noDefaultValue()
.withDescription("Customize the field delimiter for data format.");
/**
* Startup mode for the pulsar consumer, see {@link #CURSOR_STARTUP_MODE}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_TIMESTAMP;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_MODE;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_TIMESTAMP;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.DEFAULT_FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.DEFAULT_FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_BATCH_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_INTERVAL;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TEXT_FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_DISCOVERY_INTERVAL;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN;
Expand All @@ -50,6 +55,7 @@
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
Expand All @@ -69,6 +75,8 @@
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

Expand Down Expand Up @@ -226,10 +234,37 @@ private void setPartitionDiscoverer(Config config) {
}

private void setDeserialization(Config config) {
String format = config.getString("format");
// TODO: format SPI
SeaTunnelRowType rowType = SeaTunnelSchema.buildWithConfig(config.getConfig(SeaTunnelSchema.SCHEMA.key())).getSeaTunnelRowType();
deserialization = (DeserializationSchema<T>) new JsonDeserializationSchema(false, false, rowType);
String schemaKey = SeaTunnelSchema.SCHEMA.key();
if (config.hasPath(schemaKey)) {
Config schema = config.getConfig(schemaKey);
SeaTunnelRowType rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
String format = DEFAULT_FORMAT;
if (config.hasPath(FORMAT.key())) {
format = config.getString(FORMAT.key());
}
if (DEFAULT_FORMAT.equals(format)) {
deserialization = (DeserializationSchema<T>) new JsonDeserializationSchema(false, false, rowType);
} else if (TEXT_FORMAT.equals(format)) {
String delimiter = DEFAULT_FIELD_DELIMITER;
if (config.hasPath(FIELD_DELIMITER.key())) {
delimiter = config.getString(FIELD_DELIMITER.key());
}
deserialization = (DeserializationSchema<T>) TextDeserializationSchema.builder()
.seaTunnelRowType(rowType)
.delimiter(delimiter)
.build();
} else {
// TODO: use format SPI
throw new SeaTunnelJsonFormatException(CommonErrorCode.UNSUPPORTED_OPERATION,
"Unsupported format: " + format);
}
} else {
SeaTunnelRowType rowType = SeaTunnelSchema.buildSimpleTextSchema();
deserialization = (DeserializationSchema<T>) TextDeserializationSchema.builder()
.seaTunnelRowType(rowType)
.delimiter(String.valueOf('\002'))
.build();
}
}

@Override
Expand Down