Skip to content

Commit

Permalink
ARTEMIS-4924 Do not allow sending messages directly to store-and-forw…
Browse files Browse the repository at this point in the history
…ard queues

  - To send any invalid messages in snf queues to DLA
  - Add documentation for store and forward queue proper usage
  • Loading branch information
howardgao committed Jul 26, 2024
1 parent 096a869 commit 1a1f0f5
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@ public ActiveMQException createException(String msg) {
public ActiveMQException createException(String msg) {
return new ActiveMQTimeoutException(msg);
}
},
INVALID_MESSAGE_EXCEPTION(224) {
@Override
public ActiveMQException createException(String msg) {
return new ActiveMQInvalidMessageException(msg);
}
};
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.api.core;

public class ActiveMQInvalidMessageException extends ActiveMQException {

public ActiveMQInvalidMessageException(String message) {
super(ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ public interface Message {
*/
SimpleString HDR_INGRESS_TIMESTAMP = SimpleString.of("_AMQ_INGRESS_TIMESTAMP");

/**
* This gives extra information as to why the messages is sent to DLQ
*/
SimpleString HDR_ROUTE_DLQ_DETAIL = SimpleString.of("_AMQ_DLQ_DETAIL");

/**
* The prefix used (if any) when sending this message. For protocols (e.g. STOMP) that need to track this and restore
* the prefix when the message is consumed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,4 +559,6 @@ IllegalStateException invalidRoutingTypeUpdate(String queueName,
@Message(id = 229255, value = "Bridge {} cannot be {}. Current state: {}")
ActiveMQIllegalStateException bridgeOperationCannotBeExecuted(String bridgeName, String failedOp, BridgeImpl.State currentState);

@Message(id = 229256, value = "Missing header {}")
String messageMissingHeader(SimpleString idsHeaderName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -641,9 +641,8 @@ void slowConsumerDetected(String sessionID,
@LogMessage(id = 222109, value = "Timed out waiting for write lock on consumer {} from {}. Check the Thread dump", level = LogMessage.Level.WARN)
void timeoutLockingConsumer(String consumer, String remoteAddress);

@LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, copiedMessage = {}, props={}", level = LogMessage.Level.WARN)
@LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, props={}", level = LogMessage.Level.WARN)
void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,
org.apache.activemq.artemis.api.core.Message messageCopy,
SimpleString idsHeaderName);

@LogMessage(id = 222111, value = "exception while invoking {} on {}", level = LogMessage.Level.TRACE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ public void failed(Throwable t) {
}

/* Hook for processing message before forwarding */
protected Message beforeForward(Message message, final SimpleString forwardingAddress) {
protected Message beforeForward(Message message, final SimpleString forwardingAddress) throws ActiveMQException {
message = message.copy();
((RefCountMessage)message).setParentRef((RefCountMessage)message);

Expand Down Expand Up @@ -605,7 +605,19 @@ public HandleStatus handle(final MessageReference ref) throws Exception {
dest = ref.getMessage().getAddressSimpleString();
}

final Message message = beforeForward(ref.getMessage(), dest);
final Message message;
try {
message = beforeForward(ref.getMessage(), dest);
} catch (ActiveMQException ex) {
if (ex.getType() == ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION) {
ref.getMessage().putStringProperty(Message.HDR_ROUTE_DLQ_DETAIL, SimpleString.of(ex.getMessage()));
ref.getQueue().sendToDeadLetterAddress(null, ref);
refs.remove(ref.getMessageID());
return HandleStatus.HANDLED;
} else {
throw ex;
}
}

pendingAcks.countUp();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
Expand Down Expand Up @@ -185,7 +186,7 @@ protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
}

@Override
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) {
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) throws ActiveMQException {
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
// the one pertinent for the address node - this is important since different queues on different
Expand All @@ -200,11 +201,9 @@ protected Message beforeForward(final Message message, final SimpleString forwar
Set<SimpleString> propNames = new HashSet<>(messageCopy.getPropertyNames());

byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);

if (queueIds == null) {
// Sanity check only
ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, messageCopy, idsHeaderName);
throw new IllegalStateException("no queueIDs defined");
ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, idsHeaderName);
throw ActiveMQExceptionType.createException(ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION.getCode(), ActiveMQMessageBundle.BUNDLE.messageMissingHeader(idsHeaderName));
}

for (SimpleString propName : propNames) {
Expand Down
5 changes: 5 additions & 0 deletions docs/user-manual/clusters.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,11 @@ The default value is `-1`.

It often makes sense to introduce a delay before redistributing as it's a common case that a consumer closes but another one quickly is created on the same queue, in such a case you probably don't want to redistribute immediately since the new consumer will arrive shortly.

[WARNING]
====
The broker uses internal store and forward queues to handle message redistribution. Be aware that any clients should not directly send messages to the sore and forward queues. If a client sends messages to a store and forward queue, the messages will be sent to dead letter address. If security is enabled, make sure the clients do not have `send` permission on any store and forward queues. (The name pattern for a store and forward queue is <internal-naming-prefix>.sf.<cluster-name>.<nodeID> where the default internal-naming-prefix is `$.activemq.internal`, the cluster-name is the name of the cluster-connection, and the nodeID is the target node's ID)
====

== Cluster topologies

Apache ActiveMQ Artemis clusters can be connected together in many different topologies, let's consider the two most common ones here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor;
Expand Down Expand Up @@ -413,6 +414,106 @@ public void testPauseAddressBlockingSnFQueue() throws Exception {
stopServers(0, 1);
}

@Test
public void testBadClientSendMessagesToSnFQueue() throws Exception {
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());

setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);

setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);

String dla = "DLA";
AddressSettings addressSettings = new AddressSettings();
addressSettings.setDeadLetterAddress(SimpleString.of(dla));

servers[0].getAddressSettingsRepository().addMatch("#", addressSettings);
servers[1].getAddressSettingsRepository().addMatch("#", addressSettings);

startServers(0, 1);

setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());

createQueue(0, dla, dla, null, true);
createQueue(1, dla, dla, null, true);

waitForBindings(0, dla, 1, 0, true);
waitForBindings(1, dla, 1, 0, true);

ClientSession session0 = sfs[0].createSession();
ClientSession session1 = sfs[1].createSession();

session0.start();
session1.start();

final int num = 10;

SimpleString nodeId1 = servers[1].getNodeID();
ClusterConnectionImpl cc0 = (ClusterConnectionImpl) servers[0].getClusterManager().getClusterConnection("cluster0");
SimpleString snfQueue0 = cc0.getSfQueueName(nodeId1.toString());

ClientProducer badProducer0 = session0.createProducer(snfQueue0);
for (int i = 0; i < num; i++) {
Message msg = session0.createMessage(true);
msg.putStringProperty("origin", "from producer 0");
badProducer0.send(msg);
}

//add a remote queue and consumer to enable message to flow from node 0 to node 1
createQueue(1, "queues.testaddress", "queue0", null, true);
ClientConsumer consumer1 = session1.createConsumer("queue0");

waitForBindings(0, "queues.testaddress", 0, 0, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);

waitForBindings(0, "queues.testaddress", 1, 1, false);
waitForBindings(1, "queues.testaddress", 0, 0, false);

ClientConsumer dlqConsumer = session0.createConsumer(dla);

for (int i = 0; i < num; i++) {
Message msg = session0.createMessage(true);
msg.putStringProperty("origin", "from producer 0");
badProducer0.send(msg);
}

//messages will never reache the consumer
assertNull(consumer1.receiveImmediate());

SimpleString idHeadersName = Message.HDR_ROUTE_TO_IDS.concat(snfQueue0);
for (int i = 0; i < num * 2; i++) {
ClientMessage m = dlqConsumer.receive(5000);
assertNotNull(m);
String propValue = m.getStringProperty("origin");
assertEquals("from producer 0", propValue);
propValue = m.getStringProperty(Message.HDR_ROUTE_DLQ_DETAIL);
assertEquals(ActiveMQMessageBundle.BUNDLE.messageMissingHeader(idHeadersName), propValue);
m.acknowledge();
}
assertNull(dlqConsumer.receiveImmediate());

//normal message flow should work
ClientProducer goodProducer0 = session0.createProducer("queues.testaddress");
for (int i = 0; i < num; i++) {
Message msg = session0.createMessage(true);
msg.putStringProperty("origin", "from producer 0");
goodProducer0.send(msg);
}

//consumer1 can receive from node0
for (int i = 0; i < num; i++) {
ClientMessage m = consumer1.receive(5000);
assertNotNull(m);
String propValue = m.getStringProperty("origin");
assertEquals("from producer 0", propValue);
m.acknowledge();
}
assertNull(consumer1.receiveImmediate());

stopServers(0, 1);
}

@Override
@AfterEach
public void tearDown() throws Exception {
Expand Down

0 comments on commit 1a1f0f5

Please sign in to comment.