Skip to content

Commit

Permalink
Merge branch 'hotfix-1.1.3'
Browse files Browse the repository at this point in the history
  • Loading branch information
John Simons committed Apr 21, 2014
2 parents d6246ee + 5ca1dfa commit 33b8d0f
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
<Compile Include="RabbitMqContext.cs" />
<Compile Include="TestCategory.cs" />
<Compile Include="When_consuming_messages.cs" />
<Compile Include="When_stopping_endpoint.cs" />
<Compile Include="When_running_concurrent_units_of_work_and_distributed_tx.cs" />
<Compile Include="When_sending_a_message_over_rabbitmq.cs" />
<Compile Include="TransportMessageBuilder.cs" />
Expand Down
64 changes: 42 additions & 22 deletions src/NServiceBus.RabbitMQ.Tests/RabbitMqContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using Config;
using EasyNetQ;
using NUnit.Framework;
Expand All @@ -20,7 +21,7 @@ protected void MakeSureQueueAndExchangeExists(string queueName)
//to make sure we kill old subscriptions
DeleteExchange(queueName);

routingTopology.Initialize(channel,queueName);
routingTopology.Initialize(channel, queueName);
}
}

Expand All @@ -33,14 +34,18 @@ void DeleteExchange(string exchangeName)
{
channel.ExchangeDelete(exchangeName);
}
// ReSharper disable EmptyGeneralCatchClause
// ReSharper disable EmptyGeneralCatchClause
catch (Exception)
// ReSharper restore EmptyGeneralCatchClause
// ReSharper restore EmptyGeneralCatchClause
{
}
}
}

public virtual int MaximumConcurrency
{
get { return 1; }
}

[SetUp]
public void SetUp()
Expand All @@ -50,27 +55,39 @@ public void SetUp()

var config = new ConnectionConfiguration();
config.ParseHosts("localhost:5672");

var selectionStrategy = new DefaultClusterHostSelectionStrategy<ConnectionFactoryInfo>();
var connectionFactory = new ConnectionFactoryWrapper(config, selectionStrategy);
connectionManager = new RabbitMqConnectionManager(connectionFactory, config);

unitOfWork = new RabbitMqUnitOfWork { ConnectionManager = connectionManager,UsePublisherConfirms = true,MaxWaitTimeForConfirms = TimeSpan.FromSeconds(10) };
unitOfWork = new RabbitMqUnitOfWork
{
ConnectionManager = connectionManager,
UsePublisherConfirms = true,
MaxWaitTimeForConfirms = TimeSpan.FromSeconds(10)
};

sender = new RabbitMqMessageSender
{
UnitOfWork = unitOfWork,
RoutingTopology = routingTopology
};

sender = new RabbitMqMessageSender { UnitOfWork = unitOfWork, RoutingTopology = routingTopology };

dequeueStrategy = new RabbitMqDequeueStrategy
{
ConnectionManager = connectionManager,
PurgeOnStartup = true
};

dequeueStrategy = new RabbitMqDequeueStrategy { ConnectionManager = connectionManager, PurgeOnStartup = true };

MakeSureQueueAndExchangeExists(ReceiverQueue);



MessagePublisher = new RabbitMqMessagePublisher
{
UnitOfWork = unitOfWork,
RoutingTopology = routingTopology
};
{
UnitOfWork = unitOfWork,
RoutingTopology = routingTopology
};
subscriptionManager = new RabbitMqSubscriptionManager
{
ConnectionManager = connectionManager,
Expand All @@ -84,20 +101,22 @@ public void SetUp()
return true;
}, (s, exception) => { });

dequeueStrategy.Start(1);
dequeueStrategy.Start(MaximumConcurrency);
}


[TearDown]
public void TearDown()
{
if (dequeueStrategy != null)
{
dequeueStrategy.Stop();

}

connectionManager.Dispose();
}

protected virtual string ExchangeNameConvention(Address address,Type eventType)
protected virtual string ExchangeNameConvention(Address address, Type eventType)
{
return "amq.topic";
}
Expand All @@ -107,24 +126,25 @@ protected TransportMessage WaitForMessage()
{
var waitTime = TimeSpan.FromSeconds(1);

if (System.Diagnostics.Debugger.IsAttached)
if (Debugger.IsAttached)
{
waitTime = TimeSpan.FromMinutes(10);
}

TransportMessage message;
receivedMessages.TryTake(out message, waitTime);

return message;

}

protected const string ReceiverQueue = "testreceiver";
protected RabbitMqMessagePublisher MessagePublisher;
protected RabbitMqConnectionManager connectionManager;
protected RabbitMqDequeueStrategy dequeueStrategy;
BlockingCollection<TransportMessage> receivedMessages;

protected ConventionalRoutingTopology routingTopology;
protected const string ReceiverQueue = "testreceiver";
protected RabbitMqDequeueStrategy dequeueStrategy;
protected RabbitMqConnectionManager connectionManager;
protected RabbitMqMessageSender sender;
protected RabbitMqMessagePublisher MessagePublisher;
protected RabbitMqSubscriptionManager subscriptionManager;
protected RabbitMqUnitOfWork unitOfWork;
}
Expand Down
33 changes: 33 additions & 0 deletions src/NServiceBus.RabbitMQ.Tests/When_stopping_endpoint.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace NServiceBus.Transports.RabbitMQ.Tests
{
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;

[TestFixture]
public class When_stopping_endpoint : RabbitMqContext
{
[SetUp]
public new void SetUp()
{
MakeSureQueueAndExchangeExists(ReceiverQueue);
}

[Test, Explicit]
public void Should__gracefully_shutdown()
{
dequeueStrategy.Stop();

var address = Address.Parse(ReceiverQueue);

Parallel.For(0, 2000, i =>
sender.Send(new TransportMessage(), address));

dequeueStrategy.PurgeOnStartup = false;
dequeueStrategy.Start(50);
Thread.Sleep(10);
dequeueStrategy.Stop();
connectionManager.Dispose();
}
}
}
Loading

0 comments on commit 33b8d0f

Please sign in to comment.