From ce962efa537877d38675cdc01f5d8e55616b8943 Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Fri, 27 Nov 2020 17:18:08 +0800 Subject: [PATCH 1/2] support topic-level max message size --- .../admin/impl/PersistentTopicsBase.java | 29 ++++++ .../broker/admin/v2/PersistentTopics.java | 84 ++++++++++++++++ .../pulsar/broker/service/AbstractTopic.java | 15 +++ .../pulsar/broker/service/Producer.java | 15 ++- .../nonpersistent/NonPersistentTopic.java | 5 + .../service/persistent/PersistentTopic.java | 12 +++ .../broker/admin/TopicPoliciesTest.java | 97 +++++++++++++++++-- .../apache/pulsar/client/admin/Topics.java | 51 ++++++++++ .../client/admin/internal/TopicsImpl.java | 76 +++++++++++++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 7 ++ .../apache/pulsar/admin/cli/CmdTopics.java | 43 ++++++++ .../apache/pulsar/client/impl/ClientCnx.java | 3 + .../pulsar/client/impl/ProducerImpl.java | 19 ++++ .../common/policies/data/TopicPolicies.java | 5 + 14 files changed, 452 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 7fd98c426cb43..fb2e7a149d542 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2613,6 +2613,35 @@ protected CompletableFuture internalRemovePersistence() { return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); } + protected CompletableFuture internalSetMaxMessageSize(Integer maxMessageSize) { + if (maxMessageSize != null && (maxMessageSize < 0 || maxMessageSize > config().getMaxMessageSize())) { + throw new RestException(Status.PRECONDITION_FAILED + , "topic-level maxMessageSize must be greater than or equal to 0 " + + "and must be smaller than that in the broker-level"); + } + + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + checkTopicLevelPolicyEnable(); + + TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); + topicPolicies.setMaxMessageSize(maxMessageSize); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + } + + protected Optional internalGetMaxMessageSize() { + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + checkTopicLevelPolicyEnable(); + return getTopicPolicies(topicName).map(TopicPolicies::getMaxMessageSize); + } + protected Optional internalGetMaxProducers() { validateAdminAccessForTenant(namespaceName.getTenant()); validatePoliciesReadOnlyAccess(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 9021abd7e6e4c..d26b9eb68011b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1919,6 +1919,90 @@ public void removeMaxConsumers(@Suspended final AsyncResponse asyncResponse, }); } + @GET + @Path("/{tenant}/{namespace}/{topic}/maxMessageSize") + @ApiOperation(value = "Get maxMessageSize config for specified topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void getMaxMessageSize(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + try { + Optional policies = internalGetMaxMessageSize(); + if (policies.isPresent()) { + asyncResponse.resume(policies.get()); + } else { + asyncResponse.resume(Response.noContent().build()); + } + } catch (RestException e) { + asyncResponse.resume(e); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/maxMessageSize") + @ApiOperation(value = "Set maxMessageSize config for specified topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "Invalid value of maxConsumers")}) + public void setMaxMessageSize(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "The max message size of the topic") int maxMessageSize) { + validateTopicName(tenant, namespace, encodedTopic); + internalSetMaxMessageSize(maxMessageSize).whenComplete((r, ex) -> { + if (ex instanceof RestException) { + log.error("Failed updated persistence policies", ex); + asyncResponse.resume(ex); + } else if (ex != null) { + log.error("Failed updated persistence policies", ex); + asyncResponse.resume(new RestException(ex)); + } else { + log.info("[{}] Successfully set max message size: namespace={}, topic={}, maxMessageSiz={}", + clientAppId(), + namespaceName, + topicName.getLocalName(), + maxMessageSize); + asyncResponse.resume(Response.noContent().build()); + } + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/maxMessageSize") + @ApiOperation(value = "Remove maxMessageSize config for specified topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void removeMaxMessageSize(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + internalSetMaxMessageSize(null).whenComplete((r, ex) -> { + if (ex != null) { + log.error("Failed to remove maxMessageSize", ex); + asyncResponse.resume(new RestException(ex)); + } else { + log.info("[{}] Successfully remove max message size: namespace={}, topic={}", + clientAppId(), + namespaceName, + topicName.getLocalName()); + asyncResponse.resume(Response.noContent().build()); + } + }); + } + @POST @Path("/{tenant}/{namespace}/{topic}/terminate") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index f9761e600c35f..dde9ad6fcc771 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -572,6 +572,21 @@ public TopicPolicies getTopicPolicies(TopicName topicName) { } } + protected boolean isExceedMaximumMessageSize(int size) { + Integer maxMessageSize = null; + TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic)); + if (topicPolicies != null && topicPolicies.isMaxMessageSizeSet()) { + maxMessageSize = topicPolicies.getMaxMessageSize(); + } + if (maxMessageSize != null) { + if (maxMessageSize == 0) { + return false; + } + return size > maxMessageSize; + } + return false; + } + /** * update topic publish dispatcher for this topic. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 605905b09b5b8..870eb9976ff2c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -354,8 +354,7 @@ public long getOriginalHighestSequenceId() { @Override public void completed(Exception exception, long ledgerId, long entryId) { if (exception != null) { - ServerError serverError = (exception instanceof TopicTerminatedException) - ? ServerError.TopicTerminatedError : ServerError.PersistenceError; + final ServerError serverError = getServerError(exception); producer.cnx.execute(() -> { if (!(exception instanceof TopicClosedException)) { @@ -381,6 +380,18 @@ public void completed(Exception exception, long ledgerId, long entryId) { } } + private ServerError getServerError(Exception exception) { + ServerError serverError; + if (exception instanceof TopicTerminatedException) { + serverError = ServerError.TopicTerminatedError; + } else if (exception instanceof BrokerServiceException.NotAllowedException) { + serverError = ServerError.NotAllowedError; + } else { + serverError = ServerError.PersistenceError; + } + return serverError; + } + /** * Executed from I/O thread when sending receipt back to client */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 213888577dca3..b2a065c42a242 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -164,6 +164,11 @@ public NonPersistentTopic(String topic, BrokerService brokerService) { @Override public void publishMessage(ByteBuf data, PublishContext callback) { + if (isExceedMaximumMessageSize(data.readableBytes())) { + callback.completed(new NotAllowedException("Exceed maximum message size") + , -1, -1); + return; + } callback.completed(null, 0L, 0L); ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(this); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index cd2ba1710e4ae..33b49ac8578a2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -340,6 +340,12 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont decrementPendingWriteOpsAndCheck(); return; } + if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) { + publishContext.completed(new NotAllowedException("Exceed maximum message size") + , -1, -1); + decrementPendingWriteOpsAndCheck(); + return; + } MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload); switch (status) { @@ -2455,6 +2461,12 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon decrementPendingWriteOpsAndCheck(); return; } + if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) { + publishContext.completed(new NotAllowedException("Exceed maximum message size") + , -1, -1); + decrementPendingWriteOpsAndCheck(); + return; + } MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload); switch (status) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index e2165d2d575cf..8fe2be20d6006 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -18,32 +18,31 @@ */ package org.apache.pulsar.broker.admin; -import org.apache.pulsar.broker.service.PublishRateLimiterImpl; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; -import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; -import org.apache.pulsar.common.policies.data.SubscribeRate; -import static org.testng.Assert.assertEquals; - import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.BacklogQuotaManager; +import org.apache.pulsar.broker.service.PublishRateLimiterImpl; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TenantInfo; import org.awaitility.Awaitility; import org.testng.Assert; @@ -55,6 +54,12 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + @Slf4j public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { @@ -91,6 +96,7 @@ protected void setup() throws Exception { @Override public void cleanup() throws Exception { super.internalCleanup(); + this.resetConfig(); } @Test @@ -1160,4 +1166,81 @@ public void testPublishRateInDifferentLevelPolicy() throws Exception { Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 5); Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 50L); } + + @Test(timeOut = 20000) + public void testTopicMaxMessageSizeApi() throws Exception{ + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(persistenceTopic))); + assertNull(admin.topics().getMaxMessageSize(persistenceTopic)); + + admin.topics().setMaxMessageSize(persistenceTopic,10); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() + -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(persistenceTopic)) != null); + assertEquals(admin.topics().getMaxMessageSize(persistenceTopic).intValue(),10); + + admin.topics().removeMaxMessageSize(persistenceTopic); + assertNull(admin.topics().getMaxMessageSize(persistenceTopic)); + + try { + admin.topics().setMaxMessageSize(persistenceTopic,Integer.MAX_VALUE); + fail("should fail"); + } catch (PulsarAdminException e) { + assertEquals(e.getStatusCode(),412); + } + try { + admin.topics().setMaxMessageSize(persistenceTopic, -1); + fail("should fail"); + } catch (PulsarAdminException e) { + assertEquals(e.getStatusCode(),412); + } + } + + @Test(timeOut = 20000) + public void testTopicMaxMessageSize() throws Exception{ + doTestTopicMaxMessageSize(true); + doTestTopicMaxMessageSize(false); + } + + private void doTestTopicMaxMessageSize(boolean isPartitioned) throws Exception { + final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); + if (isPartitioned) { + admin.topics().createPartitionedTopic(topic, 3); + } + // init cache + Producer producer = pulsarClient.newProducer().topic(topic).create(); + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); + assertNull(admin.topics().getMaxMessageSize(topic)); + // set msg size + admin.topics().setMaxMessageSize(topic, 10); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() + -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null); + assertEquals(admin.topics().getMaxMessageSize(topic).intValue(), 10); + + try { + producer.send(new byte[1024]); + } catch (PulsarClientException e) { + assertTrue(e instanceof PulsarClientException.NotAllowedException); + } + + admin.topics().removeMaxMessageSize(topic); + assertNull(admin.topics().getMaxMessageSize(topic)); + + try { + admin.topics().setMaxMessageSize(topic, Integer.MAX_VALUE); + fail("should fail"); + } catch (PulsarAdminException e) { + assertEquals(e.getStatusCode(), 412); + } + try { + admin.topics().setMaxMessageSize(topic, -1); + fail("should fail"); + } catch (PulsarAdminException e) { + assertEquals(e.getStatusCode(), 412); + } + + MessageId messageId = producer.send(new byte[1024]); + assertNotNull(messageId); + producer.close(); + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 0ae525f5008e3..b52e0fd4f276f 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -2455,6 +2455,57 @@ void setInactiveTopicPolicies(String topic * @param topic Topic name */ CompletableFuture removeMaxProducersAsync(String topic); + /** + * Get the max message size of producer for specified topic. + * + * @param topic Topic name + * @return Configuration of bookkeeper persistence policies + * @throws PulsarAdminException Unexpected error + */ + Integer getMaxMessageSize(String topic) throws PulsarAdminException; + + /** + * Get the max message size of producer for specified topic asynchronously. + * + * @param topic Topic name + * @return Configuration of bookkeeper persistence policies + * @throws PulsarAdminException Unexpected error + */ + CompletableFuture getMaxMessageSizeAsync(String topic); + + + /** + * Set the max message size of producer for specified topic. + * + * @param topic Topic name + * @param maxMessageSize Max message size of producer + * @throws PulsarAdminException Unexpected error + */ + void setMaxMessageSize(String topic, int maxMessageSize) throws PulsarAdminException; + + /** + * Set the max message size of producer for specified topic asynchronously.0 disables. + * + * @param topic Topic name + * @param maxProducers Max message size of producer + * @throws PulsarAdminException Unexpected error + */ + CompletableFuture setMaxMessageSizeAsync(String topic, int maxProducers); + + /** + * Remove the max message size of producer for specified topic. + * + * @param topic Topic name + * @throws PulsarAdminException Unexpected error + */ + void removeMaxMessageSize(String topic) throws PulsarAdminException; + + /** + * Remove the max message size of producer for specified topic asynchronously. + * + * @param topic Topic name + */ + CompletableFuture removeMaxMessageSizeAsync(String topic); /** * Get the max number of consumer for specified topic. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 8b1f83fe0a6dc..70b29ebd8fe31 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -2636,6 +2636,82 @@ public CompletableFuture removeMaxProducersAsync(String topic) { return asyncDeleteRequest(path); } + @Override + public Integer getMaxMessageSize(String topic) throws PulsarAdminException { + try { + return getMaxMessageSizeAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getMaxMessageSizeAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "maxMessageSize"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(Integer maxProducers) { + future.complete(maxProducers); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public void setMaxMessageSize(String topic, int maxMessageSize) throws PulsarAdminException { + try { + setMaxMessageSizeAsync(topic, maxMessageSize).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture setMaxMessageSizeAsync(String topic, int maxProducers) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "maxMessageSize"); + return asyncPostRequest(path, Entity.entity(maxProducers, MediaType.APPLICATION_JSON)); + } + + @Override + public void removeMaxMessageSize(String topic) throws PulsarAdminException { + try { + removeMaxMessageSizeAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture removeMaxMessageSizeAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "maxMessageSize"); + return asyncDeleteRequest(path); + } + @Override public Integer getMaxConsumers(String topic) throws PulsarAdminException { try { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 5453a8634b44b..927ee73b63a27 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -778,6 +778,13 @@ public void topics() throws Exception { cmdTopics.run(split("set-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1 -m 99")); verify(mockTopics).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99); + cmdTopics.run(split("get-max-message-size persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).getMaxMessageSize("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("remove-max-message-size persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).removeMaxMessageSize("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-max-message-size persistent://myprop/clust/ns1/ds1 -m 99")); + verify(mockTopics).setMaxMessageSize("persistent://myprop/clust/ns1/ds1", 99); + cmdTopics.run(split("get-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("remove-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1")); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index c5ed51db23773..4519568ef2e17 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -165,6 +165,10 @@ public CmdTopics(PulsarAdmin admin) { jcommander.addCommand("set-maxProducers", new SetMaxProducers()); jcommander.addCommand("remove-maxProducers", new RemoveMaxProducers()); + jcommander.addCommand("get-max-message-size", new GetMaxMessageSize()); + jcommander.addCommand("set-max-message-size", new SetMaxMessageSize()); + jcommander.addCommand("remove-max-message-size", new RemoveMaxMessageSize()); + jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription()); jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription()); jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription()); @@ -1692,6 +1696,45 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get max message size for a topic") + private class GetMaxMessageSize extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(admin.topics().getMaxMessageSize(persistentTopic)); + } + } + + @Parameters(commandDescription = "Set max message size for a topic") + private class SetMaxMessageSize extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--max-message-size", "-m"}, description = "Max message size for a topic", required = true) + private int maxMessageSize; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().setMaxMessageSize(persistentTopic, maxMessageSize); + } + } + + @Parameters(commandDescription = "Remove max message size for a topic") + private class RemoveMaxMessageSize extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().removeMaxMessageSize(persistentTopic); + } + } + @Parameters(commandDescription = "Get max consumers per subscription for a topic") private class GetMaxConsumersPerSubscription extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 44a93b444da91..bbcba68e28d17 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -605,6 +605,9 @@ protected void handleSendError(CommandSendError sendError) { case TopicTerminatedError: producers.get(producerId).terminated(this); break; + case NotAllowedError: + producers.get(producerId).recoverNotAllowedError(sequenceId); + break; default: // By default, for transient error, let the reconnection logic diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 8d09606997dff..3ab60335ac73f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1039,6 +1039,25 @@ protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId) resendMessages(cnx); } + protected synchronized void recoverNotAllowedError(long sequenceId) { + OpSendMsg op = pendingMessages.peek(); + if(op != null && sequenceId == getHighestSequenceId(op)){ + pendingMessages.remove(); + releaseSemaphoreForSendOp(op); + try { + op.callback.sendComplete( + new PulsarClientException.NotAllowedException( + format("The size of the message which is produced by producer %s to the topic " + + "%s is not allowed", producerName, topic))); + } catch (Throwable t) { + log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, + producerName, sequenceId, t); + } + ReferenceCountUtil.safeRelease(op.cmd); + op.recycle(); + } + } + /** * Computes checksum again and verifies it against existing checksum. If checksum doesn't match it means that * message is corrupt. diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index eb5098d414073..69d9fa3bc8ced 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -57,6 +57,11 @@ public class TopicPolicies { private PublishRate publishRate = null; private SubscribeRate subscribeRate = null; private Integer deduplicationSnapshotIntervalSeconds = null; + private Integer maxMessageSize = null; + + public boolean isMaxMessageSizeSet() { + return maxMessageSize != null; + } public boolean isDeduplicationSnapshotIntervalSecondsSet(){ return deduplicationSnapshotIntervalSeconds != null; From 388d5686c4bb72ecef0f73b0f52efb61fea2802e Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Fri, 27 Nov 2020 20:06:05 +0800 Subject: [PATCH 2/2] Modify comments --- .../org/apache/pulsar/client/admin/Topics.java | 16 ++++++++-------- .../pulsar/client/admin/internal/TopicsImpl.java | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index b52e0fd4f276f..4c8a8ffa22126 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -2456,7 +2456,7 @@ void setInactiveTopicPolicies(String topic */ CompletableFuture removeMaxProducersAsync(String topic); /** - * Get the max message size of producer for specified topic. + * Get the max message size for specified topic. * * @param topic Topic name * @return Configuration of bookkeeper persistence policies @@ -2465,7 +2465,7 @@ void setInactiveTopicPolicies(String topic Integer getMaxMessageSize(String topic) throws PulsarAdminException; /** - * Get the max message size of producer for specified topic asynchronously. + * Get the max message size for specified topic asynchronously. * * @param topic Topic name * @return Configuration of bookkeeper persistence policies @@ -2475,7 +2475,7 @@ void setInactiveTopicPolicies(String topic /** - * Set the max message size of producer for specified topic. + * Set the max message size for specified topic. * * @param topic Topic name * @param maxMessageSize Max message size of producer @@ -2484,16 +2484,16 @@ void setInactiveTopicPolicies(String topic void setMaxMessageSize(String topic, int maxMessageSize) throws PulsarAdminException; /** - * Set the max message size of producer for specified topic asynchronously.0 disables. + * Set the max message size for specified topic asynchronously.0 disables. * * @param topic Topic name - * @param maxProducers Max message size of producer + * @param maxMessageSize Max message size of topic * @throws PulsarAdminException Unexpected error */ - CompletableFuture setMaxMessageSizeAsync(String topic, int maxProducers); + CompletableFuture setMaxMessageSizeAsync(String topic, int maxMessageSize); /** - * Remove the max message size of producer for specified topic. + * Remove the max message size for specified topic. * * @param topic Topic name * @throws PulsarAdminException Unexpected error @@ -2501,7 +2501,7 @@ void setInactiveTopicPolicies(String topic void removeMaxMessageSize(String topic) throws PulsarAdminException; /** - * Remove the max message size of producer for specified topic asynchronously. + * Remove the max message size for specified topic asynchronously. * * @param topic Topic name */ diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 70b29ebd8fe31..eca4c5f6fa01d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -2658,8 +2658,8 @@ public CompletableFuture getMaxMessageSizeAsync(String topic) { asyncGetRequest(path, new InvocationCallback() { @Override - public void completed(Integer maxProducers) { - future.complete(maxProducers); + public void completed(Integer maxMessageSize) { + future.complete(maxMessageSize); } @Override @@ -2685,10 +2685,10 @@ public void setMaxMessageSize(String topic, int maxMessageSize) throws PulsarAdm } @Override - public CompletableFuture setMaxMessageSizeAsync(String topic, int maxProducers) { + public CompletableFuture setMaxMessageSizeAsync(String topic, int maxMessageSize) { TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn, "maxMessageSize"); - return asyncPostRequest(path, Entity.entity(maxProducers, MediaType.APPLICATION_JSON)); + return asyncPostRequest(path, Entity.entity(maxMessageSize, MediaType.APPLICATION_JSON)); } @Override