diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties index 1f5a89a8b7cc7..e273b98dccc95 100644 --- a/conf/presto/catalog/pulsar.properties +++ b/conf/presto/catalog/pulsar.properties @@ -42,7 +42,8 @@ pulsar.max-split-queue-cache-size=-1 # to prevent erroneous rewriting pulsar.namespace-delimiter-rewrite-enable=false pulsar.rewrite-namespace-delimiter=/ - +# max size of one batch message (default value is 5MB) +# pulsar.max-message-size=5242880 ####### TIERED STORAGE OFFLOADER CONFIGS ####### diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index bf823de7229f9..9a64c055d07ac 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -42,6 +42,7 @@ import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -109,7 +110,8 @@ private ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig puls .setReadEntryTimeout(60) .setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue()) .setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads()) - .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads()); + .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads()) + .setNettyMaxFrameSizeBytes(pulsarConnectorConfig.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING); ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig(); managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB()); diff --git a/site2/docs/sql-deployment-configurations.md b/site2/docs/sql-deployment-configurations.md index 1fe0353f07531..e5c402ebdc14d 100644 --- a/site2/docs/sql-deployment-configurations.md +++ b/site2/docs/sql-deployment-configurations.md @@ -24,6 +24,9 @@ pulsar.entry-read-batch-size=100 # default number of splits to use per query pulsar.target-num-splits=4 + +# max size of one batch message (default value is 5MB) +pulsar.max-message-size=5242880 ``` You can connect Presto to a Pulsar cluster with multiple hosts. To configure multiple hosts for brokers, add multiple URLs to `pulsar.web-service-url`. To configure multiple hosts for ZooKeeper, add multiple URIs to `pulsar.zookeeper-uri`. The following is an example. diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java index 36f17cd35cd0b..b294cac4e701e 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java @@ -28,5 +28,6 @@ public class BKContainer extends PulsarContainer { public BKContainer(String clusterName, String hostName) { super( clusterName, hostName, hostName, "bin/run-bookie.sh", BOOKIE_PORT, INVALID_PORT); + tailContainerLog(); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java index 91043904076be..62f59c3f36f58 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java @@ -27,7 +27,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.schema.AvroSchema; @@ -35,6 +34,7 @@ import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaType; @@ -56,6 +56,7 @@ public class TestBasicPresto extends TestPulsarSQLBase { private void setupPresto() throws Exception { log.info("[TestBasicPresto] setupPresto..."); pulsarCluster.startPrestoWorker(); + initJdbcConnection(); } private void teardownPresto() { @@ -161,31 +162,26 @@ protected int prepareData(TopicName topicName, boolean useNsOffloadPolices, Schema schema, CompressionType compressionType) throws Exception { - @Cleanup - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) - .build(); if (schema.getSchemaInfo().getName().equals(Schema.BYTES.getSchemaInfo().getName())) { - prepareDataForBytesSchema(pulsarClient, topicName, isBatch, compressionType); + prepareDataForBytesSchema(topicName, isBatch, compressionType); } else if (schema.getSchemaInfo().getName().equals(Schema.BYTEBUFFER.getSchemaInfo().getName())) { - prepareDataForByteBufferSchema(pulsarClient, topicName, isBatch, compressionType); + prepareDataForByteBufferSchema(topicName, isBatch, compressionType); } else if (schema.getSchemaInfo().getType().equals(SchemaType.STRING)) { - prepareDataForStringSchema(pulsarClient, topicName, isBatch, compressionType); + prepareDataForStringSchema(topicName, isBatch, compressionType); } else if (schema.getSchemaInfo().getType().equals(SchemaType.JSON) || schema.getSchemaInfo().getType().equals(SchemaType.AVRO)) { - prepareDataForStructSchema(pulsarClient, topicName, isBatch, schema, compressionType); + prepareDataForStructSchema(topicName, isBatch, schema, compressionType); } else if (schema.getSchemaInfo().getType().equals(SchemaType.PROTOBUF_NATIVE)) { - prepareDataForProtobufNativeSchema(pulsarClient, topicName, isBatch, schema, compressionType); + prepareDataForProtobufNativeSchema(topicName, isBatch, schema, compressionType); } else if (schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) { - prepareDataForKeyValueSchema(pulsarClient, topicName, schema, compressionType); + prepareDataForKeyValueSchema(topicName, schema, compressionType); } return NUM_OF_STOCKS; } - private void prepareDataForBytesSchema(PulsarClient pulsarClient, - TopicName topicName, + private void prepareDataForBytesSchema(TopicName topicName, boolean isBatch, CompressionType compressionType) throws PulsarClientException { @Cleanup @@ -201,8 +197,7 @@ private void prepareDataForBytesSchema(PulsarClient pulsarClient, producer.flush(); } - private void prepareDataForByteBufferSchema(PulsarClient pulsarClient, - TopicName topicName, + private void prepareDataForByteBufferSchema(TopicName topicName, boolean isBatch, CompressionType compressionType) throws PulsarClientException { @Cleanup @@ -218,8 +213,7 @@ private void prepareDataForByteBufferSchema(PulsarClient pulsarClient, producer.flush(); } - private void prepareDataForStringSchema(PulsarClient pulsarClient, - TopicName topicName, + private void prepareDataForStringSchema(TopicName topicName, boolean isBatch, CompressionType compressionType) throws PulsarClientException { @Cleanup @@ -235,8 +229,7 @@ private void prepareDataForStringSchema(PulsarClient pulsarClient, producer.flush(); } - private void prepareDataForStructSchema(PulsarClient pulsarClient, - TopicName topicName, + private void prepareDataForStructSchema(TopicName topicName, boolean isBatch, Schema schema, CompressionType compressionType) throws Exception { @@ -254,8 +247,7 @@ private void prepareDataForStructSchema(PulsarClient pulsarClient, producer.flush(); } - private void prepareDataForProtobufNativeSchema(PulsarClient pulsarClient, - TopicName topicName, + private void prepareDataForProtobufNativeSchema(TopicName topicName, boolean isBatch, Schema schema, CompressionType compressionType) throws Exception { @@ -274,8 +266,7 @@ private void prepareDataForProtobufNativeSchema(PulsarClient pulsarClient, producer.flush(); } - private void prepareDataForKeyValueSchema(PulsarClient pulsarClient, - TopicName topicName, + private void prepareDataForKeyValueSchema(TopicName topicName, Schema> schema, CompressionType compressionType) throws Exception { @Cleanup @@ -342,4 +333,33 @@ private void validateContentForKeyValueSchema(int messageNum, String[] contentAr } } + @Test(timeOut = 1000 * 30) + public void testQueueBigEntry() throws Exception { + String tableName = "big_data_" + randomName(5); + String topic = "persistent://public/default/" + tableName; + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .enableBatching(false) + .create(); + + // Make sure that the data length bigger than the default maxMessageSize + int dataLength = Commands.DEFAULT_MAX_MESSAGE_SIZE + 2 * 1024 * 1024; + Assert.assertTrue(dataLength < pulsarCluster.getSpec().maxMessageSize()); + byte[] data = new byte[dataLength]; + for (int i = 0; i < dataLength; i++) { + data[i] = 'a'; + } + + int messageCnt = 5; + log.info("start produce big entry data, data length: {}", dataLength); + for (int i = 0 ; i < messageCnt; ++i) { + producer.newMessage().value(data).send(); + } + + int count = selectCount("public/default", tableName); + Assert.assertEquals(count, messageCnt); + } + } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java index 881dbe4bd0d41..4c129fadce002 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java @@ -29,7 +29,6 @@ import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -40,8 +39,6 @@ import org.apache.pulsar.tests.integration.containers.S3Container; import org.testcontainers.shaded.org.apache.commons.lang.StringUtils; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -89,6 +86,7 @@ private void setupExtraContainers() throws Exception { String offloadProperties = getOffloadProperties(BUCKET, null, ENDPOINT); pulsarCluster.startPrestoWorker(OFFLOAD_DRIVER, offloadProperties); pulsarCluster.startPrestoFollowWorkers(1, OFFLOAD_DRIVER, offloadProperties); + initJdbcConnection(); } private String getOffloadProperties(String bucket, String region, String endpoint) { @@ -136,11 +134,6 @@ protected int prepareData(TopicName topicName, Schema schema, CompressionType compressionType) throws Exception { @Cleanup - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) - .build(); - - @Cleanup Consumer consumer = pulsarClient.newConsumer(JSONSchema.of(Stock.class)) .topic(topicName.toString()) .subscriptionName("test") diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java index 026a32d1c8ec6..0626e3522e8c5 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java @@ -21,8 +21,6 @@ import static org.assertj.core.api.Assertions.assertThat; import com.google.common.base.Stopwatch; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -196,9 +194,6 @@ private void validateData(TopicName topicName, int messageNum, Schema schema) th ); // test predicate pushdown - String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl()); - Connection connection = DriverManager.getConnection(url, "test", null); - String query = String.format("select * from pulsar" + ".\"%s\".\"%s\" order by __publish_time__", namespace, topic); log.info("Executing query: {}", query); @@ -267,11 +262,7 @@ private void validateData(TopicName topicName, int messageNum, Schema schema) th log.info("Executing query: result for topic {} returnedTimestamps size: {}", topic, returnedTimestamps.size()); assertThat(returnedTimestamps.size()).isEqualTo(0); - query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, topic); - log.info("Executing query: {}", query); - res = connection.createStatement().executeQuery(query); - res.next(); - int count = res.getInt("_col0"); + int count = selectCount(namespace, topic); assertThat(count).isGreaterThan(messageNum - 2); } @@ -304,5 +295,12 @@ private static void printCurrent(ResultSet rs) throws SQLException { } + protected int selectCount(String namespace, String tableName) throws SQLException { + String query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, tableName); + log.info("Executing count query: {}", query); + ResultSet res = connection.createStatement().executeQuery(query); + res.next(); + return res.getInt("_col0"); + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java index 9ed733570452f..762fff751b069 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java @@ -18,13 +18,22 @@ */ package org.apache.pulsar.tests.integration.suites; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; import java.util.HashMap; import java.util.Map; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.tests.integration.containers.BrokerContainer; import org.apache.pulsar.tests.integration.containers.S3Container; import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +/** + * Pulsar SQL test suite. + */ @Slf4j public abstract class PulsarSQLTestSuite extends PulsarTestSuite { @@ -33,11 +42,15 @@ public abstract class PulsarSQLTestSuite extends PulsarTestSuite { public static final String BUCKET = "pulsar-integtest"; public static final String ENDPOINT = "http://" + S3Container.NAME + ":9090"; + protected Connection connection = null; + protected PulsarClient pulsarClient = null; + @Override protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) { specBuilder.queryLastMessage(true); specBuilder.clusterName("pulsar-sql-test"); specBuilder.numBrokers(1); + specBuilder.maxMessageSize(2 * Commands.DEFAULT_MAX_MESSAGE_SIZE); return super.beforeSetupCluster(clusterName, specBuilder); } @@ -55,4 +68,43 @@ protected void beforeStartCluster() throws Exception { } } + @Override + public void setupCluster() throws Exception { + super.setupCluster(); + pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); + } + + protected void initJdbcConnection() throws SQLException { + if (pulsarCluster.getPrestoWorkerContainer() == null) { + log.error("The presto work container isn't exist."); + return; + } + String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl()); + connection = DriverManager.getConnection(url, "test", null); + } + + @Override + public void tearDownCluster() throws Exception { + close(); + super.tearDownCluster(); + } + + protected void close() { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + log.error("Failed to close sql connection.", e); + } + } + if (pulsarClient != null) { + try { + pulsarClient.close(); + } catch (PulsarClientException e) { + log.error("Failed to close pulsar client.", e); + } + } + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 043762ce1f761..e566df1b84f62 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -73,6 +73,7 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec) { return new PulsarCluster(spec); } + @Getter private final PulsarClusterSpec spec; @Getter @@ -150,6 +151,7 @@ private PulsarCluster(PulsarClusterSpec spec) { .withEnv("journalMaxGroupWaitMSec", "0") .withEnv("clusterName", clusterName) .withEnv("diskUsageThreshold", "0.99") + .withEnv("nettyMaxFrameSizeBytes", "" + spec.maxMessageSize) ) ); @@ -166,7 +168,8 @@ private PulsarCluster(PulsarClusterSpec spec) { .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1") // used in s3 tests .withEnv("AWS_ACCESS_KEY_ID", "accesskey") - .withEnv("AWS_SECRET_KEY", "secretkey"); + .withEnv("AWS_SECRET_KEY", "secretkey") + .withEnv("maxMessageSize", "" + spec.maxMessageSize); if (spec.queryLastMessage) { brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10"); brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false"); @@ -420,6 +423,7 @@ private PrestoWorkerContainer buildPrestoWorkerContainer(String hostName, boolea .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) .withEnv("pulsar.web-service-url", "http://pulsar-broker-0:8080") + .withEnv("SQL_PREFIX_pulsar.max-message-size", "" + spec.maxMessageSize) .withClasspathResourceMapping( resourcePath, "/pulsar/conf/presto/config.properties", BindMode.READ_WRITE); if (spec.queryLastMessage) { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java index 9dcfcfb725e79..eed604205bdce 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java @@ -29,6 +29,7 @@ import lombok.Singular; import lombok.experimental.Accessors; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.tests.integration.containers.PulsarContainer; import org.testcontainers.containers.GenericContainer; @@ -152,4 +153,7 @@ public class PulsarClusterSpec { * Specify mount files. */ Map brokerMountFiles; + + @Default + int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE; }