Skip to content

Commit

Permalink
Fix compilation errors
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 20, 2023
1 parent 5a5c4bb commit 03d0e34
Showing 1 changed file with 3 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
Expand All @@ -39,7 +40,6 @@
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
Expand Down Expand Up @@ -252,8 +252,7 @@ public SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> restoreEn

private void setDeserialization(Config config) {
if (config.hasPath(ConsumerConfig.SCHEMA.key())) {
Config schema = config.getConfig(ConsumerConfig.SCHEMA.key());
typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
SchemaFormat format = SchemaFormat.JSON;
if (config.hasPath(FORMAT.key())) {
format = SchemaFormat.find(config.getString(FORMAT.key()));
Expand All @@ -278,7 +277,7 @@ private void setDeserialization(Config config) {
CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
}
} else {
typeInfo = SeaTunnelSchema.buildSimpleTextSchema();
typeInfo = CatalogTableUtil.buildSimpleTextSchema();
this.deserializationSchema =
TextDeserializationSchema.builder()
.seaTunnelRowType(typeInfo)
Expand Down

0 comments on commit 03d0e34

Please sign in to comment.