Skip to content

Commit

Permalink
ARTEMIS-4924 Do not allow sending messages directly to store-and-forw…
Browse files Browse the repository at this point in the history
…ard queues
  • Loading branch information
howardgao committed Jul 23, 2024
1 parent bc1bb99 commit f79f1e8
Show file tree
Hide file tree
Showing 9 changed files with 424 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -641,9 +641,8 @@ void slowConsumerDetected(String sessionID,
@LogMessage(id = 222109, value = "Timed out waiting for write lock on consumer {} from {}. Check the Thread dump", level = LogMessage.Level.WARN)
void timeoutLockingConsumer(String consumer, String remoteAddress);

@LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, copiedMessage = {}, props={}", level = LogMessage.Level.WARN)
@LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, props={}", level = LogMessage.Level.WARN)
void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,
org.apache.activemq.artemis.api.core.Message messageCopy,
SimpleString idsHeaderName);

@LogMessage(id = 222111, value = "exception while invoking {} on {}", level = LogMessage.Level.TRACE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public interface Queue extends Bindable,CriticalComponent {

void refDown(MessageReference messageReference);

default boolean checkInvalid(final MessageReference ref) {
return false;
}

/** Remove item with a supplied non-negative {@literal (>= 0) } ID.
* If the idSupplier returns {@literal < 0} the ID is considered a non value (null) and it will be ignored.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {

String SN_PREFIX = "sf.";

SimpleString getName();

String getNodeID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,6 @@ protected Message beforeForward(final Message message, final SimpleString forwar

byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);

if (queueIds == null) {
// Sanity check only
ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, messageCopy, idsHeaderName);
throw new IllegalStateException("no queueIDs defined");
}

for (SimpleString propName : propNames) {
if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
messageCopy.removeProperty(propName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
Expand Down Expand Up @@ -84,7 +85,6 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final String SN_PREFIX = "sf.";
/**
* When getting member on node-up and down we have to remove the name from the transport config
* as the setting we build here doesn't need to consider the name, so use the same name on all
Expand Down Expand Up @@ -712,6 +712,19 @@ private synchronized void activate() throws Exception {

serverLocator.start(server.getExecutorFactory().getExecutor());
}
// add security role to disallow send/edit directly to sf queues
if (server.getConfiguration().isSecurityEnabled()) {
Set<Role> roles = new HashSet<>();
String addressMatch = storeAndForwardPrefix + name + "." + server.getConfiguration().getWildcardConfiguration().getSingleWordString();
roles.add(new Role("*", false, true, false, false, false, false, false, true, false, false, true, false));
Pair<String, Set<Role>> securityItem = new Pair<>(addressMatch, roles);
if (server.getSecurityRepository().getMatch(addressMatch) == null) {
server.getSecurityRepository().addMatch(securityItem.getA(), securityItem.getB());
} else {
//don't override user's configuration
logger.debug("broker has security settings for store and forward addresses {}", addressMatch);
}
}

if (managementService != null) {
TypedProperties props = new TypedProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ExecutorFactory;
Expand Down Expand Up @@ -94,13 +95,22 @@ public Queue createQueueWith(final QueueConfiguration config, PagingManager pagi
PageSubscription pageSubscription = getPageSubscription(config, pagingManager, filter);
if (lastValueKey(config) != null) {
queue = new LastValueQueue(config.setLastValueKey(lastValueKey(config)), filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
} else if (isSnf(config)) {
queue = new StoreAndForwardQueue(config, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
} else {
queue = new QueueImpl(config, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
}
server.getCriticalAnalyzer().add(queue);
return queue;
}

private boolean isSnf(QueueConfiguration config) {
if (config.isInternal()) {
return config.getName().toString().startsWith(server.getInternalNamingPrefix() + ClusterConnection.SN_PREFIX);
}
return false;
}

@Deprecated
@Override
public Queue createQueue(final long persistenceID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,15 @@ public void addTail(final MessageReference ref, final boolean direct) {
if (scheduleIfPossible(ref)) {
return;
}
if (checkInvalid(ref)) {
//send to dlq
try {
sendToDeadLetterAddress(null, ref);
} catch (Exception e) {
logger.trace("Failed to send reference {} to DLA", ref);
}
return;
}
if (RefCountMessage.isRefTraceEnabled()) {
RefCountMessage.deferredDebug(ref.getMessage(), "add tail queue {}", this.getName());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.core.server.impl;

import java.util.concurrent.ScheduledExecutorService;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;

public class StoreAndForwardQueue extends QueueImpl {

public StoreAndForwardQueue(QueueConfiguration queueConfiguration,
Filter filter,
PagingStore pagingStore,
PageSubscription pageSubscription,
ScheduledExecutorService scheduledExecutor,
PostOffice postOffice,
StorageManager storageManager,
HierarchicalRepository<AddressSettings> addressSettingsRepository,
ArtemisExecutor executor,
ActiveMQServer server,
QueueFactory factory) {
super(queueConfiguration, filter, pagingStore, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
}

@Override
public boolean checkInvalid(final MessageReference ref) {
Message message = ref.getMessage();
SimpleString idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(this.getName());
byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);
if (queueIds == null) {
ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, idsHeaderName);
return true;
}
return false;
}
}
Loading

0 comments on commit f79f1e8

Please sign in to comment.