From c0b3ad7e06926ea2a3ebb5dcee25e68f3437ca2c Mon Sep 17 00:00:00 2001 From: gaoran10 Date: Thu, 21 Oct 2021 13:07:39 +0800 Subject: [PATCH 1/6] pulsar sql support query big entry data --- .../sql/presto/PulsarConnectorCache.java | 4 +- .../integration/presto/TestBasicPresto.java | 64 ++++++++++++------- .../presto/TestPrestoQueryTieredStorage.java | 8 --- .../integration/presto/TestPulsarSQLBase.java | 18 +++--- .../suites/PulsarSQLTestSuite.java | 46 +++++++++++++ .../integration/topologies/PulsarCluster.java | 6 +- .../topologies/PulsarClusterSpec.java | 3 + 7 files changed, 106 insertions(+), 43 deletions(-) diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index bf823de7229f9..9a64c055d07ac 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -42,6 +42,7 @@ import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -109,7 +110,8 @@ private ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig puls .setReadEntryTimeout(60) .setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue()) .setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads()) - .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads()); + .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads()) + .setNettyMaxFrameSizeBytes(pulsarConnectorConfig.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING); ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig(); managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB()); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java index 91043904076be..08d4757343f29 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java @@ -27,7 +27,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.schema.AvroSchema; @@ -35,6 +34,7 @@ import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaType; @@ -161,31 +161,26 @@ protected int prepareData(TopicName topicName, boolean useNsOffloadPolices, Schema schema, CompressionType compressionType) throws Exception { - @Cleanup - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) - .build(); if (schema.getSchemaInfo().getName().equals(Schema.BYTES.getSchemaInfo().getName())) { - prepareDataForBytesSchema(pulsarClient, topicName, isBatch, compressionType); + prepareDataForBytesSchema(topicName, isBatch, compressionType); } else if (schema.getSchemaInfo().getName().equals(Schema.BYTEBUFFER.getSchemaInfo().getName())) { - prepareDataForByteBufferSchema(pulsarClient, topicName, isBatch, compressionType); + prepareDataForByteBufferSchema(topicName, isBatch, compressionType); } else if (schema.getSchemaInfo().getType().equals(SchemaType.STRING)) { - prepareDataForStringSchema(pulsarClient, topicName, isBatch, compressionType); + prepareDataForStringSchema(topicName, isBatch, compressionType); } else if (schema.getSchemaInfo().getType().equals(SchemaType.JSON) || schema.getSchemaInfo().getType().equals(SchemaType.AVRO)) { - prepareDataForStructSchema(pulsarClient, topicName, isBatch, schema, compressionType); + prepareDataForStructSchema(topicName, isBatch, schema, compressionType); } else if (schema.getSchemaInfo().getType().equals(SchemaType.PROTOBUF_NATIVE)) { - prepareDataForProtobufNativeSchema(pulsarClient, topicName, isBatch, schema, compressionType); + prepareDataForProtobufNativeSchema(topicName, isBatch, schema, compressionType); } else if (schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) { - prepareDataForKeyValueSchema(pulsarClient, topicName, schema, compressionType); + prepareDataForKeyValueSchema(topicName, schema, compressionType); } return NUM_OF_STOCKS; } - private void prepareDataForBytesSchema(PulsarClient pulsarClient, - TopicName topicName, + private void prepareDataForBytesSchema(TopicName topicName, boolean isBatch, CompressionType compressionType) throws PulsarClientException { @Cleanup @@ -201,8 +196,7 @@ private void prepareDataForBytesSchema(PulsarClient pulsarClient, producer.flush(); } - private void prepareDataForByteBufferSchema(PulsarClient pulsarClient, - TopicName topicName, + private void prepareDataForByteBufferSchema(TopicName topicName, boolean isBatch, CompressionType compressionType) throws PulsarClientException { @Cleanup @@ -218,8 +212,7 @@ private void prepareDataForByteBufferSchema(PulsarClient pulsarClient, producer.flush(); } - private void prepareDataForStringSchema(PulsarClient pulsarClient, - TopicName topicName, + private void prepareDataForStringSchema(TopicName topicName, boolean isBatch, CompressionType compressionType) throws PulsarClientException { @Cleanup @@ -235,8 +228,7 @@ private void prepareDataForStringSchema(PulsarClient pulsarClient, producer.flush(); } - private void prepareDataForStructSchema(PulsarClient pulsarClient, - TopicName topicName, + private void prepareDataForStructSchema(TopicName topicName, boolean isBatch, Schema schema, CompressionType compressionType) throws Exception { @@ -254,8 +246,7 @@ private void prepareDataForStructSchema(PulsarClient pulsarClient, producer.flush(); } - private void prepareDataForProtobufNativeSchema(PulsarClient pulsarClient, - TopicName topicName, + private void prepareDataForProtobufNativeSchema(TopicName topicName, boolean isBatch, Schema schema, CompressionType compressionType) throws Exception { @@ -274,8 +265,7 @@ private void prepareDataForProtobufNativeSchema(PulsarClient pulsarClient, producer.flush(); } - private void prepareDataForKeyValueSchema(PulsarClient pulsarClient, - TopicName topicName, + private void prepareDataForKeyValueSchema(TopicName topicName, Schema> schema, CompressionType compressionType) throws Exception { @Cleanup @@ -342,4 +332,32 @@ private void validateContentForKeyValueSchema(int messageNum, String[] contentAr } } + @Test + public void testQueueBigEntry() throws Exception { + String tableName = "big_data_" + randomName(5); + String topic = "persistent://public/default/" + tableName; + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + int dataLength = pulsarCluster.getSpec().maxMessageSize() - 1024 * 1024; + // Make sure that the data length bigger than the default maxMessageSize + Assert.assertTrue(dataLength > + (Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING)); + byte[] data = new byte[dataLength]; + for (int i = 0; i < dataLength; i++) { + data[i] = 'a'; + } + + int messageCnt = 5; + for (int i = 0 ; i < messageCnt; ++i) { + producer.newMessage().value(data).send(); + } + + int count = selectCount("public/default", tableName);; + Assert.assertEquals(count, messageCnt); + } + } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java index 881dbe4bd0d41..75373bed2ae26 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java @@ -29,7 +29,6 @@ import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -40,8 +39,6 @@ import org.apache.pulsar.tests.integration.containers.S3Container; import org.testcontainers.shaded.org.apache.commons.lang.StringUtils; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -136,11 +133,6 @@ protected int prepareData(TopicName topicName, Schema schema, CompressionType compressionType) throws Exception { @Cleanup - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) - .build(); - - @Cleanup Consumer consumer = pulsarClient.newConsumer(JSONSchema.of(Stock.class)) .topic(topicName.toString()) .subscriptionName("test") diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java index 026a32d1c8ec6..0626e3522e8c5 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java @@ -21,8 +21,6 @@ import static org.assertj.core.api.Assertions.assertThat; import com.google.common.base.Stopwatch; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -196,9 +194,6 @@ private void validateData(TopicName topicName, int messageNum, Schema schema) th ); // test predicate pushdown - String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl()); - Connection connection = DriverManager.getConnection(url, "test", null); - String query = String.format("select * from pulsar" + ".\"%s\".\"%s\" order by __publish_time__", namespace, topic); log.info("Executing query: {}", query); @@ -267,11 +262,7 @@ private void validateData(TopicName topicName, int messageNum, Schema schema) th log.info("Executing query: result for topic {} returnedTimestamps size: {}", topic, returnedTimestamps.size()); assertThat(returnedTimestamps.size()).isEqualTo(0); - query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, topic); - log.info("Executing query: {}", query); - res = connection.createStatement().executeQuery(query); - res.next(); - int count = res.getInt("_col0"); + int count = selectCount(namespace, topic); assertThat(count).isGreaterThan(messageNum - 2); } @@ -304,5 +295,12 @@ private static void printCurrent(ResultSet rs) throws SQLException { } + protected int selectCount(String namespace, String tableName) throws SQLException { + String query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, tableName); + log.info("Executing count query: {}", query); + ResultSet res = connection.createStatement().executeQuery(query); + res.next(); + return res.getInt("_col0"); + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java index 9ed733570452f..3db1b2a327209 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java @@ -18,13 +18,22 @@ */ package org.apache.pulsar.tests.integration.suites; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; import java.util.HashMap; import java.util.Map; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.tests.integration.containers.BrokerContainer; import org.apache.pulsar.tests.integration.containers.S3Container; import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +/** + * Pulsar SQL test suite. + */ @Slf4j public abstract class PulsarSQLTestSuite extends PulsarTestSuite { @@ -33,11 +42,15 @@ public abstract class PulsarSQLTestSuite extends PulsarTestSuite { public static final String BUCKET = "pulsar-integtest"; public static final String ENDPOINT = "http://" + S3Container.NAME + ":9090"; + protected Connection connection = null; + protected PulsarClient pulsarClient = null; + @Override protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) { specBuilder.queryLastMessage(true); specBuilder.clusterName("pulsar-sql-test"); specBuilder.numBrokers(1); + specBuilder.maxMessageSize(2 * (Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING)); return super.beforeSetupCluster(clusterName, specBuilder); } @@ -55,4 +68,37 @@ protected void beforeStartCluster() throws Exception { } } + @Override + protected void setupCluster(PulsarClusterSpec spec) throws Exception { + super.setupCluster(spec); + String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl()); + connection = DriverManager.getConnection(url, "test", null); + + pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); + } + + @Override + public void tearDownCluster() throws Exception { + close(); + super.tearDownCluster(); + } + + protected void close() { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + log.error("Failed to close sql connection.", e); + } + } + if (pulsarClient != null) { + try { + pulsarClient.close(); + } catch (PulsarClientException e) { + log.error("Failed to close pulsar client.", e); + } + } + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 043762ce1f761..3feeed6b76a53 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -73,6 +73,7 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec) { return new PulsarCluster(spec); } + @Getter private final PulsarClusterSpec spec; @Getter @@ -150,6 +151,7 @@ private PulsarCluster(PulsarClusterSpec spec) { .withEnv("journalMaxGroupWaitMSec", "0") .withEnv("clusterName", clusterName) .withEnv("diskUsageThreshold", "0.99") + .withEnv("nettyMaxFrameSizeBytes", "" + spec.maxMessageSize) ) ); @@ -166,7 +168,8 @@ private PulsarCluster(PulsarClusterSpec spec) { .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1") // used in s3 tests .withEnv("AWS_ACCESS_KEY_ID", "accesskey") - .withEnv("AWS_SECRET_KEY", "secretkey"); + .withEnv("AWS_SECRET_KEY", "secretkey") + .withEnv("maxMessageSize", "" + spec.maxMessageSize); if (spec.queryLastMessage) { brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10"); brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false"); @@ -420,6 +423,7 @@ private PrestoWorkerContainer buildPrestoWorkerContainer(String hostName, boolea .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) .withEnv("pulsar.web-service-url", "http://pulsar-broker-0:8080") + .withEnv("pulsar.max-message-size", "" + spec.maxMessageSize) .withClasspathResourceMapping( resourcePath, "/pulsar/conf/presto/config.properties", BindMode.READ_WRITE); if (spec.queryLastMessage) { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java index 9dcfcfb725e79..9a195d0359019 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java @@ -29,6 +29,7 @@ import lombok.Singular; import lombok.experimental.Accessors; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.tests.integration.containers.PulsarContainer; import org.testcontainers.containers.GenericContainer; @@ -152,4 +153,6 @@ public class PulsarClusterSpec { * Specify mount files. */ Map brokerMountFiles; + + int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE; } From 72e6a069fd64294da060f4bdea33f5a9ddc18680 Mon Sep 17 00:00:00 2001 From: gaoran10 Date: Thu, 21 Oct 2021 13:27:54 +0800 Subject: [PATCH 2/6] add doc --- conf/presto/catalog/pulsar.properties | 3 ++- site2/docs/sql-deployment-configurations.md | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties index 1f5a89a8b7cc7..e273b98dccc95 100644 --- a/conf/presto/catalog/pulsar.properties +++ b/conf/presto/catalog/pulsar.properties @@ -42,7 +42,8 @@ pulsar.max-split-queue-cache-size=-1 # to prevent erroneous rewriting pulsar.namespace-delimiter-rewrite-enable=false pulsar.rewrite-namespace-delimiter=/ - +# max size of one batch message (default value is 5MB) +# pulsar.max-message-size=5242880 ####### TIERED STORAGE OFFLOADER CONFIGS ####### diff --git a/site2/docs/sql-deployment-configurations.md b/site2/docs/sql-deployment-configurations.md index 1fe0353f07531..e5c402ebdc14d 100644 --- a/site2/docs/sql-deployment-configurations.md +++ b/site2/docs/sql-deployment-configurations.md @@ -24,6 +24,9 @@ pulsar.entry-read-batch-size=100 # default number of splits to use per query pulsar.target-num-splits=4 + +# max size of one batch message (default value is 5MB) +pulsar.max-message-size=5242880 ``` You can connect Presto to a Pulsar cluster with multiple hosts. To configure multiple hosts for brokers, add multiple URLs to `pulsar.web-service-url`. To configure multiple hosts for ZooKeeper, add multiple URIs to `pulsar.zookeeper-uri`. The following is an example. From 4a9ca106daebab2c9660e108f606e8d6553930b5 Mon Sep 17 00:00:00 2001 From: gaoran10 Date: Thu, 21 Oct 2021 16:27:31 +0800 Subject: [PATCH 3/6] fix --- .../pulsar/tests/integration/topologies/PulsarClusterSpec.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java index 9a195d0359019..eed604205bdce 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java @@ -154,5 +154,6 @@ public class PulsarClusterSpec { */ Map brokerMountFiles; + @Default int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE; } From 56a1d36ed30192e1759e92cfc860d120df27df8c Mon Sep 17 00:00:00 2001 From: gaoran10 Date: Thu, 21 Oct 2021 18:13:44 +0800 Subject: [PATCH 4/6] fix --- .../integration/presto/TestBasicPresto.java | 3 ++- .../presto/TestPrestoQueryTieredStorage.java | 1 + .../integration/suites/PulsarSQLTestSuite.java | 16 +++++++++++----- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java index 08d4757343f29..a2a10baf63177 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java @@ -56,6 +56,7 @@ public class TestBasicPresto extends TestPulsarSQLBase { private void setupPresto() throws Exception { log.info("[TestBasicPresto] setupPresto..."); pulsarCluster.startPrestoWorker(); + initJdbcConnection(); } private void teardownPresto() { @@ -332,7 +333,7 @@ private void validateContentForKeyValueSchema(int messageNum, String[] contentAr } } - @Test + @Test(timeOut = 1000 * 30) public void testQueueBigEntry() throws Exception { String tableName = "big_data_" + randomName(5); String topic = "persistent://public/default/" + tableName; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java index 75373bed2ae26..4c129fadce002 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java @@ -86,6 +86,7 @@ private void setupExtraContainers() throws Exception { String offloadProperties = getOffloadProperties(BUCKET, null, ENDPOINT); pulsarCluster.startPrestoWorker(OFFLOAD_DRIVER, offloadProperties); pulsarCluster.startPrestoFollowWorkers(1, OFFLOAD_DRIVER, offloadProperties); + initJdbcConnection(); } private String getOffloadProperties(String bucket, String region, String endpoint) { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java index 3db1b2a327209..b6ea07807ae1d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java @@ -69,16 +69,22 @@ protected void beforeStartCluster() throws Exception { } @Override - protected void setupCluster(PulsarClusterSpec spec) throws Exception { - super.setupCluster(spec); - String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl()); - connection = DriverManager.getConnection(url, "test", null); - + public void setupCluster() throws Exception { + super.setupCluster(); pulsarClient = PulsarClient.builder() .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) .build(); } + protected void initJdbcConnection() throws SQLException { + if (pulsarCluster.getPrestoWorkerContainer() == null) { + log.error("The presto work container isn't exist."); + return; + } + String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl()); + connection = DriverManager.getConnection(url, "test", null); + } + @Override public void tearDownCluster() throws Exception { close(); From 0d7b4666c219a5697732dc34b7c67d4296c626ff Mon Sep 17 00:00:00 2001 From: gaoran10 Date: Fri, 22 Oct 2021 01:29:51 +0800 Subject: [PATCH 5/6] fix --- .../pulsar/tests/integration/containers/BKContainer.java | 1 + .../pulsar/tests/integration/presto/TestBasicPresto.java | 7 ++++--- .../pulsar/tests/integration/topologies/PulsarCluster.java | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java index 36f17cd35cd0b..b294cac4e701e 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java @@ -28,5 +28,6 @@ public class BKContainer extends PulsarContainer { public BKContainer(String clusterName, String hostName) { super( clusterName, hostName, hostName, "bin/run-bookie.sh", BOOKIE_PORT, INVALID_PORT); + tailContainerLog(); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java index a2a10baf63177..7f7d5517dd17b 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java @@ -341,23 +341,24 @@ public void testQueueBigEntry() throws Exception { @Cleanup Producer producer = pulsarClient.newProducer(Schema.BYTES) .topic(topic) + .enableBatching(false) .create(); int dataLength = pulsarCluster.getSpec().maxMessageSize() - 1024 * 1024; // Make sure that the data length bigger than the default maxMessageSize - Assert.assertTrue(dataLength > - (Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING)); + Assert.assertTrue(dataLength > Commands.DEFAULT_MAX_MESSAGE_SIZE); byte[] data = new byte[dataLength]; for (int i = 0; i < dataLength; i++) { data[i] = 'a'; } int messageCnt = 5; + log.info("start produce big entry data, data length: {}", dataLength); for (int i = 0 ; i < messageCnt; ++i) { producer.newMessage().value(data).send(); } - int count = selectCount("public/default", tableName);; + int count = selectCount("public/default", tableName); Assert.assertEquals(count, messageCnt); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 3feeed6b76a53..e566df1b84f62 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -423,7 +423,7 @@ private PrestoWorkerContainer buildPrestoWorkerContainer(String hostName, boolea .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) .withEnv("pulsar.web-service-url", "http://pulsar-broker-0:8080") - .withEnv("pulsar.max-message-size", "" + spec.maxMessageSize) + .withEnv("SQL_PREFIX_pulsar.max-message-size", "" + spec.maxMessageSize) .withClasspathResourceMapping( resourcePath, "/pulsar/conf/presto/config.properties", BindMode.READ_WRITE); if (spec.queryLastMessage) { From c711d2d5d665fa2452c0ab98eb2ab7f0c1dc3ced Mon Sep 17 00:00:00 2001 From: gaoran10 Date: Mon, 25 Oct 2021 20:19:15 +0800 Subject: [PATCH 6/6] fix --- .../pulsar/tests/integration/presto/TestBasicPresto.java | 4 ++-- .../pulsar/tests/integration/suites/PulsarSQLTestSuite.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java index 7f7d5517dd17b..62f59c3f36f58 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java @@ -344,9 +344,9 @@ public void testQueueBigEntry() throws Exception { .enableBatching(false) .create(); - int dataLength = pulsarCluster.getSpec().maxMessageSize() - 1024 * 1024; // Make sure that the data length bigger than the default maxMessageSize - Assert.assertTrue(dataLength > Commands.DEFAULT_MAX_MESSAGE_SIZE); + int dataLength = Commands.DEFAULT_MAX_MESSAGE_SIZE + 2 * 1024 * 1024; + Assert.assertTrue(dataLength < pulsarCluster.getSpec().maxMessageSize()); byte[] data = new byte[dataLength]; for (int i = 0; i < dataLength; i++) { data[i] = 'a'; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java index b6ea07807ae1d..762fff751b069 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java @@ -50,7 +50,7 @@ protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String c specBuilder.queryLastMessage(true); specBuilder.clusterName("pulsar-sql-test"); specBuilder.numBrokers(1); - specBuilder.maxMessageSize(2 * (Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING)); + specBuilder.maxMessageSize(2 * Commands.DEFAULT_MAX_MESSAGE_SIZE); return super.beforeSetupCluster(clusterName, specBuilder); }