Skip to content

Commit

Permalink
Merge pull request #1200 from cshannon/AMQ-9475
Browse files Browse the repository at this point in the history
AMQ-9475 - ConsumerControl commands should not auto create wildcard dests
  • Loading branch information
cshannon authored Apr 11, 2024
2 parents f90c10d + c8f0419 commit 78d9555
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -543,7 +545,11 @@ public Response messagePull(ConnectionContext context, MessagePull pull) throws
return sub.pullMessage(context, pull);
}

protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
protected Destination lookup(ConnectionContext context, ActiveMQDestination destination, boolean createTemporary) throws Exception {
return lookup(context, destination, createTemporary, true);
}

protected Destination lookup(ConnectionContext context, ActiveMQDestination destination, boolean createTemporary, boolean autoCreate) throws Exception {
Destination dest = null;

destinationsLock.readLock().lock();
Expand All @@ -553,7 +559,7 @@ protected Destination lookup(ConnectionContext context, ActiveMQDestination dest
destinationsLock.readLock().unlock();
}

if (dest == null) {
if (autoCreate && dest == null) {
if (isAutoCreateDestinations()) {
// Try to auto create the destination... re-invoke broker
// from the
Expand Down Expand Up @@ -679,8 +685,8 @@ protected void dispose(ConnectionContext context, Destination dest) throws Excep

@Override
public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
Subscription sub = subscriptions.get(control.getConsumerId());
if (sub != null && sub instanceof AbstractSubscription) {
final Subscription sub = subscriptions.get(control.getConsumerId());
if (sub instanceof AbstractSubscription) {
((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch());
if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(control.getDestination());
Expand All @@ -691,7 +697,17 @@ public void processConsumerControl(ConsumerBrokerExchange consumerExchange, Cons
LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}",
control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getPrefetchSize());
try {
lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
final ActiveMQDestination controlDest = Objects.requireNonNull(control.getDestination(),
"Destination must not be null in ConsumerControl");
// Don't auto create patterns (wildcard topics) or composite, this matches addConsumer()
final boolean autoCreate = !controlDest.isPattern() && !controlDest.isComposite();

// If autoCreate is false then lookup() will just return null if the destination
// does not exist and we can skip the call to wakeup. This will prevent creating
// wildcard destinations for wildcard consumers but will use them if they exist
Optional.ofNullable(lookup(consumerExchange.getConnectionContext(),
control.getDestination(),false, autoCreate))
.ifPresent(Destination::wakeup);
} catch (Exception e) {
LOG.warn("failed to deliver post consumerControl dispatch-wakeup, to destination: {}", control.getDestination(), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;

import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerControl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import jakarta.jms.*;
import java.net.URI;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

/**
* This tests that subscribing to a wildcard and sending a ConsumerControl
* command for that wildcard sub will not auto create the destination
* by mistake.
*/
@RunWith(Parameterized.class)
public class AMQ9475Test {

@Parameters(name = "queue={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { { true }, { false } });
}

public AMQ9475Test(boolean queue) {
this.destination1 = queue ? new ActiveMQQueue("a.>") : new ActiveMQTopic("a.>");
this.destination2 = queue ? new ActiveMQQueue("a") : new ActiveMQTopic("a");
}

private BrokerService brokerService;
private String connectionUri;
private final ActiveMQDestination destination1;
private final ActiveMQDestination destination2;

protected ConnectionFactory createConnectionFactory() throws Exception {
ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory(connectionUri);
conFactory.setWatchTopicAdvisories(false);
return conFactory;
}

@Before
public void setUp() throws Exception {
brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true"));
brokerService.addConnector("tcp://0.0.0.0:0");
brokerService.start();
connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
}

// Normal use case to verify wildcard sub is not created
@Test
public void testNormalWildcardSub() throws Exception {
Session session;
try (Connection connection = createConnectionFactory().createConnection()) {
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination1);
sendMessage(session, destination2, "test");
assertNotNull(consumer.receive(1000));

assertNull(brokerService.getBroker().getDestinationMap().get(destination1));
assertNotNull(brokerService.getBroker().getDestinationMap().get(destination2));
}
}

// Test that the wildcard dest is still not auto-created even after sending the
// ConsumerControl object for it
@Test
public void testWildcardConsumerControl() throws Exception {
Session session;
try (ActiveMQConnection connection = (ActiveMQConnection) createConnectionFactory().createConnection()) {
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination1);

ConsumerControl control = new ConsumerControl();
control.setDestination(destination1);
control.setConsumerId(consumer.getConsumerId());
control.setPrefetch(10);
connection.syncSendPacket(control);

sendMessage(session, destination2, "test");
assertNotNull(consumer.receive(1000));

assertNull(brokerService.getBroker().getDestinationMap().get(destination1));
assertNotNull(brokerService.getBroker().getDestinationMap().get(destination2));
}
}

@After
public void tearDown() throws Exception {
brokerService.stop();
brokerService.waitUntilStopped();
}

private void sendMessage(Session session, Destination destination, String text) throws JMSException {
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage(text));
producer.close();
}
}

0 comments on commit 78d9555

Please sign in to comment.