Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support topic-level max message size #8732

Merged
merged 2 commits into from
Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2613,6 +2613,35 @@ protected CompletableFuture<Void> internalRemovePersistence() {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}

protected CompletableFuture<Void> internalSetMaxMessageSize(Integer maxMessageSize) {
if (maxMessageSize != null && (maxMessageSize < 0 || maxMessageSize > config().getMaxMessageSize())) {
throw new RestException(Status.PRECONDITION_FAILED
, "topic-level maxMessageSize must be greater than or equal to 0 " +
"and must be smaller than that in the broker-level");
}

validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();

TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setMaxMessageSize(maxMessageSize);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected Optional<Integer> internalGetMaxMessageSize() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
return getTopicPolicies(topicName).map(TopicPolicies::getMaxMessageSize);
}

protected Optional<Integer> internalGetMaxProducers() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1919,6 +1919,90 @@ public void removeMaxConsumers(@Suspended final AsyncResponse asyncResponse,
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
@ApiOperation(value = "Get maxMessageSize config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
try {
Optional<Integer> policies = internalGetMaxMessageSize();
if (policies.isPresent()) {
asyncResponse.resume(policies.get());
} else {
asyncResponse.resume(Response.noContent().build());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
@ApiOperation(value = "Set maxMessageSize config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Invalid value of maxConsumers")})
public void setMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The max message size of the topic") int maxMessageSize) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetMaxMessageSize(maxMessageSize).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully set max message size: namespace={}, topic={}, maxMessageSiz={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
maxMessageSize);
asyncResponse.resume(Response.noContent().build());
}
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
@ApiOperation(value = "Remove maxMessageSize config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetMaxMessageSize(null).whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove maxMessageSize", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove max message size: namespace={}, topic={}",
clientAppId(),
namespaceName,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}


@POST
@Path("/{tenant}/{namespace}/{topic}/terminate")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,21 @@ public TopicPolicies getTopicPolicies(TopicName topicName) {
}
}

protected boolean isExceedMaximumMessageSize(int size) {
Integer maxMessageSize = null;
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (topicPolicies != null && topicPolicies.isMaxMessageSizeSet()) {
maxMessageSize = topicPolicies.getMaxMessageSize();
}
if (maxMessageSize != null) {
if (maxMessageSize == 0) {
return false;
}
return size > maxMessageSize;
}
return false;
}

/**
* update topic publish dispatcher for this topic.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,7 @@ public long getOriginalHighestSequenceId() {
@Override
public void completed(Exception exception, long ledgerId, long entryId) {
if (exception != null) {
ServerError serverError = (exception instanceof TopicTerminatedException)
? ServerError.TopicTerminatedError : ServerError.PersistenceError;
final ServerError serverError = getServerError(exception);

producer.cnx.execute(() -> {
if (!(exception instanceof TopicClosedException)) {
Expand All @@ -381,6 +380,18 @@ public void completed(Exception exception, long ledgerId, long entryId) {
}
}

private ServerError getServerError(Exception exception) {
ServerError serverError;
if (exception instanceof TopicTerminatedException) {
serverError = ServerError.TopicTerminatedError;
} else if (exception instanceof BrokerServiceException.NotAllowedException) {
serverError = ServerError.NotAllowedError;
} else {
serverError = ServerError.PersistenceError;
}
return serverError;
}

/**
* Executed from I/O thread when sending receipt back to client
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ public NonPersistentTopic(String topic, BrokerService brokerService) {

@Override
public void publishMessage(ByteBuf data, PublishContext callback) {
if (isExceedMaximumMessageSize(data.readableBytes())) {
callback.completed(new NotAllowedException("Exceed maximum message size")
, -1, -1);
return;
}
callback.completed(null, 0L, 0L);
ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,12 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont
decrementPendingWriteOpsAndCheck();
return;
}
if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
publishContext.completed(new NotAllowedException("Exceed maximum message size")
, -1, -1);
decrementPendingWriteOpsAndCheck();
return;
}

MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload);
switch (status) {
Expand Down Expand Up @@ -2455,6 +2461,12 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
decrementPendingWriteOpsAndCheck();
return;
}
if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
publishContext.completed(new NotAllowedException("Exceed maximum message size")
, -1, -1);
decrementPendingWriteOpsAndCheck();
return;
}

MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload);
switch (status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,31 @@
*/
package org.apache.pulsar.broker.admin;

import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import static org.testng.Assert.assertEquals;

import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.testng.Assert;
Expand All @@ -55,6 +54,12 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

@Slf4j
public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {

Expand Down Expand Up @@ -91,6 +96,7 @@ protected void setup() throws Exception {
@Override
public void cleanup() throws Exception {
super.internalCleanup();
this.resetConfig();
}

@Test
Expand Down Expand Up @@ -1160,4 +1166,81 @@ public void testPublishRateInDifferentLevelPolicy() throws Exception {
Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 5);
Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 50L);
}

@Test(timeOut = 20000)
public void testTopicMaxMessageSizeApi() throws Exception{
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(persistenceTopic)));
assertNull(admin.topics().getMaxMessageSize(persistenceTopic));

admin.topics().setMaxMessageSize(persistenceTopic,10);
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
-> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(persistenceTopic)) != null);
assertEquals(admin.topics().getMaxMessageSize(persistenceTopic).intValue(),10);

admin.topics().removeMaxMessageSize(persistenceTopic);
assertNull(admin.topics().getMaxMessageSize(persistenceTopic));

try {
admin.topics().setMaxMessageSize(persistenceTopic,Integer.MAX_VALUE);
fail("should fail");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(),412);
}
try {
admin.topics().setMaxMessageSize(persistenceTopic, -1);
fail("should fail");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(),412);
}
}

@Test(timeOut = 20000)
public void testTopicMaxMessageSize() throws Exception{
doTestTopicMaxMessageSize(true);
doTestTopicMaxMessageSize(false);
}

private void doTestTopicMaxMessageSize(boolean isPartitioned) throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
if (isPartitioned) {
admin.topics().createPartitionedTopic(topic, 3);
}
// init cache
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
assertNull(admin.topics().getMaxMessageSize(topic));
// set msg size
admin.topics().setMaxMessageSize(topic, 10);
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
-> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
assertEquals(admin.topics().getMaxMessageSize(topic).intValue(), 10);

try {
producer.send(new byte[1024]);
} catch (PulsarClientException e) {
assertTrue(e instanceof PulsarClientException.NotAllowedException);
}

admin.topics().removeMaxMessageSize(topic);
assertNull(admin.topics().getMaxMessageSize(topic));

try {
admin.topics().setMaxMessageSize(topic, Integer.MAX_VALUE);
fail("should fail");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 412);
}
try {
admin.topics().setMaxMessageSize(topic, -1);
fail("should fail");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 412);
}

MessageId messageId = producer.send(new byte[1024]);
assertNotNull(messageId);
producer.close();
}
}
Loading