Skip to content

Commit

Permalink
Merge pull request #26 from graphfoundation/story-rabbitImprovements-3.3
Browse files Browse the repository at this point in the history
3.3 - RabbitMQ Improvements
  • Loading branch information
bradnussbaum authored Feb 14, 2020
2 parents 0c8bc4a + bf470a4 commit 4733081
Showing 1 changed file with 135 additions and 14 deletions.
149 changes: 135 additions & 14 deletions src/main/java/apoc/broker/RabbitMqConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -45,6 +46,9 @@ public static class RabbitMqConnection implements BrokerConnection
private AtomicBoolean connected = new AtomicBoolean( false );
private AtomicBoolean reconnecting = new AtomicBoolean( false );

// exchange -> List of routingKeys
private Map<String,List<String>> bindingsCache = new HashMap<>();

public RabbitMqConnection( Log log, String connectionName, Map<String,Object> configuration )
{
this.log = log;
Expand Down Expand Up @@ -77,34 +81,117 @@ public Stream<BrokerMessage> send( @Name( "message" ) Map<String,Object> message
{
log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: 'exchangeName' in parameters missing" );
}
if ( !configuration.containsKey( "queueName" ) )
{
log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: 'queueName' in parameters missing" );
}
if ( !configuration.containsKey( "routingKey" ) )
{
log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: 'routingKey' in parameters missing" );
}

String exchangeName = (String) configuration.get( "exchangeName" );
String queueName = (String) configuration.get( "queueName" );
String routingKey = (String) configuration.get( "routingKey" );

checkConnectionHealth();

// Set up basic properties
Map<String,Object> properties = (Map<String,Object>) configuration.getOrDefault( "properties", Collections.<String,Object>emptyMap() );
Map<String,Object> properties = (Map<String,Object>) configuration.getOrDefault( "amqpProperties", Collections.<String,Object>emptyMap() );
AMQP.BasicProperties basicProperties = basicPropertiesMapper( properties );

// Ensure the exchange and queue are declared.
channel.exchangeDeclare( exchangeName, "topic", true );
channel.queueDeclarePassive( queueName );
// (1) Queue checking and creation
// NOTE: If a queueName is included then it will always create the binding!
// For optimization remove queueName if RabbitMQ queues/bindings already all setup.
Boolean setBindingForQueue = false;
String queueName = (String) configuration.getOrDefault( "queueName", "" );
if ( !queueName.isEmpty() )
{
try
{
channel.queueDeclarePassive( queueName );
}
catch ( IOException e )
{
recoverFromChannelError();

log.info( "Queue '" + queueName + "' does not exist for RabbitMQ connection '" + connectionName + "'. Creating it now." );

// Queue does not exist so create one and setBindingForQueue = true
setBindingForQueue = true;

// Check for config
Map<String,Object> queueConfiguration =
(Map<String,Object>) configuration.getOrDefault( "queueConfiguration", Collections.<String,Object>emptyMap() );

Boolean queueDurable = (Boolean) queueConfiguration.getOrDefault( "durable", true );
Boolean queueExclusive = (Boolean) queueConfiguration.getOrDefault( "exclusive", false );
Boolean queueAutoDelete = (Boolean) queueConfiguration.getOrDefault( "autoDelete", false );
Map<String,Object> queueArguments =
(Map<String,Object>) queueConfiguration.getOrDefault( "arguments", Collections.<String,Object>emptyMap() );

// Declare
channel.queueDeclare( queueName, queueDurable, queueExclusive, queueAutoDelete, queueArguments );
}

// If we already know the exchange then just create the binding
if ( isKnownBinding( exchangeName, routingKey ) )
{
channel.queueBind( queueName, exchangeName, routingKey );
setBindingForQueue = false;
}
else if ( isKnownExchange( exchangeName ) )
{
bindingsCache.get( exchangeName ).add( routingKey );
channel.queueBind( queueName, exchangeName, routingKey );
setBindingForQueue = false;
}

}

// (2) Send message if exchange is known. Create exchange if unknown.
if ( isKnownBinding( exchangeName, routingKey ) )
{
// Send Message
channel.basicPublish( exchangeName, routingKey, basicProperties, objectMapper.writeValueAsBytes( message ) );
}
else if ( isKnownExchange( exchangeName ) )
{
bindingsCache.get( exchangeName ).add( routingKey );
channel.basicPublish( exchangeName, routingKey, basicProperties, objectMapper.writeValueAsBytes( message ) );
}
else
{
// Check if exchange exists
try
{
channel.exchangeDeclarePassive( exchangeName );
}
catch ( IOException e )
{
recoverFromChannelError();

log.info( "Exchange '" + exchangeName + "' does not exist for RabbitMQ connection '" + connectionName + "'. Creating it now." );

Map<String,Object> channelConfiguration =
(Map<String,Object>) configuration.getOrDefault( "channelConfiguration", Collections.<String,Object>emptyMap() );
String channelType = (String) channelConfiguration.getOrDefault( "type", "topic" );
Boolean channelDurable = (Boolean) channelConfiguration.getOrDefault( "durable", true );
Boolean channelAutoDelete = (Boolean) channelConfiguration.getOrDefault( "autoDelete", false );
Map<String,Object> channelArguments =
(Map<String,Object>) channelConfiguration.getOrDefault( "arguments", Collections.<String,Object>emptyMap() );

// Ensure the exchange and queue are bound by the routing key.
channel.queueBind( queueName, exchangeName, routingKey );
// Declare channel
channel.exchangeDeclare( exchangeName, channelType, channelDurable, channelAutoDelete, channelArguments );
}

// Add it to the cache
bindingsCache.put( exchangeName, Arrays.asList( routingKey ) );

// Check if queue was set up as well, so we can add the binding now that the exchange/routingKey is known
if ( setBindingForQueue )
{
channel.queueBind( queueName, exchangeName, routingKey );
}

// Get the message bytes and send the message bytes.
channel.basicPublish( exchangeName, routingKey, basicProperties, objectMapper.writeValueAsBytes( message ) );
// finally send the message
channel.basicPublish( exchangeName, routingKey, basicProperties, objectMapper.writeValueAsBytes( message ) );
}

return Stream.of( new BrokerMessage( connectionName, message, configuration ) );
}
Expand Down Expand Up @@ -177,7 +264,10 @@ public void stop()
{
try
{
channel.close();
if ( channel.isOpen() )
{
channel.close();
}
connection.close();
}
catch ( Exception e )
Expand Down Expand Up @@ -208,6 +298,37 @@ public void checkConnectionHealth() throws Exception
}
}

private boolean isKnownExchange( String exchange )
{
return bindingsCache.containsKey( exchange );
}

private boolean isKnownBinding( String exchange, String routingKey )
{
if ( !bindingsCache.containsKey( exchange ) )
{
return false;
}
return bindingsCache.get( exchange ).contains( routingKey );
}

private void recoverFromChannelError( )
{
try
{
if ( channel.isOpen() )
{
channel.close();
}
channel = connection.createChannel();
}
catch ( Exception e )
{
log.error( "Failed to recover from channel error. Error: " + e.getMessage() );
throw new RuntimeException( "Failed to recover from channel error. Error: " + e.getMessage() );
}
}

public Log getLog()
{
return log;
Expand Down

0 comments on commit 4733081

Please sign in to comment.