[Pulsar SQL] Pulsar SQL support query big entry data #12448

Merged
3 changes: 2 additions & 1 deletion conf/presto/catalog/pulsar.properties
@@ -42,7 +42,8 @@ pulsar.max-split-queue-cache-size=-1
# to prevent erroneous rewriting
pulsar.namespace-delimiter-rewrite-enable=false
pulsar.rewrite-namespace-delimiter=/

# max size of one batch message (default value is 5MB)
# pulsar.max-message-size=5242880

####### TIERED STORAGE OFFLOADER CONFIGS #######

@@ -42,6 +42,7 @@
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

@@ -109,7 +110,8 @@ private ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig puls
.setReadEntryTimeout(60)
.setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue())
.setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads())
.setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads());
.setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads())
.setNettyMaxFrameSizeBytes(pulsarConnectorConfig.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING);

ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB());
3 changes: 3 additions & 0 deletions site2/docs/sql-deployment-configurations.md
@@ -24,6 +24,9 @@ pulsar.entry-read-batch-size=100

# default number of splits to use per query
pulsar.target-num-splits=4

# max size of one batch message (default value is 5MB)
pulsar.max-message-size=5242880
```

You can connect Presto to a Pulsar cluster with multiple hosts. To configure multiple hosts for brokers, add multiple URLs to `pulsar.web-service-url`. To configure multiple hosts for ZooKeeper, add multiple URIs to `pulsar.zookeeper-uri`. The following is an example.
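A minimal sketch of what such a multi-host configuration might look like in the catalog properties file (the hostnames and ports below are illustrative, not taken from this change):

```properties
# multiple broker web service URLs, comma-separated
pulsar.web-service-url=http://broker1:8080,http://broker2:8080,http://broker3:8080
# multiple ZooKeeper hosts, comma-separated
pulsar.zookeeper-uri=zk1:2181,zk2:2181,zk3:2181
```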
@@ -28,5 +28,6 @@ public class BKContainer extends PulsarContainer<BKContainer> {
public BKContainer(String clusterName, String hostName) {
super(
clusterName, hostName, hostName, "bin/run-bookie.sh", BOOKIE_PORT, INVALID_PORT);
tailContainerLog();
}
}
@@ -27,14 +27,14 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
@@ -56,6 +56,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
private void setupPresto() throws Exception {
log.info("[TestBasicPresto] setupPresto...");
pulsarCluster.startPrestoWorker();
initJdbcConnection();
}

private void teardownPresto() {
@@ -161,31 +162,26 @@ protected int prepareData(TopicName topicName,
boolean useNsOffloadPolices,
Schema schema,
CompressionType compressionType) throws Exception {
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();

if (schema.getSchemaInfo().getName().equals(Schema.BYTES.getSchemaInfo().getName())) {
prepareDataForBytesSchema(pulsarClient, topicName, isBatch, compressionType);
prepareDataForBytesSchema(topicName, isBatch, compressionType);
} else if (schema.getSchemaInfo().getName().equals(Schema.BYTEBUFFER.getSchemaInfo().getName())) {
prepareDataForByteBufferSchema(pulsarClient, topicName, isBatch, compressionType);
prepareDataForByteBufferSchema(topicName, isBatch, compressionType);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.STRING)) {
prepareDataForStringSchema(pulsarClient, topicName, isBatch, compressionType);
prepareDataForStringSchema(topicName, isBatch, compressionType);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.JSON)
|| schema.getSchemaInfo().getType().equals(SchemaType.AVRO)) {
prepareDataForStructSchema(pulsarClient, topicName, isBatch, schema, compressionType);
prepareDataForStructSchema(topicName, isBatch, schema, compressionType);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.PROTOBUF_NATIVE)) {
prepareDataForProtobufNativeSchema(pulsarClient, topicName, isBatch, schema, compressionType);
prepareDataForProtobufNativeSchema(topicName, isBatch, schema, compressionType);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) {
prepareDataForKeyValueSchema(pulsarClient, topicName, schema, compressionType);
prepareDataForKeyValueSchema(topicName, schema, compressionType);
}

return NUM_OF_STOCKS;
}

private void prepareDataForBytesSchema(PulsarClient pulsarClient,
TopicName topicName,
private void prepareDataForBytesSchema(TopicName topicName,
boolean isBatch,
CompressionType compressionType) throws PulsarClientException {
@Cleanup
@@ -201,8 +197,7 @@ private void prepareDataForBytesSchema(PulsarClient pulsarClient,
producer.flush();
}

private void prepareDataForByteBufferSchema(PulsarClient pulsarClient,
TopicName topicName,
private void prepareDataForByteBufferSchema(TopicName topicName,
boolean isBatch,
CompressionType compressionType) throws PulsarClientException {
@Cleanup
@@ -218,8 +213,7 @@ private void prepareDataForByteBufferSchema(PulsarClient pulsarClient,
producer.flush();
}

private void prepareDataForStringSchema(PulsarClient pulsarClient,
TopicName topicName,
private void prepareDataForStringSchema(TopicName topicName,
boolean isBatch,
CompressionType compressionType) throws PulsarClientException {
@Cleanup
@@ -235,8 +229,7 @@ private void prepareDataForStringSchema(PulsarClient pulsarClient,
producer.flush();
}

private void prepareDataForStructSchema(PulsarClient pulsarClient,
TopicName topicName,
private void prepareDataForStructSchema(TopicName topicName,
boolean isBatch,
Schema<Stock> schema,
CompressionType compressionType) throws Exception {
@@ -254,8 +247,7 @@ private void prepareDataForStructSchema(PulsarClient pulsarClient,
producer.flush();
}

private void prepareDataForProtobufNativeSchema(PulsarClient pulsarClient,
TopicName topicName,
private void prepareDataForProtobufNativeSchema(TopicName topicName,
boolean isBatch,
Schema<StockProtoMessage.Stock> schema,
CompressionType compressionType) throws Exception {
@@ -274,8 +266,7 @@ private void prepareDataForProtobufNativeSchema(PulsarClient pulsarClient,
producer.flush();
}

private void prepareDataForKeyValueSchema(PulsarClient pulsarClient,
TopicName topicName,
private void prepareDataForKeyValueSchema(TopicName topicName,
Schema<KeyValue<Stock, Stock>> schema,
CompressionType compressionType) throws Exception {
@Cleanup
@@ -342,4 +333,33 @@ private void validateContentForKeyValueSchema(int messageNum, String[] contentAr
}
}

@Test(timeOut = 1000 * 30)
public void testQueueBigEntry() throws Exception {
String tableName = "big_data_" + randomName(5);
String topic = "persistent://public/default/" + tableName;

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.enableBatching(false)
.create();

// Make sure that the data length is bigger than the default maxMessageSize
int dataLength = Commands.DEFAULT_MAX_MESSAGE_SIZE + 2 * 1024 * 1024;
Assert.assertTrue(dataLength < pulsarCluster.getSpec().maxMessageSize());
byte[] data = new byte[dataLength];
for (int i = 0; i < dataLength; i++) {
data[i] = 'a';
}

int messageCnt = 5;
log.info("start produce big entry data, data length: {}", dataLength);
for (int i = 0 ; i < messageCnt; ++i) {
producer.newMessage().value(data).send();
}

int count = selectCount("public/default", tableName);
Assert.assertEquals(count, messageCnt);
}

}
@@ -29,7 +29,6 @@
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -40,8 +39,6 @@
import org.apache.pulsar.tests.integration.containers.S3Container;
import org.testcontainers.shaded.org.apache.commons.lang.StringUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


@@ -89,6 +86,7 @@ private void setupExtraContainers() throws Exception {
String offloadProperties = getOffloadProperties(BUCKET, null, ENDPOINT);
pulsarCluster.startPrestoWorker(OFFLOAD_DRIVER, offloadProperties);
pulsarCluster.startPrestoFollowWorkers(1, OFFLOAD_DRIVER, offloadProperties);
initJdbcConnection();
}

private String getOffloadProperties(String bucket, String region, String endpoint) {
@@ -136,11 +134,6 @@ protected int prepareData(TopicName topicName,
Schema schema,
CompressionType compressionType) throws Exception {
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();

@Cleanup
Consumer<Stock> consumer = pulsarClient.newConsumer(JSONSchema.of(Stock.class))
.topic(topicName.toString())
.subscriptionName("test")
@@ -21,8 +21,6 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.base.Stopwatch;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -196,9 +194,6 @@ private void validateData(TopicName topicName, int messageNum, Schema schema) th
);

// test predicate pushdown
String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl());
Connection connection = DriverManager.getConnection(url, "test", null);

String query = String.format("select * from pulsar" +
".\"%s\".\"%s\" order by __publish_time__", namespace, topic);
log.info("Executing query: {}", query);
@@ -267,11 +262,7 @@ private void validateData(TopicName topicName, int messageNum, Schema schema) th
log.info("Executing query: result for topic {} returnedTimestamps size: {}", topic, returnedTimestamps.size());
assertThat(returnedTimestamps.size()).isEqualTo(0);

query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, topic);
log.info("Executing query: {}", query);
res = connection.createStatement().executeQuery(query);
res.next();
int count = res.getInt("_col0");
int count = selectCount(namespace, topic);
assertThat(count).isGreaterThan(messageNum - 2);
}

@@ -304,5 +295,12 @@ private static void printCurrent(ResultSet rs) throws SQLException {

}

protected int selectCount(String namespace, String tableName) throws SQLException {
String query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, tableName);
log.info("Executing count query: {}", query);
ResultSet res = connection.createStatement().executeQuery(query);
res.next();
return res.getInt("_col0");
}

}
@@ -18,13 +18,22 @@
*/
package org.apache.pulsar.tests.integration.suites;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.containers.S3Container;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;

/**
* Pulsar SQL test suite.
*/
@Slf4j
public abstract class PulsarSQLTestSuite extends PulsarTestSuite {

@@ -33,11 +42,15 @@ public abstract class PulsarSQLTestSuite extends PulsarTestSuite {
public static final String BUCKET = "pulsar-integtest";
public static final String ENDPOINT = "http://" + S3Container.NAME + ":9090";

protected Connection connection = null;
protected PulsarClient pulsarClient = null;

@Override
protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
specBuilder.queryLastMessage(true);
specBuilder.clusterName("pulsar-sql-test");
specBuilder.numBrokers(1);
specBuilder.maxMessageSize(2 * Commands.DEFAULT_MAX_MESSAGE_SIZE);
return super.beforeSetupCluster(clusterName, specBuilder);
}

@@ -55,4 +68,43 @@ protected void beforeStartCluster() throws Exception {
}
}

@Override
public void setupCluster() throws Exception {
super.setupCluster();
pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
}

protected void initJdbcConnection() throws SQLException {
if (pulsarCluster.getPrestoWorkerContainer() == null) {
log.error("The presto work container isn't exist.");
return;
}
String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl());
connection = DriverManager.getConnection(url, "test", null);
}

@Override
public void tearDownCluster() throws Exception {
close();
super.tearDownCluster();
}

protected void close() {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
log.error("Failed to close sql connection.", e);
}
}
if (pulsarClient != null) {
try {
pulsarClient.close();
} catch (PulsarClientException e) {
log.error("Failed to close pulsar client.", e);
}
}
}
}
@@ -73,6 +73,7 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec) {
return new PulsarCluster(spec);
}

@Getter
private final PulsarClusterSpec spec;

@Getter
@@ -150,6 +151,7 @@ private PulsarCluster(PulsarClusterSpec spec) {
.withEnv("journalMaxGroupWaitMSec", "0")
.withEnv("clusterName", clusterName)
.withEnv("diskUsageThreshold", "0.99")
.withEnv("nettyMaxFrameSizeBytes", "" + spec.maxMessageSize)
)
);

@@ -166,7 +168,8 @@
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
// used in s3 tests
.withEnv("AWS_ACCESS_KEY_ID", "accesskey")
.withEnv("AWS_SECRET_KEY", "secretkey");
.withEnv("AWS_SECRET_KEY", "secretkey")
.withEnv("maxMessageSize", "" + spec.maxMessageSize);
if (spec.queryLastMessage) {
brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false");
@@ -420,6 +423,7 @@ private PrestoWorkerContainer buildPrestoWorkerContainer(String hostName, boolea
.withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
.withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
.withEnv("pulsar.web-service-url", "http://pulsar-broker-0:8080")
.withEnv("SQL_PREFIX_pulsar.max-message-size", "" + spec.maxMessageSize)
.withClasspathResourceMapping(
resourcePath, "/pulsar/conf/presto/config.properties", BindMode.READ_WRITE);
if (spec.queryLastMessage) {