From bf470a47f9ccc38502bcc0b316799eb7023cc362 Mon Sep 17 00:00:00 2001 From: Alex Iudice Date: Fri, 24 Jan 2020 11:53:40 -0500 Subject: [PATCH] Overhaul of apoc broker RabbitMQ send method. Adds queue creation/configuration, exchange creation/configuration, exchange caching. No longer requires queues for sending messages. Renames 'properties' to 'amqpProperties'. --- .../broker/RabbitMqConnectionFactory.java | 149 ++++++++++++++++-- 1 file changed, 135 insertions(+), 14 deletions(-) diff --git a/src/main/java/apoc/broker/RabbitMqConnectionFactory.java b/src/main/java/apoc/broker/RabbitMqConnectionFactory.java index e4fe3579..adc1ca75 100644 --- a/src/main/java/apoc/broker/RabbitMqConnectionFactory.java +++ b/src/main/java/apoc/broker/RabbitMqConnectionFactory.java @@ -13,6 +13,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,6 +46,9 @@ public static class RabbitMqConnection implements BrokerConnection private AtomicBoolean connected = new AtomicBoolean( false ); private AtomicBoolean reconnecting = new AtomicBoolean( false ); + // exchange -> List of routingKeys + private Map> bindingsCache = new HashMap<>(); + public RabbitMqConnection( Log log, String connectionName, Map configuration ) { this.log = log; @@ -77,34 +81,117 @@ public Stream send( @Name( "message" ) Map message { log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: 'exchangeName' in parameters missing" ); } - if ( !configuration.containsKey( "queueName" ) ) - { - log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: 'queueName' in parameters missing" ); - } if ( !configuration.containsKey( "routingKey" ) ) { log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: 'routingKey' in parameters missing" ); } String exchangeName = (String) configuration.get( "exchangeName" ); - String queueName = (String) configuration.get( "queueName" ); String routingKey = (String) configuration.get( "routingKey" ); checkConnectionHealth(); // Set up basic properties - Map properties = (Map) configuration.getOrDefault( "properties", Collections.emptyMap() ); + Map properties = (Map) configuration.getOrDefault( "amqpProperties", Collections.emptyMap() ); AMQP.BasicProperties basicProperties = basicPropertiesMapper( properties ); - // Ensure the exchange and queue are declared. - channel.exchangeDeclare( exchangeName, "topic", true ); - channel.queueDeclarePassive( queueName ); + // (1) Queue checking and creation + // NOTE: If a queueName is included then it will always create the binding! + // For optimization remove queueName if RabbitMQ queues/bindings already all setup. + Boolean setBindingForQueue = false; + String queueName = (String) configuration.getOrDefault( "queueName", "" ); + if ( !queueName.isEmpty() ) + { + try + { + channel.queueDeclarePassive( queueName ); + } + catch ( IOException e ) + { + recoverFromChannelError(); + + log.info( "Queue '" + queueName + "' does not exist for RabbitMQ connection '" + connectionName + "'. Creating it now." ); + + // Queue does not exist so create one and setBindingForQueue = true + setBindingForQueue = true; + + // Check for config + Map queueConfiguration = + (Map) configuration.getOrDefault( "queueConfiguration", Collections.emptyMap() ); + + Boolean queueDurable = (Boolean) queueConfiguration.getOrDefault( "durable", true ); + Boolean queueExclusive = (Boolean) queueConfiguration.getOrDefault( "exclusive", false ); + Boolean queueAutoDelete = (Boolean) queueConfiguration.getOrDefault( "autoDelete", false ); + Map queueArguments = + (Map) queueConfiguration.getOrDefault( "arguments", Collections.emptyMap() ); + + // Declare + channel.queueDeclare( queueName, queueDurable, queueExclusive, queueAutoDelete, queueArguments ); + } + + // If we already know the exchange then just create the binding + if ( isKnownBinding( exchangeName, routingKey ) ) + { + channel.queueBind( queueName, exchangeName, routingKey ); + setBindingForQueue = false; + } + else if ( isKnownExchange( exchangeName ) ) + { + bindingsCache.get( exchangeName ).add( routingKey ); + channel.queueBind( queueName, exchangeName, routingKey ); + setBindingForQueue = false; + } + + } + + // (2) Send message if exchange is known. Create exchange if unknown. + if ( isKnownBinding( exchangeName, routingKey ) ) + { + // Send Message + channel.basicPublish( exchangeName, routingKey, basicProperties, objectMapper.writeValueAsBytes( message ) ); + } + else if ( isKnownExchange( exchangeName ) ) + { + bindingsCache.get( exchangeName ).add( routingKey ); + channel.basicPublish( exchangeName, routingKey, basicProperties, objectMapper.writeValueAsBytes( message ) ); + } + else + { + // Check if exchange exists + try + { + channel.exchangeDeclarePassive( exchangeName ); + } + catch ( IOException e ) + { + recoverFromChannelError(); + + log.info( "Exchange '" + exchangeName + "' does not exist for RabbitMQ connection '" + connectionName + "'. Creating it now." ); + + Map channelConfiguration = + (Map) configuration.getOrDefault( "channelConfiguration", Collections.emptyMap() ); + String channelType = (String) channelConfiguration.getOrDefault( "type", "topic" ); + Boolean channelDurable = (Boolean) channelConfiguration.getOrDefault( "durable", true ); + Boolean channelAutoDelete = (Boolean) channelConfiguration.getOrDefault( "autoDelete", false ); + Map channelArguments = + (Map) channelConfiguration.getOrDefault( "arguments", Collections.emptyMap() ); - // Ensure the exchange and queue are bound by the routing key. - channel.queueBind( queueName, exchangeName, routingKey ); + // Declare channel + channel.exchangeDeclare( exchangeName, channelType, channelDurable, channelAutoDelete, channelArguments ); + } + + // Add it to the cache + bindingsCache.put( exchangeName, Arrays.asList( routingKey ) ); + + // Check if queue was set up as well, so we can add the binding now that the exchange/routingKey is known + if ( setBindingForQueue ) + { + channel.queueBind( queueName, exchangeName, routingKey ); + } - // Get the message bytes and send the message bytes. - channel.basicPublish( exchangeName, routingKey, basicProperties, objectMapper.writeValueAsBytes( message ) ); + // finally send the message + channel.basicPublish( exchangeName, routingKey, basicProperties, objectMapper.writeValueAsBytes( message ) ); + } return Stream.of( new BrokerMessage( connectionName, message, configuration ) ); } @@ -177,7 +264,10 @@ public void stop() { try { - channel.close(); + if ( channel.isOpen() ) + { + channel.close(); + } connection.close(); } catch ( Exception e ) @@ -208,6 +298,37 @@ public void checkConnectionHealth() throws Exception } } + private boolean isKnownExchange( String exchange ) + { + return bindingsCache.containsKey( exchange ); + } + + private boolean isKnownBinding( String exchange, String routingKey ) + { + if ( !bindingsCache.containsKey( exchange ) ) + { + return false; + } + return bindingsCache.get( exchange ).contains( routingKey ); + } + + private void recoverFromChannelError( ) + { + try + { + if ( channel.isOpen() ) + { + channel.close(); + } + channel = connection.createChannel(); + } + catch ( Exception e ) + { + log.error( "Failed to recover from channel error. Error: " + e.getMessage() ); + throw new RuntimeException( "Failed to recover from channel error. Error: " + e.getMessage() ); + } + } + public Log getLog() { return log;