From 2ca45d3dc6352c0e9ed096f52650f6ce5d1d6651 Mon Sep 17 00:00:00 2001 From: zhaoliang01 Date: Tue, 24 Jan 2023 06:49:03 +0800 Subject: [PATCH 01/12] Improve pulsar deserialization --- docs/en/connector-v2/source/pulsar.md | 11 +++++ .../connector-pulsar/pom.xml | 6 +++ .../pulsar/config/SourceProperties.java | 20 +++++++++ .../seatunnel/pulsar/source/PulsarSource.java | 43 +++++++++++++++++-- 4 files changed, 76 insertions(+), 4 deletions(-) diff --git a/docs/en/connector-v2/source/pulsar.md b/docs/en/connector-v2/source/pulsar.md index bbe74b78ffd..fa8f2f2a099 100644 --- a/docs/en/connector-v2/source/pulsar.md +++ b/docs/en/connector-v2/source/pulsar.md @@ -36,6 +36,8 @@ Source connector for Apache Pulsar. | cursor.stop.mode | Enum | No | NEVER | | cursor.stop.timestamp | Long | No | - | | schema | config | No | - | +| format | String | no | json | | +| field_delimiter | String | no | , | | | common-options | | no | - | ### topic [String] @@ -126,6 +128,15 @@ Stop from the specified epoch timestamp (in milliseconds). ### schema [Config] +## format + +Data format. The default format is json. Optional text format. The default field separator is ", ". +If you customize the delimiter, add the "field_delimiter" option. + +## field_delimiter + +Customize the field delimiter for data format. + #### fields [Config] the schema fields of upstream data. diff --git a/seatunnel-connectors-v2/connector-pulsar/pom.xml b/seatunnel-connectors-v2/connector-pulsar/pom.xml index cb23ca7b452..df83c15909a 100644 --- a/seatunnel-connectors-v2/connector-pulsar/pom.xml +++ b/seatunnel-connectors-v2/connector-pulsar/pom.xml @@ -41,6 +41,12 @@ ${project.version} + + org.apache.seatunnel + seatunnel-format-text + ${project.version} + + org.apache.seatunnel connector-common diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java index 6f745041a18..b5378ba22dc 100644 --- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java @@ -26,6 +26,17 @@ public class SourceProperties { private static final Integer DEFAULT_POLL_TIMEOUT = 100; private static final Long DEFAULT_POLL_INTERVAL = 50L; private static final Integer DEFAULT_POLL_BATCH_SIZE = 500; + /** + * The default data format is JSON + */ + public static final String DEFAULT_FORMAT = "json"; + + public static final String TEXT_FORMAT = "text"; + + /** + * The default field delimiter is “,” + */ + public static final String DEFAULT_FIELD_DELIMITER = ","; // -------------------------------------------------------------------------------------------- // The configuration for ClientConfigurationData part. @@ -167,7 +178,16 @@ public class SourceProperties { .longType() .noDefaultValue() .withDescription("Stop from the specified epoch timestamp (in milliseconds)"); + public static final Option FORMAT = Options.key("format") + .stringType() + .noDefaultValue() + .withDescription("Data format. The default format is json. Optional text format. The default field separator is \", \". " + + "If you customize the delimiter, add the \"field_delimiter\" option."); + public static final Option FIELD_DELIMITER = Options.key("field_delimiter") + .stringType() + .noDefaultValue() + .withDescription("Customize the field delimiter for data format."); /** * Startup mode for the pulsar consumer, see {@link #CURSOR_STARTUP_MODE}. */ diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java index 674ecbf2cfa..fad1b2741a9 100644 --- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java @@ -28,11 +28,16 @@ import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_TIMESTAMP; import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_MODE; import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_TIMESTAMP; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.DEFAULT_FIELD_DELIMITER; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.DEFAULT_FORMAT; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.FIELD_DELIMITER; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.FORMAT; import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_BATCH_SIZE; import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_INTERVAL; import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT; import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME; import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TEXT_FORMAT; import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC; import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_DISCOVERY_INTERVAL; import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN; @@ -50,6 +55,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig; import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig; @@ -69,6 +75,8 @@ import org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader; import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit; import org.apache.seatunnel.format.json.JsonDeserializationSchema; +import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException; +import org.apache.seatunnel.format.text.TextDeserializationSchema; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -226,10 +234,37 @@ private void setPartitionDiscoverer(Config config) { } private void setDeserialization(Config config) { - String format = config.getString("format"); - // TODO: format SPI - SeaTunnelRowType rowType = SeaTunnelSchema.buildWithConfig(config.getConfig(SeaTunnelSchema.SCHEMA.key())).getSeaTunnelRowType(); - deserialization = (DeserializationSchema) new JsonDeserializationSchema(false, false, rowType); + String schemaKey = SeaTunnelSchema.SCHEMA.key(); + if (config.hasPath(schemaKey)) { + Config schema = config.getConfig(schemaKey); + SeaTunnelRowType rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType(); + String format = DEFAULT_FORMAT; + if (config.hasPath(FORMAT.key())) { + format = config.getString(FORMAT.key()); + } + if (DEFAULT_FORMAT.equals(format)) { + deserialization = (DeserializationSchema) new JsonDeserializationSchema(false, false, rowType); + } else if (TEXT_FORMAT.equals(format)) { + String delimiter = DEFAULT_FIELD_DELIMITER; + if (config.hasPath(FIELD_DELIMITER.key())) { + delimiter = config.getString(FIELD_DELIMITER.key()); + } + deserialization = (DeserializationSchema) TextDeserializationSchema.builder() + .seaTunnelRowType(rowType) + .delimiter(delimiter) + .build(); + } else { + // TODO: use format SPI + throw new SeaTunnelJsonFormatException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unsupported format: " + format); + } + } else { + SeaTunnelRowType rowType = SeaTunnelSchema.buildSimpleTextSchema(); + deserialization = (DeserializationSchema) TextDeserializationSchema.builder() + .seaTunnelRowType(rowType) + .delimiter(String.valueOf('\002')) + .build(); + } } @Override From 5de7b23f5863397b61dfd8582b8bf282c58fba0c Mon Sep 17 00:00:00 2001 From: zhaoliang01 Date: Tue, 24 Jan 2023 18:33:04 +0800 Subject: [PATCH 02/12] Improve pulsar deserialization --- docs/en/connector-v2/source/pulsar.md | 1 + .../connectors/seatunnel/pulsar/source/PulsarSource.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/en/connector-v2/source/pulsar.md b/docs/en/connector-v2/source/pulsar.md index fa8f2f2a099..7e67b62517c 100644 --- a/docs/en/connector-v2/source/pulsar.md +++ b/docs/en/connector-v2/source/pulsar.md @@ -163,3 +163,4 @@ source { ### 2.3.0-beta 2022-10-20 - Add Pulsar Source Connector +- Improve pulsar deserialization([3990](https://github.com/apache/incubator-seatunnel/pull/3990)) diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java index fad1b2741a9..2c6ba90d65c 100644 --- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java @@ -255,7 +255,7 @@ private void setDeserialization(Config config) { .build(); } else { // TODO: use format SPI - throw new SeaTunnelJsonFormatException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, + throw new SeaTunnelJsonFormatException(CommonErrorCode.UNSUPPORTED_OPERATION, "Unsupported format: " + format); } } else { From f6e0f2df86d727baa12dd4fe095b4974464cd662 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 24 Jan 2023 22:21:50 +0800 Subject: [PATCH 03/12] Update docs/en/connector-v2/source/pulsar.md --- docs/en/connector-v2/source/pulsar.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/en/connector-v2/source/pulsar.md b/docs/en/connector-v2/source/pulsar.md index 7e67b62517c..80421951f2f 100644 --- a/docs/en/connector-v2/source/pulsar.md +++ b/docs/en/connector-v2/source/pulsar.md @@ -163,4 +163,5 @@ source { ### 2.3.0-beta 2022-10-20 - Add Pulsar Source Connector +### Next Version - Improve pulsar deserialization([3990](https://github.com/apache/incubator-seatunnel/pull/3990)) From a4744752968b52f5ca87050f8974a83ae80c5034 Mon Sep 17 00:00:00 2001 From: lightzhao Date: Thu, 16 Feb 2023 18:53:31 +0800 Subject: [PATCH 04/12] add pulsar connector e2e --- .../connector-pulsar-e2e/pom.xml | 112 ++++++++++++ .../e2e/connector/pulsar/PulsarIT.java | 172 ++++++++++++++++++ .../pulsarsource_text_to_console.conf | 76 ++++++++ .../seatunnel-connector-v2-e2e/pom.xml | 1 + 4 files changed, 361 insertions(+) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsarsource_text_to_console.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml new file mode 100644 index 00000000000..86b827fafc7 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml @@ -0,0 +1,112 @@ + + + + + seatunnel-connector-v2-e2e + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-pulsar-e2e + + + 11 + 11 + 2.8.0 + + + + + org.apache.seatunnel + seatunnel-e2e-common + ${project.version} + + + org.apache.seatunnel + connector-pulsar + ${project.version} + test + + + org.apache.seatunnel + seatunnel-format-json + ${project.version} + + + + org.apache.seatunnel + seatunnel-format-text + ${project.version} + + + + org.apache.seatunnel + connector-common + ${project.version} + + + org.apache.pulsar + pulsar-client-all + ${pulsar.version} + + + org.apache.pulsar + pulsar-package-core + + + + + org.apache.seatunnel + connector-console + ${project.version} + test + + + org.apache.seatunnel + connector-assert + ${project.version} + test + + + org.testcontainers + testcontainers + 1.17.6 + test + + + junit + junit + 4.12 + compile + + + org.assertj + assertj-core + 3.23.1 + test + + + org.testcontainers + pulsar + 1.17.6 + test + + + + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java new file mode 100644 index 00000000000..d1264b05f00 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.pulsar; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.seatunnel.api.table.type.*; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.format.text.TextSerializationSchema; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.Test; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.PulsarContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; + +import java.io.IOException; +import java.time.Duration; +import java.util.*; + +import static java.time.temporal.ChronoUnit.SECONDS; + +/** + * Start / stop a Pulsar cluster. + */ +@Slf4j +public class PulsarIT extends TestSuiteBase implements TestResource { + + protected static PulsarContainer pulsarService; + + protected static String serviceUrl; + + protected static String adminUrl; + + protected static String zkUrl; + + protected static ClientConfigurationData clientConfigurationData = new ClientConfigurationData(); + + protected static ConsumerConfigurationData consumerConfigurationData = new ConsumerConfigurationData<>(); + + protected static PulsarAdmin pulsarAdmin; + + protected static PulsarClient pulsarClient; + + public static final String TOPIC = "persistent://public/default/test"; + + @BeforeAll + @Override + public void startUp() throws Exception { + + log.info("Starting PulsarService "); + pulsarService = new PulsarContainer(); + pulsarService.addExposedPort(2181); + pulsarService.waitingFor(new HttpWaitStrategy() + .forPort(PulsarContainer.BROKER_HTTP_PORT) + .forStatusCode(200) + .forPath("/admin/v2/namespaces/public/default") + .withStartupTimeout(Duration.of(400, SECONDS))); + pulsarService.start(); + pulsarService.followOutput(new Slf4jLogConsumer(log)); + serviceUrl = pulsarService.getPulsarBrokerUrl(); + adminUrl = pulsarService.getHttpServiceUrl(); + zkUrl = "localhost:" + pulsarService.getMappedPort(2181); + clientConfigurationData.setServiceUrl(serviceUrl); + consumerConfigurationData.setSubscriptionMode(SubscriptionMode.NonDurable); + consumerConfigurationData.setSubscriptionType(SubscriptionType.Exclusive); + consumerConfigurationData.setSubscriptionName("flink-" + UUID.randomUUID()); + log.info("serviceUrl:{}",serviceUrl); + log.info("adminUrl:{}",adminUrl); + log.info("zkUrl:{}",zkUrl); + + ClientBuilder builder = PulsarClient.builder(); + builder.serviceUrl(serviceUrl); + pulsarClient = builder.build(); + + TextSerializationSchema serializer = TextSerializationSchema.builder() + .seaTunnelRowType(SEATUNNEL_ROW_TYPE) + .delimiter(",") + .build(); + generateTestData(row -> new String(serializer.serialize(row)), 0, 2); + + log.info("Successfully started PulsarService"); + } + + @AfterAll + @Override + public void tearDown() throws Exception { + log.info("-------------------------------------------------------------------------"); + log.info(" Shut down PulsarService "); + log.info("-------------------------------------------------------------------------"); + + if (pulsarService != null) { + pulsarService.stop(); + } + if (pulsarAdmin != null) { + pulsarAdmin.close(); + } + + log.info("-------------------------------------------------------------------------"); + log.info(" PulsarService finished"); + log.info("-------------------------------------------------------------------------"); + } + + @TestTemplate + public void testSourcePulsarTextToConsole(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/pulsarsource_text_to_console.conf"); + log.info("execResult.getExitCode:{}",execResult.getExitCode()); + log.info("execResult.getStdout:{}",execResult.getStdout()); + log.info("execResult.getStderr:{}",execResult.getStderr()); + + + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + + private void generateTestData(ProducerRecordConverter converter, int start, int end) throws PulsarClientException { + try ( + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(TOPIC) + .create(); + ) { + for (int i = start; i < end; i++) { + SeaTunnelRow row = new SeaTunnelRow( + new Object[]{ + Long.valueOf(i), + "pulsarsource-test" + i + }); + String producerRecord = converter.convert(row); + producer.send(producerRecord); + } + } + + } + + interface ProducerRecordConverter { + String convert(SeaTunnelRow row); + } + + private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE = new SeaTunnelRowType( + new String[]{ + "id", + "c_string" + }, + new SeaTunnelDataType[]{ + BasicType.LONG_TYPE, + BasicType.STRING_TYPE + } + ); +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsarsource_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsarsource_text_to_console.conf new file mode 100644 index 00000000000..e3d104f2109 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsarsource_text_to_console.conf @@ -0,0 +1,76 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + execution.parallelism = 1 + job.mode = "STREAMING" +} + +source { + Pulsar { + topic = "persistent://public/default/test" + topic-pattern = "" + subscription.name = "seatunnel_pulsar_sub" + client.service-url = "pulsar://localhost:6650" + admin.service-url = "http://localhost:8080" + result_table_name = "pulsar_table" + schema = { + fields { + id = bigint + c_string = string + } + } + format = text + # The default field delimiter is "," + field_delimiter = "," + } +} + +transform { +} + +sink { + Console {} + Assert { + rules = + { + field_rules = [ + { + field_name = id + field_type = long + field_value = [ + { + rule_type = NOT_NULL + }, + { + rule_type = MIN + rule_value = 0 + }, + { + rule_type = MAX + rule_value = 99 + } + ] + } + ] + } + + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 68afc709fe9..a6db144dd18 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -48,6 +48,7 @@ connector-tdengine-e2e connector-datahub-e2e connector-mongodb-e2e + connector-pulsar-e2e seatunnel-connector-v2-e2e From fc3248f5a756dc25a6e6d6b9ea6972f00700c63e Mon Sep 17 00:00:00 2001 From: lightzhao Date: Fri, 17 Feb 2023 10:26:38 +0800 Subject: [PATCH 05/12] fix code style. --- .../connector-pulsar-e2e/pom.xml | 1 - .../e2e/connector/pulsar/PulsarIT.java | 83 +++++++++---------- 2 files changed, 40 insertions(+), 44 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml index 86b827fafc7..d40f647c9cc 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml @@ -108,5 +108,4 @@ test - \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java index d1264b05f00..b7debd02a65 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java @@ -17,19 +17,30 @@ package org.apache.seatunnel.e2e.connector.pulsar; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.*; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; -import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.seatunnel.api.table.type.*; +import static java.time.temporal.ChronoUnit.SECONDS; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.format.text.TextSerializationSchema; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionMode; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; -import org.junit.Test; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; @@ -39,9 +50,7 @@ import java.io.IOException; import java.time.Duration; -import java.util.*; - -import static java.time.temporal.ChronoUnit.SECONDS; +import java.util.UUID; /** * Start / stop a Pulsar cluster. @@ -49,23 +58,23 @@ @Slf4j public class PulsarIT extends TestSuiteBase implements TestResource { - protected static PulsarContainer pulsarService; + private PulsarContainer pulsarService; - protected static String serviceUrl; + private String serviceUrl; - protected static String adminUrl; + private String adminUrl; - protected static String zkUrl; + private String zkUrl; - protected static ClientConfigurationData clientConfigurationData = new ClientConfigurationData(); + private ClientConfigurationData clientConfigurationData = new ClientConfigurationData(); - protected static ConsumerConfigurationData consumerConfigurationData = new ConsumerConfigurationData<>(); + private ConsumerConfigurationData consumerConfigurationData = new ConsumerConfigurationData<>(); - protected static PulsarAdmin pulsarAdmin; + private PulsarAdmin pulsarAdmin; - protected static PulsarClient pulsarClient; + private PulsarClient pulsarClient; - public static final String TOPIC = "persistent://public/default/test"; + private static final String TOPIC = "persistent://public/default/test"; @BeforeAll @Override @@ -88,16 +97,16 @@ public void startUp() throws Exception { consumerConfigurationData.setSubscriptionMode(SubscriptionMode.NonDurable); consumerConfigurationData.setSubscriptionType(SubscriptionType.Exclusive); consumerConfigurationData.setSubscriptionName("flink-" + UUID.randomUUID()); - log.info("serviceUrl:{}",serviceUrl); - log.info("adminUrl:{}",adminUrl); - log.info("zkUrl:{}",zkUrl); + log.info("serviceUrl:{}", serviceUrl); + log.info("adminUrl:{}", adminUrl); + log.info("zkUrl:{}", zkUrl); ClientBuilder builder = PulsarClient.builder(); builder.serviceUrl(serviceUrl); pulsarClient = builder.build(); TextSerializationSchema serializer = TextSerializationSchema.builder() - .seaTunnelRowType(SEATUNNEL_ROW_TYPE) + .seaTunnelRowType(seaTunnelRowType) .delimiter(",") .build(); generateTestData(row -> new String(serializer.serialize(row)), 0, 2); @@ -127,15 +136,13 @@ public void tearDown() throws Exception { @TestTemplate public void testSourcePulsarTextToConsole(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = container.executeJob("/pulsarsource_text_to_console.conf"); - log.info("execResult.getExitCode:{}",execResult.getExitCode()); - log.info("execResult.getStdout:{}",execResult.getStdout()); - log.info("execResult.getStderr:{}",execResult.getStderr()); - + log.info("execResult.getExitCode:{}", execResult.getExitCode()); + log.info("execResult.getStdout:{}", execResult.getStdout()); + log.info("execResult.getStderr:{}", execResult.getStderr()); Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); } - private void generateTestData(ProducerRecordConverter converter, int start, int end) throws PulsarClientException { try ( Producer producer = pulsarClient.newProducer(Schema.STRING) @@ -143,30 +150,20 @@ private void generateTestData(ProducerRecordConverter converter, int start, int .create(); ) { for (int i = start; i < end; i++) { - SeaTunnelRow row = new SeaTunnelRow( - new Object[]{ - Long.valueOf(i), - "pulsarsource-test" + i - }); + SeaTunnelRow row = new SeaTunnelRow(new Object[]{Long.valueOf(i), "pulsarsource-test" + i}); String producerRecord = converter.convert(row); producer.send(producerRecord); } } - } interface ProducerRecordConverter { String convert(SeaTunnelRow row); } - private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE = new SeaTunnelRowType( - new String[]{ - "id", - "c_string" - }, - new SeaTunnelDataType[]{ - BasicType.LONG_TYPE, - BasicType.STRING_TYPE - } + @SuppressWarnings("checkstyle:InnerTypeLast") + SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType( + new String[]{"id", "c_string"}, + new SeaTunnelDataType[]{BasicType.LONG_TYPE, BasicType.STRING_TYPE} ); } From c330c00ab1954075184daf3ecdfab9bb11edc6f7 Mon Sep 17 00:00:00 2001 From: lightzhao Date: Fri, 17 Feb 2023 10:36:18 +0800 Subject: [PATCH 06/12] fix code style. --- docs/en/connector-v2/source/pulsar.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/connector-v2/source/pulsar.md b/docs/en/connector-v2/source/pulsar.md index 3cbadfd5da1..a67c9ebd287 100644 --- a/docs/en/connector-v2/source/pulsar.md +++ b/docs/en/connector-v2/source/pulsar.md @@ -36,8 +36,8 @@ Source connector for Apache Pulsar. | cursor.stop.mode | Enum | No | NEVER | | cursor.stop.timestamp | Long | No | - | | schema | config | No | - | -| format | String | no | json | | -| field_delimiter | String | no | , | | +| format | String | no | json | +| field_delimiter | String | no | , | | common-options | | no | - | ### topic [String] From b4824dc7c7ecbe06611956f6e07e2d82886abddc Mon Sep 17 00:00:00 2001 From: lightzhao Date: Fri, 17 Feb 2023 10:47:15 +0800 Subject: [PATCH 07/12] fix code style. --- docs/en/connector-v2/source/pulsar.md | 3 + .../pulsar/config/SourceProperties.java | 40 +-- .../seatunnel/pulsar/source/PulsarSource.java | 315 +++++++++++------- .../connector-pulsar-e2e/pom.xml | 9 +- .../e2e/connector/pulsar/PulsarIT.java | 62 ++-- 5 files changed, 259 insertions(+), 170 deletions(-) diff --git a/docs/en/connector-v2/source/pulsar.md b/docs/en/connector-v2/source/pulsar.md index a67c9ebd287..8a7322a6361 100644 --- a/docs/en/connector-v2/source/pulsar.md +++ b/docs/en/connector-v2/source/pulsar.md @@ -164,5 +164,8 @@ source { ### 2.3.0-beta 2022-10-20 - Add Pulsar Source Connector + ### Next Version + - Improve pulsar deserialization([3990](https://github.com/apache/incubator-seatunnel/pull/3990)) + diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java index 69904abcc86..225366ab251 100644 --- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java @@ -26,16 +26,12 @@ public class SourceProperties { private static final Integer DEFAULT_POLL_TIMEOUT = 100; private static final Long DEFAULT_POLL_INTERVAL = 50L; private static final Integer DEFAULT_POLL_BATCH_SIZE = 500; - /** - * The default data format is JSON - */ + /** The default data format is JSON */ public static final String DEFAULT_FORMAT = "json"; public static final String TEXT_FORMAT = "text"; - /** - * The default field delimiter is “,” - */ + /** The default field delimiter is “,” */ public static final String DEFAULT_FIELD_DELIMITER = ","; // -------------------------------------------------------------------------------------------- @@ -174,20 +170,24 @@ public class SourceProperties { "Stop mode for Pulsar consumer, valid values are 'NEVER', 'LATEST' and 'TIMESTAMP'. Note, When 'NEVER' is specified, it is a real-time job, and other mode are off-line jobs."); public static final Option CURSOR_STOP_TIMESTAMP = - Options.key("cursor.stop.timestamp") - .longType() - .noDefaultValue() - .withDescription("Stop from the specified epoch timestamp (in milliseconds)"); - public static final Option FORMAT = Options.key("format") - .stringType() - .noDefaultValue() - .withDescription("Data format. The default format is json. Optional text format. The default field separator is \", \". " + - "If you customize the delimiter, add the \"field_delimiter\" option."); - - public static final Option FIELD_DELIMITER = Options.key("field_delimiter") - .stringType() - .noDefaultValue() - .withDescription("Customize the field delimiter for data format."); + Options.key("cursor.stop.timestamp") + .longType() + .noDefaultValue() + .withDescription("Stop from the specified epoch timestamp (in milliseconds)"); + public static final Option FORMAT = + Options.key("format") + .stringType() + .noDefaultValue() + .withDescription( + "Data format. The default format is json. Optional text format. The default field separator is \", \". " + + "If you customize the delimiter, add the \"field_delimiter\" option."); + + public static final Option FIELD_DELIMITER = + Options.key("field_delimiter") + .stringType() + .noDefaultValue() + .withDescription("Customize the field delimiter for data format."); + public enum StartMode { /** Start from the earliest cursor possible. */ EARLIEST, diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java index 2c6ba90d65c..19f5c5998dd 100644 --- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java @@ -17,30 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.pulsar.source; -import static org.apache.seatunnel.common.PropertiesUtil.getEnum; -import static org.apache.seatunnel.common.PropertiesUtil.setOption; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_RESET_MODE; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_MODE; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_TIMESTAMP; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_MODE; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_TIMESTAMP; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.DEFAULT_FIELD_DELIMITER; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.DEFAULT_FORMAT; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.FIELD_DELIMITER; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.FORMAT; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_BATCH_SIZE; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_INTERVAL; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TEXT_FORMAT; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_DISCOVERY_INTERVAL; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN; +import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; @@ -78,17 +55,42 @@ import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException; import org.apache.seatunnel.format.text.TextDeserializationSchema; -import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils; import com.google.auto.service.AutoService; -import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils; import java.util.Arrays; import java.util.regex.Pattern; +import static org.apache.seatunnel.common.PropertiesUtil.getEnum; +import static org.apache.seatunnel.common.PropertiesUtil.setOption; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_RESET_MODE; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_MODE; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_TIMESTAMP; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_MODE; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_TIMESTAMP; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.DEFAULT_FIELD_DELIMITER; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.DEFAULT_FORMAT; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.FIELD_DELIMITER; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.FORMAT; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_BATCH_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_INTERVAL; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TEXT_FORMAT; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_DISCOVERY_INTERVAL; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN; + @AutoService(SeaTunnelSource.class) -public class PulsarSource implements SeaTunnelSource, - SupportParallelism { +public class PulsarSource + implements SeaTunnelSource, + SupportParallelism { private DeserializationSchema deserialization; private PulsarAdminConfig adminConfig; @@ -111,51 +113,73 @@ public String getPluginName() { @SuppressWarnings("checkstyle:MagicNumber") @Override public void prepare(Config config) throws PrepareFailException { - CheckResult result = CheckConfigUtil.checkAllExists(config, SUBSCRIPTION_NAME.key(), CLIENT_SERVICE_URL.key(), ADMIN_SERVICE_URL.key()); + CheckResult result = + CheckConfigUtil.checkAllExists( + config, + SUBSCRIPTION_NAME.key(), + CLIENT_SERVICE_URL.key(), + ADMIN_SERVICE_URL.key()); if (!result.isSuccess()) { - throw new PulsarConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", getPluginName(), PluginType.SOURCE, result.getMsg())); + throw new PulsarConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format( + "PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SOURCE, result.getMsg())); } // admin config - PulsarAdminConfig.Builder adminConfigBuilder = PulsarAdminConfig.builder() - .adminUrl(config.getString(ADMIN_SERVICE_URL.key())); - setOption(config, AUTH_PLUGIN_CLASS.key(), config::getString, adminConfigBuilder::authPluginClassName); + PulsarAdminConfig.Builder adminConfigBuilder = + PulsarAdminConfig.builder().adminUrl(config.getString(ADMIN_SERVICE_URL.key())); + setOption( + config, + AUTH_PLUGIN_CLASS.key(), + config::getString, + adminConfigBuilder::authPluginClassName); setOption(config, AUTH_PARAMS.key(), config::getString, adminConfigBuilder::authParams); this.adminConfig = adminConfigBuilder.build(); // client config - PulsarClientConfig.Builder clientConfigBuilder = PulsarClientConfig.builder() - .serviceUrl(config.getString(CLIENT_SERVICE_URL.key())); - setOption(config, AUTH_PLUGIN_CLASS.key(), config::getString, clientConfigBuilder::authPluginClassName); + PulsarClientConfig.Builder clientConfigBuilder = + PulsarClientConfig.builder().serviceUrl(config.getString(CLIENT_SERVICE_URL.key())); + setOption( + config, + AUTH_PLUGIN_CLASS.key(), + config::getString, + clientConfigBuilder::authPluginClassName); setOption(config, AUTH_PARAMS.key(), config::getString, clientConfigBuilder::authParams); this.clientConfig = clientConfigBuilder.build(); // consumer config - PulsarConsumerConfig.Builder consumerConfigBuilder = PulsarConsumerConfig.builder() - .subscriptionName(config.getString(SUBSCRIPTION_NAME.key())); + PulsarConsumerConfig.Builder consumerConfigBuilder = + PulsarConsumerConfig.builder() + .subscriptionName(config.getString(SUBSCRIPTION_NAME.key())); this.consumerConfig = consumerConfigBuilder.build(); // source properties - setOption(config, - TOPIC_DISCOVERY_INTERVAL.key(), - TOPIC_DISCOVERY_INTERVAL.defaultValue(), - config::getLong, - v -> this.partitionDiscoveryIntervalMs = v); - setOption(config, - POLL_TIMEOUT.key(), - POLL_TIMEOUT.defaultValue(), - config::getInt, - v -> this.pollTimeout = v); - setOption(config, - POLL_INTERVAL.key(), - POLL_INTERVAL.defaultValue(), - config::getLong, - v -> this.pollInterval = v); - setOption(config, - POLL_BATCH_SIZE.key(), - POLL_BATCH_SIZE.defaultValue(), - config::getInt, - v -> this.batchSize = v); + setOption( + config, + TOPIC_DISCOVERY_INTERVAL.key(), + TOPIC_DISCOVERY_INTERVAL.defaultValue(), + config::getLong, + v -> this.partitionDiscoveryIntervalMs = v); + setOption( + config, + POLL_TIMEOUT.key(), + POLL_TIMEOUT.defaultValue(), + config::getInt, + v -> this.pollTimeout = v); + setOption( + config, + POLL_INTERVAL.key(), + POLL_INTERVAL.defaultValue(), + config::getLong, + v -> this.pollInterval = v); + setOption( + config, + POLL_BATCH_SIZE.key(), + POLL_BATCH_SIZE.defaultValue(), + config::getInt, + v -> this.batchSize = v); setStartCursor(config); setStopCursor(config); @@ -163,14 +187,21 @@ public void prepare(Config config) throws PrepareFailException { setDeserialization(config); if (partitionDiscoverer instanceof TopicPatternDiscoverer - && partitionDiscoveryIntervalMs > 0 - && Boundedness.BOUNDED == stopCursor.getBoundedness()) { - throw new PulsarConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, "Bounded streams do not support dynamic partition discovery."); + && partitionDiscoveryIntervalMs > 0 + && Boundedness.BOUNDED == stopCursor.getBoundedness()) { + throw new PulsarConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + "Bounded streams do not support dynamic partition discovery."); } } private void setStartCursor(Config config) { - StartMode startMode = getEnum(config, CURSOR_STARTUP_MODE.key(), StartMode.class, CURSOR_STARTUP_MODE.defaultValue()); + StartMode startMode = + getEnum( + config, + CURSOR_STARTUP_MODE.key(), + StartMode.class, + CURSOR_STARTUP_MODE.defaultValue()); switch (startMode) { case EARLIEST: this.startCursor = StartCursor.earliest(); @@ -179,25 +210,42 @@ private void setStartCursor(Config config) { this.startCursor = StartCursor.latest(); break; case SUBSCRIPTION: - SubscriptionStartCursor.CursorResetStrategy resetStrategy = getEnum(config, - CURSOR_RESET_MODE.key(), - SubscriptionStartCursor.CursorResetStrategy.class, - SubscriptionStartCursor.CursorResetStrategy.LATEST); + SubscriptionStartCursor.CursorResetStrategy resetStrategy = + getEnum( + config, + CURSOR_RESET_MODE.key(), + SubscriptionStartCursor.CursorResetStrategy.class, + SubscriptionStartCursor.CursorResetStrategy.LATEST); this.startCursor = StartCursor.subscription(resetStrategy); break; case TIMESTAMP: if (StringUtils.isBlank(config.getString(CURSOR_STARTUP_TIMESTAMP.key()))) { - throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The '%s' property is required when the '%s' is 'timestamp'.", CURSOR_STARTUP_TIMESTAMP.key(), CURSOR_STARTUP_MODE.key())); + throw new PulsarConnectorException( + SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, + String.format( + "The '%s' property is required when the '%s' is 'timestamp'.", + CURSOR_STARTUP_TIMESTAMP.key(), CURSOR_STARTUP_MODE.key())); } - setOption(config, CURSOR_STARTUP_TIMESTAMP.key(), config::getLong, timestamp -> this.startCursor = StartCursor.timestamp(timestamp)); + setOption( + config, + CURSOR_STARTUP_TIMESTAMP.key(), + config::getLong, + timestamp -> this.startCursor = StartCursor.timestamp(timestamp)); break; default: - throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The %s mode is not supported.", startMode)); + throw new PulsarConnectorException( + SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, + String.format("The %s mode is not supported.", startMode)); } } private void setStopCursor(Config config) { - SourceProperties.StopMode stopMode = getEnum(config, CURSOR_STOP_MODE.key(), SourceProperties.StopMode.class, CURSOR_STOP_MODE.defaultValue()); + SourceProperties.StopMode stopMode = + getEnum( + config, + CURSOR_STOP_MODE.key(), + SourceProperties.StopMode.class, + CURSOR_STOP_MODE.defaultValue()); switch (stopMode) { case LATEST: this.stopCursor = StopCursor.latest(); @@ -207,29 +255,48 @@ private void setStopCursor(Config config) { break; case TIMESTAMP: if (StringUtils.isBlank(config.getString(CURSOR_STOP_TIMESTAMP.key()))) { - throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The '%s' property is required when the '%s' is 'timestamp'.", CURSOR_STOP_TIMESTAMP.key(), CURSOR_STOP_MODE.key())); + throw new PulsarConnectorException( + SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, + String.format( + "The '%s' property is required when the '%s' is 'timestamp'.", + CURSOR_STOP_TIMESTAMP.key(), CURSOR_STOP_MODE.key())); } - setOption(config, CURSOR_STARTUP_TIMESTAMP.key(), config::getLong, timestamp -> this.stopCursor = StopCursor.timestamp(timestamp)); + setOption( + config, + CURSOR_STARTUP_TIMESTAMP.key(), + config::getLong, + timestamp -> this.stopCursor = StopCursor.timestamp(timestamp)); break; default: - throw new PulsarConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("The %s mode is not supported.", stopMode)); + throw new PulsarConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format("The %s mode is not supported.", stopMode)); } } private void setPartitionDiscoverer(Config config) { String topic = config.getString(TOPIC.key()); if (StringUtils.isNotBlank(topic)) { - this.partitionDiscoverer = new TopicListDiscoverer(Arrays.asList(StringUtils.split(topic, ","))); + this.partitionDiscoverer = + new TopicListDiscoverer(Arrays.asList(StringUtils.split(topic, ","))); } String topicPattern = config.getString(TOPIC_PATTERN.key()); if (StringUtils.isNotBlank(topicPattern)) { if (this.partitionDiscoverer != null) { - throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The properties '%s' and '%s' is exclusive.", TOPIC.key(), TOPIC_PATTERN.key())); + throw new PulsarConnectorException( + SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, + String.format( + "The properties '%s' and '%s' is exclusive.", + TOPIC.key(), TOPIC_PATTERN.key())); } this.partitionDiscoverer = new TopicPatternDiscoverer(Pattern.compile(topicPattern)); } if (this.partitionDiscoverer == null) { - throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The properties '%s' or '%s' is required.", TOPIC.key(), TOPIC_PATTERN.key())); + throw new PulsarConnectorException( + SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, + String.format( + "The properties '%s' or '%s' is required.", + TOPIC.key(), TOPIC_PATTERN.key())); } } @@ -237,39 +304,48 @@ private void setDeserialization(Config config) { String schemaKey = SeaTunnelSchema.SCHEMA.key(); if (config.hasPath(schemaKey)) { Config schema = config.getConfig(schemaKey); - SeaTunnelRowType rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType(); + SeaTunnelRowType rowType = + SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType(); String format = DEFAULT_FORMAT; if (config.hasPath(FORMAT.key())) { format = config.getString(FORMAT.key()); } if (DEFAULT_FORMAT.equals(format)) { - deserialization = (DeserializationSchema) new JsonDeserializationSchema(false, false, rowType); + deserialization = + (DeserializationSchema) + new JsonDeserializationSchema(false, false, rowType); } else if (TEXT_FORMAT.equals(format)) { String delimiter = DEFAULT_FIELD_DELIMITER; if (config.hasPath(FIELD_DELIMITER.key())) { delimiter = config.getString(FIELD_DELIMITER.key()); } - deserialization = (DeserializationSchema) TextDeserializationSchema.builder() - .seaTunnelRowType(rowType) - .delimiter(delimiter) - .build(); + deserialization = + (DeserializationSchema) + TextDeserializationSchema.builder() + .seaTunnelRowType(rowType) + .delimiter(delimiter) + .build(); } else { // TODO: use format SPI - throw new SeaTunnelJsonFormatException(CommonErrorCode.UNSUPPORTED_OPERATION, - "Unsupported format: " + format); + throw new SeaTunnelJsonFormatException( + CommonErrorCode.UNSUPPORTED_OPERATION, "Unsupported format: " + format); } } else { SeaTunnelRowType rowType = SeaTunnelSchema.buildSimpleTextSchema(); - deserialization = (DeserializationSchema) TextDeserializationSchema.builder() - .seaTunnelRowType(rowType) - .delimiter(String.valueOf('\002')) - .build(); + deserialization = + (DeserializationSchema) + TextDeserializationSchema.builder() + .seaTunnelRowType(rowType) + .delimiter(String.valueOf('\002')) + .build(); } } @Override public Boundedness getBoundedness() { - return this.stopCursor instanceof NeverStopCursor ? Boundedness.UNBOUNDED : Boundedness.BOUNDED; + return this.stopCursor instanceof NeverStopCursor + ? Boundedness.UNBOUNDED + : Boundedness.BOUNDED; } @Override @@ -278,40 +354,47 @@ public SeaTunnelDataType getProducedType() { } @Override - public SourceReader createReader(SourceReader.Context readerContext) throws Exception { - return new PulsarSourceReader<>(readerContext, - clientConfig, - consumerConfig, - startCursor, - deserialization, - pollTimeout, - pollInterval, - batchSize); + public SourceReader createReader(SourceReader.Context readerContext) + throws Exception { + return new PulsarSourceReader<>( + readerContext, + clientConfig, + consumerConfig, + startCursor, + deserialization, + pollTimeout, + pollInterval, + batchSize); } @Override - public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception { + public SourceSplitEnumerator createEnumerator( + SourceSplitEnumerator.Context enumeratorContext) + throws Exception { return new PulsarSplitEnumerator( - enumeratorContext, - adminConfig, - partitionDiscoverer, - partitionDiscoveryIntervalMs, - startCursor, - stopCursor, - consumerConfig.getSubscriptionName()); + enumeratorContext, + adminConfig, + partitionDiscoverer, + partitionDiscoveryIntervalMs, + startCursor, + stopCursor, + consumerConfig.getSubscriptionName()); } @Override - public SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, PulsarSplitEnumeratorState checkpointState) throws Exception { + public SourceSplitEnumerator + restoreEnumerator( + SourceSplitEnumerator.Context enumeratorContext, + PulsarSplitEnumeratorState checkpointState) + throws Exception { return new PulsarSplitEnumerator( - enumeratorContext, - adminConfig, - partitionDiscoverer, - partitionDiscoveryIntervalMs, - startCursor, - stopCursor, - consumerConfig.getSubscriptionName(), - checkpointState.assignedPartitions()); + enumeratorContext, + adminConfig, + partitionDiscoverer, + partitionDiscoveryIntervalMs, + startCursor, + stopCursor, + consumerConfig.getSubscriptionName(), + checkpointState.assignedPartitions()); } - } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml index d40f647c9cc..4a58c3bda74 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml @@ -13,15 +13,14 @@ See the License for the specific language governing permissions and limitations under the License. --> - + 4.0.0 - seatunnel-connector-v2-e2e org.apache.seatunnel + seatunnel-connector-v2-e2e ${revision} - 4.0.0 connector-pulsar-e2e @@ -108,4 +107,4 @@ test - \ No newline at end of file + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java index b7debd02a65..358dd5956ef 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.e2e.connector.pulsar; -import static java.time.temporal.ChronoUnit.SECONDS; - import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -28,7 +26,6 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.format.text.TextSerializationSchema; -import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Producer; @@ -39,6 +36,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; + import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -48,13 +46,15 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import lombok.extern.slf4j.Slf4j; + import java.io.IOException; import java.time.Duration; import java.util.UUID; -/** - * Start / stop a Pulsar cluster. - */ +import static java.time.temporal.ChronoUnit.SECONDS; + +/** Start / stop a Pulsar cluster. */ @Slf4j public class PulsarIT extends TestSuiteBase implements TestResource { @@ -68,7 +68,8 @@ public class PulsarIT extends TestSuiteBase implements TestResource { private ClientConfigurationData clientConfigurationData = new ClientConfigurationData(); - private ConsumerConfigurationData consumerConfigurationData = new ConsumerConfigurationData<>(); + private ConsumerConfigurationData consumerConfigurationData = + new ConsumerConfigurationData<>(); private PulsarAdmin pulsarAdmin; @@ -83,11 +84,12 @@ public void startUp() throws Exception { log.info("Starting PulsarService "); pulsarService = new PulsarContainer(); pulsarService.addExposedPort(2181); - pulsarService.waitingFor(new HttpWaitStrategy() - .forPort(PulsarContainer.BROKER_HTTP_PORT) - .forStatusCode(200) - .forPath("/admin/v2/namespaces/public/default") - .withStartupTimeout(Duration.of(400, SECONDS))); + pulsarService.waitingFor( + new HttpWaitStrategy() + .forPort(PulsarContainer.BROKER_HTTP_PORT) + .forStatusCode(200) + .forPath("/admin/v2/namespaces/public/default") + .withStartupTimeout(Duration.of(400, SECONDS))); pulsarService.start(); pulsarService.followOutput(new Slf4jLogConsumer(log)); serviceUrl = pulsarService.getPulsarBrokerUrl(); @@ -105,10 +107,11 @@ public void startUp() throws Exception { builder.serviceUrl(serviceUrl); pulsarClient = builder.build(); - TextSerializationSchema serializer = TextSerializationSchema.builder() - .seaTunnelRowType(seaTunnelRowType) - .delimiter(",") - .build(); + TextSerializationSchema serializer = + TextSerializationSchema.builder() + .seaTunnelRowType(seaTunnelRowType) + .delimiter(",") + .build(); generateTestData(row -> new String(serializer.serialize(row)), 0, 2); log.info("Successfully started PulsarService"); @@ -134,8 +137,10 @@ public void tearDown() throws Exception { } @TestTemplate - public void testSourcePulsarTextToConsole(TestContainer container) throws IOException, InterruptedException { - Container.ExecResult execResult = container.executeJob("/pulsarsource_text_to_console.conf"); + public void testSourcePulsarTextToConsole(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/pulsarsource_text_to_console.conf"); log.info("execResult.getExitCode:{}", execResult.getExitCode()); log.info("execResult.getStdout:{}", execResult.getStdout()); log.info("execResult.getStderr:{}", execResult.getStderr()); @@ -143,14 +148,13 @@ public void testSourcePulsarTextToConsole(TestContainer container) throws IOExce Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); } - private void generateTestData(ProducerRecordConverter converter, int start, int end) throws PulsarClientException { - try ( - Producer producer = pulsarClient.newProducer(Schema.STRING) - .topic(TOPIC) - .create(); - ) { + private void generateTestData(ProducerRecordConverter converter, int start, int end) + throws PulsarClientException { + try (Producer producer = + pulsarClient.newProducer(Schema.STRING).topic(TOPIC).create(); ) { for (int i = start; i < end; i++) { - SeaTunnelRow row = new SeaTunnelRow(new Object[]{Long.valueOf(i), "pulsarsource-test" + i}); + SeaTunnelRow row = + new SeaTunnelRow(new Object[] {Long.valueOf(i), "pulsarsource-test" + i}); String producerRecord = converter.convert(row); producer.send(producerRecord); } @@ -162,8 +166,8 @@ interface ProducerRecordConverter { } @SuppressWarnings("checkstyle:InnerTypeLast") - SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType( - new String[]{"id", "c_string"}, - new SeaTunnelDataType[]{BasicType.LONG_TYPE, BasicType.STRING_TYPE} - ); + SeaTunnelRowType seaTunnelRowType = + new SeaTunnelRowType( + new String[] {"id", "c_string"}, + new SeaTunnelDataType[] {BasicType.LONG_TYPE, BasicType.STRING_TYPE}); } From 9a3ffac5834869a8f7ce7df6f877792fe633b439 Mon Sep 17 00:00:00 2001 From: lightzhao Date: Fri, 17 Mar 2023 11:07:23 +0800 Subject: [PATCH 08/12] handle conflict --- seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index a0e839d7733..d38c468e77b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -53,6 +53,7 @@ connector-mongodb-e2e connector-hbase-e2e connector-maxcompute-e2e + connector-pulsar-e2e From 3e5c710ed61f594a98bf85f3e32c8c9ef918adeb Mon Sep 17 00:00:00 2001 From: lightzhao Date: Tue, 28 Mar 2023 09:06:27 +0800 Subject: [PATCH 09/12] update pom. --- .../seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml | 2 -- 1 file changed, 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml index 4a58c3bda74..f6a0068e4fd 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml @@ -25,8 +25,6 @@ connector-pulsar-e2e - 11 - 11 2.8.0 From 1f490adf23473d3e861ac99d26f24709e034df29 Mon Sep 17 00:00:00 2001 From: lightzhao Date: Wed, 29 Mar 2023 14:41:55 +0800 Subject: [PATCH 10/12] update. --- .../connector-pulsar-e2e/pom.xml | 6 +++++ .../e2e/connector/pulsar/PulsarIT.java | 2 -- .../pulsarsource_text_to_console.conf | 25 ------------------- 3 files changed, 6 insertions(+), 27 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml index f6a0068e4fd..922efac1783 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml @@ -104,5 +104,11 @@ 1.17.6 test + + org.slf4j + jul-to-slf4j + ${slf4j.version} + test + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java index 358dd5956ef..e80f04e8164 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java @@ -144,8 +144,6 @@ public void testSourcePulsarTextToConsole(TestContainer container) log.info("execResult.getExitCode:{}", execResult.getExitCode()); log.info("execResult.getStdout:{}", execResult.getStdout()); log.info("execResult.getStderr:{}", execResult.getStderr()); - - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); } private void generateTestData(ProducerRecordConverter converter, int start, int end) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsarsource_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsarsource_text_to_console.conf index e3d104f2109..4f8312aa48a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsarsource_text_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsarsource_text_to_console.conf @@ -48,29 +48,4 @@ transform { sink { Console {} - Assert { - rules = - { - field_rules = [ - { - field_name = id - field_type = long - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 0 - }, - { - rule_type = MAX - rule_value = 99 - } - ] - } - ] - } - - } } \ No newline at end of file From 70e200918a7b493ad1d1e67358c93b4db14379c9 Mon Sep 17 00:00:00 2001 From: lightzhao Date: Wed, 29 Mar 2023 14:59:38 +0800 Subject: [PATCH 11/12] fix codestyle. --- .../java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java index e80f04e8164..112cc72a79a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarIT.java @@ -38,7 +38,6 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; From ce42c752fa63cf3704e73e05e164a9c9f09e6f0e Mon Sep 17 00:00:00 2001 From: lightzhao Date: Tue, 4 Apr 2023 12:57:18 +0800 Subject: [PATCH 12/12] update pom. --- .../connector-pulsar-e2e/pom.xml | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml index 922efac1783..1a278e7f114 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml @@ -26,6 +26,7 @@ 2.8.0 + 1.17.6 @@ -80,18 +81,6 @@ ${project.version} test - - org.testcontainers - testcontainers - 1.17.6 - test - - - junit - junit - 4.12 - compile - org.assertj assertj-core @@ -101,7 +90,7 @@ org.testcontainers pulsar - 1.17.6 + ${testcontainers.version} test