diff --git a/Samples/AsyncPagesMVC3/AsyncPagesMVC3/Global.asax.cs b/Samples/AsyncPagesMVC3/AsyncPagesMVC3/Global.asax.cs index 8e248fbd45f..29012808ef5 100644 --- a/Samples/AsyncPagesMVC3/AsyncPagesMVC3/Global.asax.cs +++ b/Samples/AsyncPagesMVC3/AsyncPagesMVC3/Global.asax.cs @@ -36,7 +36,7 @@ protected void Application_Start() Configure.WithWeb() .DefaultBuilder() .ForMvc() - .XmlSerializer() + .JsonSerializer() .Log4Net() .MsmqTransport() .IsTransactional(false) diff --git a/Samples/AsyncPagesMVC3/Server/MessageEndpoint.cs b/Samples/AsyncPagesMVC3/Server/MessageEndpoint.cs index 83bb188149d..440c3b9594b 100644 --- a/Samples/AsyncPagesMVC3/Server/MessageEndpoint.cs +++ b/Samples/AsyncPagesMVC3/Server/MessageEndpoint.cs @@ -2,7 +2,16 @@ namespace Server { - public class MessageEndpoint : IConfigureThisEndpoint, AsA_Server + public class MessageEndpoint : IConfigureThisEndpoint, AsA_Server, IWantCustomInitialization { + /// + /// Perform initialization logic. + /// + public void Init() + { + Configure.With() + .DefaultBuilder() + .JsonSerializer(); + } } } diff --git a/Samples/FullDuplex/MyServer/MyOwnUnitOfWork.cs b/Samples/FullDuplex/MyServer/MyOwnUnitOfWork.cs index d50b8c52f55..db80fd0fa9f 100644 --- a/Samples/FullDuplex/MyServer/MyOwnUnitOfWork.cs +++ b/Samples/FullDuplex/MyServer/MyOwnUnitOfWork.cs @@ -7,6 +7,8 @@ namespace MyServer public class MyOwnUnitOfWork : IManageUnitsOfWork { + public IMySession MySession { get; set; } + public void Begin() { LogMessage("Begin"); @@ -15,7 +17,10 @@ public void Begin() public void End(Exception ex) { if (ex == null) + { LogMessage("Commit"); + MySession.SaveChanges(); + } else LogMessage("Rollback, reason: " + ex); } @@ -26,11 +31,30 @@ void LogMessage(string message) } } + public interface IMySession + { + void SaveChanges(); + } + + public class ExampleSession:IMySession + { + public void SaveChanges() + { + Console.WriteLine(string.Format("ExampleSession({0}) - {1}", GetHashCode(), "Saving changes")); + } + } + public class UoWIntitializer : IWantCustomInitialization { public void Init() { - Configure.Instance.Configurer.ConfigureComponent(DependencyLifecycle.InstancePerUnitOfWork); + Configure.Instance.Configurer. + ConfigureComponent(DependencyLifecycle.InstancePerUnitOfWork); + + //this shows the new lambda feature introduced in NServiceBus 3.2.3 + Configure.Instance.Configurer. + ConfigureComponent(()=> new ExampleSession(),DependencyLifecycle.InstancePerUnitOfWork); + } } } \ No newline at end of file diff --git a/Samples/ScaleOut/Orders.Handler.Worker1/App.config b/Samples/ScaleOut/Orders.Handler.Worker1/App.config new file mode 100644 index 00000000000..de35e357263 --- /dev/null +++ b/Samples/ScaleOut/Orders.Handler.Worker1/App.config @@ -0,0 +1,16 @@ + + + +
+
+
+ + + + + + + + diff --git a/Samples/ScaleOut/Orders.Handler.Worker1/Orders.Handler.Worker1.csproj b/Samples/ScaleOut/Orders.Handler.Worker1/Orders.Handler.Worker1.csproj new file mode 100644 index 00000000000..4b9cd089e9e --- /dev/null +++ b/Samples/ScaleOut/Orders.Handler.Worker1/Orders.Handler.Worker1.csproj @@ -0,0 +1,87 @@ + + + + Debug + AnyCPU + 8.0.30703 + 2.0 + {0217E112-7C44-4B6A-98AD-9031B9BC06DB} + Library + Properties + Orders.Handler.Worker1 + Orders.Handler.Worker1 + v4.0 + 512 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + ..\..\..\binaries\log4net.dll + + + ..\..\..\binaries\NServiceBus.dll + + + ..\..\..\binaries\NServiceBus.Core.dll + + + ..\..\..\binaries\NServiceBus.Host.exe + + + + + + + EndpointConfig.cs + + + ProcessOrderCommandHandler.cs + + + + + + + + + {162BEDCA-6C44-43B3-B2C1-83994BFFC094} + Orders.Messages + + + + + + Program + $(ProjectDir)$(OutputPath)NServiceBus.Host.exe + NServiceBus.Production NServiceBus.Worker + AllRules.ruleset + + + Program + $(ProjectDir)$(OutputPath)NServiceBus.Host.exe + NServiceBus.Production NServiceBus.Worker + AllRules.ruleset + + \ No newline at end of file diff --git a/Samples/ScaleOut/Orders.Handler.Worker1/Properties/AssemblyInfo.cs b/Samples/ScaleOut/Orders.Handler.Worker1/Properties/AssemblyInfo.cs new file mode 100644 index 00000000000..0e0a6f70559 --- /dev/null +++ b/Samples/ScaleOut/Orders.Handler.Worker1/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Orders.Handler.Worker1")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Orders.Handler.Worker1")] +[assembly: AssemblyCopyright("Copyright © 2012")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("bd9897f9-efab-4cfb-9f9d-20c8aec903af")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Samples/ScaleOut/Orders.Handler.Worker2/Orders.Handler.Worker2.csproj b/Samples/ScaleOut/Orders.Handler.Worker2/Orders.Handler.Worker2.csproj new file mode 100644 index 00000000000..b6889faf2b0 --- /dev/null +++ b/Samples/ScaleOut/Orders.Handler.Worker2/Orders.Handler.Worker2.csproj @@ -0,0 +1,89 @@ + + + + Debug + AnyCPU + 8.0.30703 + 2.0 + {EA6F9889-F2AE-4B87-8F0B-941B8428FC22} + Library + Properties + Orders.Handler.Worker2 + Orders.Handler.Worker2 + v4.0 + 512 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + ..\..\..\binaries\log4net.dll + + + ..\..\..\binaries\NServiceBus.dll + + + ..\..\..\binaries\NServiceBus.Core.dll + + + ..\..\..\binaries\NServiceBus.Host.exe + + + + + + + EndpointConfig.cs + + + ProcessOrderCommandHandler.cs + + + + + + App.config + + + + + {162BEDCA-6C44-43B3-B2C1-83994BFFC094} + Orders.Messages + + + + + + Program + $(ProjectDir)$(OutputPath)NServiceBus.Host.exe + NServiceBus.Production NServiceBus.Worker + AllRules.ruleset + + + Program + $(ProjectDir)$(OutputPath)NServiceBus.Host.exe + NServiceBus.Production NServiceBus.Worker + AllRules.ruleset + + \ No newline at end of file diff --git a/Samples/ScaleOut/Orders.Handler.Worker2/Properties/AssemblyInfo.cs b/Samples/ScaleOut/Orders.Handler.Worker2/Properties/AssemblyInfo.cs new file mode 100644 index 00000000000..a47d6f3a81b --- /dev/null +++ b/Samples/ScaleOut/Orders.Handler.Worker2/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Orders.Handler.Worker2")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Orders.Handler.Worker2")] +[assembly: AssemblyCopyright("Copyright © 2012")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("08618f51-b46e-4a67-b4aa-5012df2e2147")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Samples/ScaleOut/Orders.Handler/App.config b/Samples/ScaleOut/Orders.Handler/App.config index f9a41e67a51..2c2d5215676 100644 --- a/Samples/ScaleOut/Orders.Handler/App.config +++ b/Samples/ScaleOut/Orders.Handler/App.config @@ -1,42 +1,10 @@  -
- - +
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + diff --git a/Samples/ScaleOut/Orders.Handler/EndpointConfig.cs b/Samples/ScaleOut/Orders.Handler/EndpointConfig.cs index 9eeaa519f30..e2508fc262a 100644 --- a/Samples/ScaleOut/Orders.Handler/EndpointConfig.cs +++ b/Samples/ScaleOut/Orders.Handler/EndpointConfig.cs @@ -1,18 +1,10 @@ using NServiceBus; +using NServiceBus.Config; namespace Orders.Handler { - using NServiceBus.Config; - - public class EndpointConfig : IConfigureThisEndpoint, AsA_Publisher, IWantCustomLogging - { - public void Init() - { - SetLoggingLibrary.Log4Net(log4net.Config.XmlConfigurator.Configure); - - } - } - + class EndpointConfig : IConfigureThisEndpoint, AsA_Publisher { } + class ConfiguringTheDistributorWithTheFluentApi : INeedInitialization { public void Init() diff --git a/Samples/ScaleOut/Orders.Handler/Orders.Handler.csproj b/Samples/ScaleOut/Orders.Handler/Orders.Handler.csproj index a625a138233..17e5ba379f6 100644 --- a/Samples/ScaleOut/Orders.Handler/Orders.Handler.csproj +++ b/Samples/ScaleOut/Orders.Handler/Orders.Handler.csproj @@ -5,7 +5,7 @@ AnyCPU 8.0.30703 2.0 - {BB947AD4-907C-4B0A-826E-24C59B7CFC06} + {4A6833F7-A0BE-49B2-89F5-803BE1A151BC} Library Properties Orders.Handler @@ -37,9 +37,8 @@ ..\..\..\binaries\NServiceBus.dll - + ..\..\..\binaries\NServiceBus.Core.dll - False ..\..\..\binaries\NServiceBus.Host.exe @@ -52,15 +51,15 @@ + + + {162BEDCA-6C44-43B3-B2C1-83994BFFC094} Orders.Messages - - - + \ No newline at end of file diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.Tests/Properties/AssemblyInfo.cs b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.Tests/Properties/AssemblyInfo.cs new file mode 100644 index 00000000000..65d54f2eb06 --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.Tests/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("NServiceBus.TimeoutPersisters.NHibernate.Tests")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("NServiceBus.TimeoutPersisters.NHibernate.Tests")] +[assembly: AssemblyCopyright("Copyright © 2012")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("d72f8642-e251-4e69-bdd9-7f7478f017aa")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.Tests/When_fetching_timeouts_from_storage.cs b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.Tests/When_fetching_timeouts_from_storage.cs new file mode 100644 index 00000000000..810937a6fc1 --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.Tests/When_fetching_timeouts_from_storage.cs @@ -0,0 +1,61 @@ +namespace NServiceBus.TimeoutPersisters.NHibernate.Tests +{ + using System; + using System.Collections.Generic; + using System.Linq; + using NUnit.Framework; + using Timeout.Core; + + [TestFixture] + public class When_fetching_timeouts_from_storage : InMemoryDBFixture + { + [Test] + public void Should_return_the_complete_list_of_timeouts() + { + const int numberOfTimeoutsToAdd = 10; + + for (var i = 0; i < numberOfTimeoutsToAdd; i++) + { + persister.Add(new TimeoutData + { + Time = DateTime.UtcNow.AddHours(1), + CorrelationId = "boo", + Destination = new Address("timeouts", Environment.MachineName), + SagaId = Guid.NewGuid(), + State = new byte[] { 0, 0, 133 }, + Headers = new Dictionary { { "Bar", "34234" }, { "Foo", "dasdsa" }, { "Super", "dsfsdf" }}, + OwningTimeoutManager = Configure.EndpointName, + }); + } + + Assert.AreEqual(numberOfTimeoutsToAdd, persister.GetAll().Count()); + } + + [Test] + public void Should_return_the_correct_headers() + { + const int numberOfTimeoutsToAdd = 10; + var headers = new Dictionary { { "Bar", "34234" }, { "Foo", "dasdsa" }, { "Super", "dsfsdf" } }; + + for (var i = 0; i < numberOfTimeoutsToAdd; i++) + { + persister.Add(new TimeoutData + { + Time = DateTime.UtcNow.AddHours(1), + CorrelationId = "boo", + Destination = new Address("timeouts", Environment.MachineName), + SagaId = Guid.NewGuid(), + State = new byte[] { 1, 1, 133, 200 }, + Headers = headers, + OwningTimeoutManager = Configure.EndpointName, + }); + } + + var timeouts = persister.GetAll(); + foreach (var timeoutData in timeouts) + { + CollectionAssert.AreEqual(headers, timeoutData.Headers); + } + } + } +} \ No newline at end of file diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.Tests/When_removing_timeouts_from_the_storage.cs b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.Tests/When_removing_timeouts_from_the_storage.cs new file mode 100644 index 00000000000..3c965841f41 --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.Tests/When_removing_timeouts_from_the_storage.cs @@ -0,0 +1,57 @@ +namespace NServiceBus.TimeoutPersisters.NHibernate.Tests +{ + using System; + using System.Collections.Generic; + using NUnit.Framework; + using Timeout.Core; + + [TestFixture] + public class When_removing_timeouts_from_the_storage : InMemoryDBFixture + { + [Test] + public void Should_remove_timeouts_by_id() + { + var t1 = new TimeoutData { Time = DateTime.Now.AddYears(1), OwningTimeoutManager = Configure.EndpointName, Headers = new Dictionary { { "Header1", "Value1" } } }; + var t2 = new TimeoutData { Time = DateTime.Now.AddYears(1), OwningTimeoutManager = Configure.EndpointName, Headers = new Dictionary { { "Header1", "Value1" } } }; + + persister.Add(t1); + persister.Add(t2); + + var t = persister.GetAll(); + + foreach (var timeoutData in t) + { + persister.Remove(timeoutData.Id); + } + + using (var session = sessionFactory.OpenSession()) + { + Assert.Null(session.Get(new Guid(t1.Id))); + Assert.Null(session.Get(new Guid(t2.Id))); + } + } + + [Test] + public void Should_remove_timeouts_by_sagaid() + { + var sagaId1 = Guid.NewGuid(); + var sagaId2 = Guid.NewGuid(); + var t1 = new TimeoutData { SagaId = sagaId1, Time = DateTime.Now.AddYears(1), OwningTimeoutManager = Configure.EndpointName, Headers = new Dictionary { { "Header1", "Value1" } } }; + var t2 = new TimeoutData { SagaId = sagaId2, Time = DateTime.Now.AddYears(1), OwningTimeoutManager = Configure.EndpointName, Headers = new Dictionary { { "Header1", "Value1" } } }; + + persister.Add(t1); + persister.Add(t2); + + + persister.ClearTimeoutsFor(sagaId1); + persister.ClearTimeoutsFor(sagaId2); + + + using (var session = sessionFactory.OpenSession()) + { + Assert.Null(session.Get(new Guid(t1.Id))); + Assert.Null(session.Get(new Guid(t2.Id))); + } + } + } +} \ No newline at end of file diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.Tests/packages.config b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.Tests/packages.config new file mode 100644 index 00000000000..e3b821d3961 --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.Tests/packages.config @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.sln b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.sln new file mode 100644 index 00000000000..d41fe643951 --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate.sln @@ -0,0 +1,26 @@ + +Microsoft Visual Studio Solution File, Format Version 11.00 +# Visual Studio 2010 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NServiceBus.TimeoutPersisters.NHibernate", "NServiceBus.TimeoutPersisters.NHibernate\NServiceBus.TimeoutPersisters.NHibernate.csproj", "{11F92A75-4CBE-4715-9E65-8EB7EBD64AF7}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NServiceBus.TimeoutPersisters.NHibernate.Tests", "NServiceBus.TimeoutPersisters.NHibernate.Tests\NServiceBus.TimeoutPersisters.NHibernate.Tests.csproj", "{4F6B59C8-917C-4C72-A212-F740E585660F}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {11F92A75-4CBE-4715-9E65-8EB7EBD64AF7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {11F92A75-4CBE-4715-9E65-8EB7EBD64AF7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {11F92A75-4CBE-4715-9E65-8EB7EBD64AF7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {11F92A75-4CBE-4715-9E65-8EB7EBD64AF7}.Release|Any CPU.Build.0 = Release|Any CPU + {4F6B59C8-917C-4C72-A212-F740E585660F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4F6B59C8-917C-4C72-A212-F740E585660F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4F6B59C8-917C-4C72-A212-F740E585660F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4F6B59C8-917C-4C72-A212-F740E585660F}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection +EndGlobal diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/AddressUserType.cs b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/AddressUserType.cs new file mode 100644 index 00000000000..2b1b419be9d --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/AddressUserType.cs @@ -0,0 +1,93 @@ +namespace NServiceBus.TimeoutPersisters.NHibernate +{ + using System; + using System.Data; + using global::NHibernate; + using global::NHibernate.SqlTypes; + using global::NHibernate.UserTypes; + + class AddressUserType: IUserType + { + private static readonly SqlType[] sqlTypes = new[] { NHibernateUtil.String.SqlType }; + + public bool Equals(object x, object y) + { + if (ReferenceEquals(x, y)) + { + return true; + } + if (x == null || y == null) + { + return false; + } + + return x.Equals(y); + + } + + public int GetHashCode(object x) + { + return x.GetHashCode(); + } + + public object NullSafeGet(IDataReader rs, string[] names, object owner) + { + var obj = NHibernateUtil.String.NullSafeGet(rs, names[0]) as string; + + return obj == null ? null : Address.Parse(obj); + } + + public void NullSafeSet(IDbCommand cmd, object value, int index) + { + if (value == null) + { + ((IDataParameter) cmd.Parameters[index]).Value = DBNull.Value; + } + else + { + var address = (Address) value; + ((IDataParameter) cmd.Parameters[index]).Value = address.ToString(); + } + } + + public object DeepCopy(object value) + { + if (value == null) + { + return null; + } + + return Address.Parse(value.ToString()); + } + + public object Replace(object original, object target, object owner) + { + return original; + } + + public object Assemble(object cached, object owner) + { + return cached; + } + + public object Disassemble(object value) + { + return value; + } + + public SqlType[] SqlTypes + { + get { return sqlTypes; } + } + + public Type ReturnedType + { + get { return typeof (Address); } + } + + public bool IsMutable + { + get { return false; } + } + } +} \ No newline at end of file diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/ConfigureNHibernateTimeoutPersister.cs b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/ConfigureNHibernateTimeoutPersister.cs new file mode 100644 index 00000000000..f1fbab6a8c1 --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/ConfigureNHibernateTimeoutPersister.cs @@ -0,0 +1,91 @@ +using System; +using System.Reflection; +using NHibernate.Cfg; +using NHibernate.Cfg.MappingSchema; +using NHibernate.Dialect; +using NHibernate.Mapping.ByCode; +using NHibernate.Tool.hbm2ddl; +using Configuration = NHibernate.Cfg.Configuration; + +namespace NServiceBus +{ + using Config; + using TimeoutPersisters.NHibernate; + + /// + /// Configuration extensions for the NHibernate Timeouts persister + /// + public static class ConfigureNHibernateTimeoutPersister + { + /// + /// Configures the persister with Sqlite as its database and auto generates schema on startup. + /// + /// The configuration object. + /// The configuration object. + public static Configure UseNHibernateTimeoutPersisterWithSQLiteAndAutomaticSchemaGeneration(this Configure config) + { + var configuration = new Configuration() + .DataBaseIntegration(x => + { + x.Dialect(); + x.ConnectionString = string.Format(@"Data Source={0};Version=3;New=True;", ".\\NServiceBus.Timeouts.sqlite"); + }); + + return UseNHibernateTimeoutPersister(config, configuration, true); + } + + /// + /// Configures NHibernate Timeout Persister. + /// Database settings are read from custom config section . + /// + /// The configuration object. + /// The configuration object. + public static Configure UseNHibernateTimeoutPersister(this Configure config) + { + + var configSection = Configure.GetConfigSection(); + + if (configSection == null) + { + throw new InvalidOperationException("No configuration section for DB Timeout Storage found. Please add a TimeoutPersisterConfig section to you configuration file"); + } + + + if (configSection.NHibernateProperties.Count == 0) + { + throw new InvalidOperationException("No NHibernate properties found. Please specify NHibernateProperties in your TimeoutPersisterConfig section"); + } + + return UseNHibernateTimeoutPersister(config, + new Configuration().AddProperties(configSection.NHibernateProperties.ToProperties()), + configSection.UpdateSchema); + } + + /// + /// Configures the storage with the user supplied persistence configuration. + /// Database schema is updated if requested by the user. + /// + /// The configuration object. + /// The object. + /// true to auto update schema<./param> + /// The configuration object + public static Configure UseNHibernateTimeoutPersister(this Configure config, + Configuration configuration, + bool autoUpdateSchema) + { + var mapper = new ModelMapper(); + mapper.AddMappings(Assembly.GetExecutingAssembly().GetExportedTypes()); + HbmMapping faultMappings = mapper.CompileMappingForAllExplicitlyAddedEntities(); + + configuration.AddMapping(faultMappings); + + if (autoUpdateSchema) + new SchemaUpdate(configuration).Execute(false, true); + + config.Configurer.ConfigureComponent(DependencyLifecycle.SingleInstance) + .ConfigureProperty(p => p.SessionFactory, configuration.BuildSessionFactory()); + + return config; + } + } +} \ No newline at end of file diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/NHibernateProperty.cs b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/NHibernateProperty.cs new file mode 100644 index 00000000000..f52c167440a --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/NHibernateProperty.cs @@ -0,0 +1,42 @@ +namespace NServiceBus.TimeoutPersisters.NHibernate.Config +{ + using System.Configuration; + + /// + /// A NHibernate property + /// + public class NHibernateProperty : ConfigurationElement + { + /// + /// The key + /// + [ConfigurationProperty("Key", IsRequired = true, IsKey = true)] + public string Key + { + get + { + return (string)this["Key"]; + } + set + { + this["Key"] = value; + } + } + + /// + /// The value to use + /// + [ConfigurationProperty("Value", IsRequired = true, IsKey = false)] + public string Value + { + get + { + return (string)this["Value"]; + } + set + { + this["Value"] = value; + } + } + } +} \ No newline at end of file diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/NHibernatePropertyCollection.cs b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/NHibernatePropertyCollection.cs new file mode 100644 index 00000000000..f133fdaca7e --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/NHibernatePropertyCollection.cs @@ -0,0 +1,49 @@ +namespace NServiceBus.TimeoutPersisters.NHibernate.Config +{ + using System.Collections.Generic; + using System.Configuration; + + /// + /// Collection of NHibernate properties + /// + public class NHibernatePropertyCollection : ConfigurationElementCollection + { + /// + /// Creates a new empty property + /// + /// + protected override ConfigurationElement CreateNewElement() + { + return new NHibernateProperty(); + } + + /// + /// Returns the key for the given element + /// + /// + /// + protected override object GetElementKey(ConfigurationElement element) + { + return ((NHibernateProperty)element).Key; + } + + /// + /// Converts the collection to a dictionary + /// + /// + public IDictionary ToProperties() + { + var retval = new Dictionary(); + + foreach (var element in this) + { + + retval.Add( + (element as NHibernateProperty).Key, + (element as NHibernateProperty).Value); + } + + return retval; + } + } +} \ No newline at end of file diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/TimeoutEntityMap.cs b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/TimeoutEntityMap.cs new file mode 100644 index 00000000000..f64b6fe0137 --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/TimeoutEntityMap.cs @@ -0,0 +1,29 @@ +namespace NServiceBus.TimeoutPersisters.NHibernate.Config +{ + using NHibernate; + using global::NHibernate.Mapping.ByCode; + using global::NHibernate.Mapping.ByCode.Conformist; + + public class TimeoutEntityMap : ClassMapping + { + public TimeoutEntityMap() + { + Id(x => x.Id, m => m.Generator(Generators.Assigned)); + Property(p => p.State); + Property(p => p.CorrelationId, pm => pm.Length(1024)); + Property(p => p.Destination, pm => + { + pm.Type(); + pm.Length(1024); + }); + Property(p => p.SagaId, pm => pm.Index("SagaIdIdx")); + Property(p => p.Time); + Property(p => p.Headers, pm => pm.Length(4000)); + Property(p => p.Endpoint, pm => + { + pm.Index("EndpointIdx"); + pm.Length(1024); + }); + } + } +} \ No newline at end of file diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/TimeoutPersisterConfig.cs b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/TimeoutPersisterConfig.cs new file mode 100644 index 00000000000..5f723a3af65 --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Config/TimeoutPersisterConfig.cs @@ -0,0 +1,36 @@ +namespace NServiceBus.Config +{ + using System.Configuration; + using TimeoutPersisters.NHibernate.Config; + + public class TimeoutPersisterConfig : ConfigurationSection + { + [ConfigurationProperty("NHibernateProperties", IsRequired = false)] + public NHibernatePropertyCollection NHibernateProperties + { + get + { + return this["NHibernateProperties"] as NHibernatePropertyCollection; + } + set + { + this["NHibernateProperties"] = value; + } + } + + [ConfigurationProperty("UpdateSchema", IsRequired = false,DefaultValue = true)] + public bool UpdateSchema + { + + get + { + + return (bool)this["UpdateSchema"]; + } + set + { + this["UpdateSchema"] = value; + } + } + } +} \ No newline at end of file diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/NServiceBus.TimeoutPersisters.NHibernate.csproj b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/NServiceBus.TimeoutPersisters.NHibernate.csproj new file mode 100644 index 00000000000..60d1f2e35bd --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/NServiceBus.TimeoutPersisters.NHibernate.csproj @@ -0,0 +1,80 @@ + + + + Debug + AnyCPU + 8.0.30703 + 2.0 + {11F92A75-4CBE-4715-9E65-8EB7EBD64AF7} + Library + Properties + NServiceBus.TimeoutPersisters.NHibernate + NServiceBus.TimeoutPersisters.NHibernate + v4.0 + 512 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + bin\Debug\NServiceBus.TimeoutPersisters.NHibernate.XML + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + bin\Release\NServiceBus.TimeoutPersisters.NHibernate.XML + + + + ..\..\..\..\packages\Iesi.Collections.3.2.0.4000\lib\Net35\Iesi.Collections.dll + + + ..\..\..\..\packages\NHibernate.3.3.0.4000\lib\Net35\NHibernate.dll + + + ..\..\..\..\build\output\NServiceBus.dll + + + ..\..\..\..\build\output\NServiceBus.Core.dll + + + + + + + + + + + + + + + + + + + + + + Designer + + + + + + \ No newline at end of file diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Properties/AssemblyInfo.cs b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Properties/AssemblyInfo.cs new file mode 100644 index 00000000000..60c8f064907 --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/Properties/AssemblyInfo.cs @@ -0,0 +1,16 @@ +using System; +using System.Reflection; +using System.Runtime.InteropServices; + +[assembly: AssemblyTitle("NServiceBus.TimeoutPersisters.NHibernate")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyVersion("4.0.0.0")] +[assembly: AssemblyFileVersion("4.0.0.0")] +[assembly: AssemblyCopyright("Copyright (C) NServiceBus 2010-2012")] +[assembly: AssemblyProduct("NServiceBus")] +[assembly: AssemblyCompany("NServiceBus")] +[assembly: AssemblyConfiguration("release")] +[assembly: AssemblyInformationalVersion("4.0.0-alpha0")] +[assembly: ComVisible(false)] +[assembly: CLSCompliantAttribute(true)] + diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/TimeoutEntity.cs b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/TimeoutEntity.cs new file mode 100644 index 00000000000..257d66f69fd --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/TimeoutEntity.cs @@ -0,0 +1,48 @@ +namespace NServiceBus.TimeoutPersisters.NHibernate +{ + using System; + using System.Collections.Generic; + + public class TimeoutEntity + { + /// + /// Id of this timeout. + /// + public virtual Guid Id { get; set; } + + /// + /// The address of the client who requested the timeout. + /// + public virtual Address Destination { get; set; } + + /// + /// The saga ID. + /// + public virtual Guid SagaId { get; set; } + + /// + /// Additional state. + /// + public virtual byte[] State { get; set; } + + /// + /// The time at which the saga ID expired. + /// + public virtual DateTime Time { get; set; } + + /// + /// We store the correlation id in order to preserve it across timeouts. + /// + public virtual string CorrelationId { get; set; } + + /// + /// Store the headers to preserve them across timeouts. + /// + public virtual string Headers { get; set; } + + /// + /// Timeout endpoint name. + /// + public virtual string Endpoint { get; set; } + } +} diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/TimeoutStorage.cs b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/TimeoutStorage.cs new file mode 100644 index 00000000000..c9fd1f87b5a --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/TimeoutStorage.cs @@ -0,0 +1,117 @@ +namespace NServiceBus.TimeoutPersisters.NHibernate +{ + using System; + using System.Collections; + using System.Collections.Generic; + using System.Data; + using System.Linq; + using Serializers.Json; + using Timeout.Core; + using global::NHibernate; + + public class TimeoutStorage : IPersistTimeouts + { + public ISessionFactory SessionFactory { get; set; } + + public IEnumerable GetAll() + { + using (var session = SessionFactory.OpenStatelessSession()) + using (var tx = session.BeginTransaction(IsolationLevel.ReadCommitted)) + { + var timeoutEntities = session.QueryOver() + .Where(x => x.Endpoint == Configure.EndpointName) + .List(); + + tx.Commit(); + + return timeoutEntities.Select(te => new TimeoutData + { + CorrelationId = te.CorrelationId, + Destination = te.Destination, + Id = te.Id.ToString(), + SagaId = te.SagaId, + State = te.State, + Time = te.Time, + Headers = ConvertStringToDictionary(te.Headers), + }); + } + } + + public void Add(TimeoutData timeout) + { + var newId = Guid.NewGuid(); + + using (var session = SessionFactory.OpenSession()) + using (var tx = session.BeginTransaction(IsolationLevel.ReadCommitted)) + { + session.Save(new TimeoutEntity + { + Id = newId, + CorrelationId = timeout.CorrelationId, + Destination = timeout.Destination, + SagaId = timeout.SagaId, + State = timeout.State, + Time = timeout.Time, + Headers = ConvertDictionaryToString(timeout.Headers), + Endpoint = timeout.OwningTimeoutManager, + }); + + tx.Commit(); + } + + timeout.Id = newId.ToString(); + } + + public void Remove(string timeoutId) + { + using (var session = SessionFactory.OpenStatelessSession()) + using (var tx = session.BeginTransaction(IsolationLevel.ReadCommitted)) + { + var queryString = string.Format("delete {0} where Id = :timeoutId", + typeof(TimeoutEntity)); + session.CreateQuery(queryString) + .SetParameter("timeoutId", Guid.Parse(timeoutId)) + .ExecuteUpdate(); + + tx.Commit(); + } + } + + public void ClearTimeoutsFor(Guid sagaId) + { + using (var session = SessionFactory.OpenStatelessSession()) + using (var tx = session.BeginTransaction(IsolationLevel.ReadCommitted)) + { + var queryString = string.Format("delete {0} where SagaId = :sagaid", + typeof(TimeoutEntity)); + session.CreateQuery(queryString) + .SetParameter("sagaid", sagaId) + .ExecuteUpdate(); + + tx.Commit(); + } + } + + static Dictionary ConvertStringToDictionary(string data) + { + if (String.IsNullOrEmpty(data)) + { + return new Dictionary(); + } + + return serializer.DeserializeObject>(data); + } + + static string ConvertDictionaryToString(ICollection data) + { + if (data == null || data.Count == 0) + { + return null; + } + + return serializer.SerializeObject(data); + } + + static readonly JsonMessageSerializer serializer = new JsonMessageSerializer(null); + } +} diff --git a/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/packages.config b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/packages.config new file mode 100644 index 00000000000..686a3ab5c34 --- /dev/null +++ b/src/nhibernate/TimeoutPersister/NServiceBus.TimeoutPersisters.NHibernate/packages.config @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/src/nhibernate/TimeoutPersister/nuget.config b/src/nhibernate/TimeoutPersister/nuget.config new file mode 100644 index 00000000000..d05bd9c5c20 --- /dev/null +++ b/src/nhibernate/TimeoutPersister/nuget.config @@ -0,0 +1,4 @@ + + + ../../../packages + \ No newline at end of file diff --git a/src/nhibernate/UnitOfWork/NServiceBus.UnitOfWork.NHibernate.Config/NServiceBus.UnitOfWork.NHibernate.Config.csproj b/src/nhibernate/UnitOfWork/NServiceBus.UnitOfWork.NHibernate.Config/NServiceBus.UnitOfWork.NHibernate.Config.csproj index 4e853203d9b..1d3435b627c 100644 --- a/src/nhibernate/UnitOfWork/NServiceBus.UnitOfWork.NHibernate.Config/NServiceBus.UnitOfWork.NHibernate.Config.csproj +++ b/src/nhibernate/UnitOfWork/NServiceBus.UnitOfWork.NHibernate.Config/NServiceBus.UnitOfWork.NHibernate.Config.csproj @@ -30,6 +30,7 @@ TRACE prompt 4 + bin\Release\NServiceBus.UnitOfWork.NHibernate.Config.XML @@ -45,7 +46,6 @@ - diff --git a/src/nhibernate/UnitOfWork/NServiceBus.UnitOfWork.NHibernate/NServiceBus.UnitOfWork.NHibernate.csproj b/src/nhibernate/UnitOfWork/NServiceBus.UnitOfWork.NHibernate/NServiceBus.UnitOfWork.NHibernate.csproj index a5e9491ace9..fb7ce898259 100644 --- a/src/nhibernate/UnitOfWork/NServiceBus.UnitOfWork.NHibernate/NServiceBus.UnitOfWork.NHibernate.csproj +++ b/src/nhibernate/UnitOfWork/NServiceBus.UnitOfWork.NHibernate/NServiceBus.UnitOfWork.NHibernate.csproj @@ -30,6 +30,7 @@ TRACE prompt 4 + bin\Release\NServiceBus.UnitOfWork.NHibernate.XML @@ -56,7 +57,6 @@ - diff --git a/src/timeout/NServiceBus.Timeout.Core/TimeoutData.cs b/src/timeout/NServiceBus.Timeout.Core/TimeoutData.cs index bef944affd0..77d84a70974 100644 --- a/src/timeout/NServiceBus.Timeout.Core/TimeoutData.cs +++ b/src/timeout/NServiceBus.Timeout.Core/TimeoutData.cs @@ -1,37 +1,37 @@ -namespace NServiceBus.Timeout.Core +namespace NServiceBus.Timeout.Core { using System; using System.Collections.Generic; using NServiceBus; - /// - /// Holds timeout information. - /// - public class TimeoutData : EventArgs + /// + /// Holds timeout information. + /// + public class TimeoutData : EventArgs { /// /// Id of this timeout /// - public string Id { get; set; } - - /// - /// The address of the client who requested the timeout. - /// - public Address Destination { get; set; } - - /// - /// The saga ID. - /// - public Guid SagaId { get; set; } - - /// - /// Additional state. - /// - public byte[] State { get; set; } - - /// - /// The time at which the saga ID expired. - /// + public string Id { get; set; } + + /// + /// The address of the client who requested the timeout. + /// + public Address Destination { get; set; } + + /// + /// The saga ID. + /// + public Guid SagaId { get; set; } + + /// + /// Additional state. + /// + public byte[] State { get; set; } + + /// + /// The time at which the saga ID expired. + /// public DateTime Time { get; set; } /// @@ -39,6 +39,11 @@ public class TimeoutData : EventArgs /// public string CorrelationId { get; set; } + /// + /// The timeout manager that owns this particular timeout + /// + public string OwningTimeoutManager { get; set; } + /// /// Store the headers to preserve them across timeouts /// @@ -48,5 +53,5 @@ public override string ToString() { return string.Format("Timeout({0}) - Expires:{1}, SagaId:{2}",Id,Time,SagaId); } - } -} + } +} diff --git a/src/timeout/NServiceBus.Timeout.Core/TimeoutTransportMessageHandler.cs b/src/timeout/NServiceBus.Timeout.Core/TimeoutTransportMessageHandler.cs index a38edec19b4..9dff6651ed2 100644 --- a/src/timeout/NServiceBus.Timeout.Core/TimeoutTransportMessageHandler.cs +++ b/src/timeout/NServiceBus.Timeout.Core/TimeoutTransportMessageHandler.cs @@ -1,14 +1,14 @@ -namespace NServiceBus.Timeout.Core +namespace NServiceBus.Timeout.Core { using System; using Unicast.Transport; - public class TimeoutTransportMessageHandler - { - public IPersistTimeouts Persister { get; set; } - - public IManageTimeouts Manager { get; set; } - + public class TimeoutTransportMessageHandler + { + public IPersistTimeouts Persister { get; set; } + + public IManageTimeouts Manager { get; set; } + public void Handle(TransportMessage message) { var sagaId = Guid.Empty; @@ -37,12 +37,13 @@ public void Handle(TransportMessage message) State = message.Body, Time = message.Headers[Headers.Expire].ToUtcDateTime(), CorrelationId = message.CorrelationId, - Headers = message.Headers + Headers = message.Headers, + OwningTimeoutManager = Configure.EndpointName }; Persister.Add(data); Manager.PushTimeout(data); - } - } + } + } } -} +} diff --git a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs index da106c00897..549fd70e68c 100644 --- a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs +++ b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs @@ -10,7 +10,7 @@ namespace NServiceBus.Timeout.Hosting.Windows.Persistence using Raven.Client.Indexes; using log4net; using Raven.Client.Linq; - + public class RavenTimeoutPersistence : IPersistTimeouts { readonly IDocumentStore store; @@ -19,35 +19,31 @@ public RavenTimeoutPersistence(IDocumentStore store) { this.store = store; - if (store.DatabaseCommands.GetIndex("RavenTimeoutPersistence/TimeoutDataSortedByTime") == null) - { - store.DatabaseCommands.PutIndex("RavenTimeoutPersistence/TimeoutDataSortedByTime", - new IndexDefinitionBuilder - { - Map = docs => from doc in docs - select new { doc.Time }, - SortOptions = + store.DatabaseCommands.PutIndex("RavenTimeoutPersistence/TimeoutDataSortedByTime", + new IndexDefinitionBuilder + { + Map = docs => from doc in docs + select new { doc.Time, OwningTimeoutManager = doc.OwningTimeoutManager ?? "" }, + SortOptions = { {doc => doc.Time, SortOptions.String} }, - Indexes = + Indexes = { {doc => doc.Time, FieldIndexing.Default} }, - Stores = + Stores = { {doc => doc.Time, FieldStorage.No} } - }); - } - if (store.DatabaseCommands.GetIndex("RavenTimeoutPersistence/TimeoutData/BySagaId") == null) - { - store.DatabaseCommands.PutIndex("RavenTimeoutPersistence/TimeoutData/BySagaId", new IndexDefinitionBuilder - { - Map = docs => from doc in docs - select new {doc.SagaId} - }); - } + }, true); + + store.DatabaseCommands.PutIndex("RavenTimeoutPersistence/TimeoutData/BySagaId", new IndexDefinitionBuilder + { + Map = docs => from doc in docs + select new { doc.SagaId } + }, true); + } public IEnumerable GetAll() @@ -67,7 +63,8 @@ public IEnumerable GetAll() // we'll wait for nonstale results up until the point in time that we start fetching // since other timeouts that has arrived in the meantime will have been added to the // cache anyway. If we not do this there is a risk that we'll miss them and breaking their SLA - .Customize(c => c.WaitForNonStaleResultsAsOf(timeFetchWasRequested)) + .Where(t => t.OwningTimeoutManager == "" || t.OwningTimeoutManager == Configure.EndpointName) + .Customize(c => c.WaitForNonStaleResultsAsOf(timeFetchWasRequested)) .Statistics(out stats); do { diff --git a/src/unicast/NServiceBus.Unicast.Monitoring/PerformanceCounterInitializer.cs b/src/unicast/NServiceBus.Unicast.Monitoring/PerformanceCounterInitializer.cs index 54febdb44e6..f6c4ceb3487 100644 --- a/src/unicast/NServiceBus.Unicast.Monitoring/PerformanceCounterInitializer.cs +++ b/src/unicast/NServiceBus.Unicast.Monitoring/PerformanceCounterInitializer.cs @@ -2,7 +2,6 @@ { using System; using System.Diagnostics; - using NServiceBus.Config; /// /// Initializes the peformcecounters if they are enabled @@ -14,6 +13,11 @@ public void Run() if (!Configure.Instance.PerformanceCountersEnabled()) return; + if (!PerformanceCounterCategory.Exists(CategoryName)) + { + return; + } + SetupCriticalTimePerformanceCounter(); SetupSLABreachCounter(); @@ -55,6 +59,7 @@ static void SetupSLABreachCounter() static PerformanceCounter InstantiateCounter(string counterName) { PerformanceCounter counter; + try { counter = new PerformanceCounter(CategoryName, counterName, Configure.EndpointName, false); diff --git a/src/unicast/NServiceBus.Unicast.Monitoring/PerformanceCounterInstaller.cs b/src/unicast/NServiceBus.Unicast.Monitoring/PerformanceCounterInstaller.cs index a606df53e93..bdd0a2f404e 100644 --- a/src/unicast/NServiceBus.Unicast.Monitoring/PerformanceCounterInstaller.cs +++ b/src/unicast/NServiceBus.Unicast.Monitoring/PerformanceCounterInstaller.cs @@ -9,7 +9,7 @@ /// /// Performs installation of the performance counters /// - public class PerformanceCounterInstaller:INeedToInstallInfrastructure + public class PerformanceCounterInstaller : INeedToInstallInfrastructure { public void Install(WindowsIdentity identity) { diff --git a/src/unicast/NServiceBus.Unicast/UnicastBus.cs b/src/unicast/NServiceBus.Unicast/UnicastBus.cs index 0f021065620..a30b8278187 100644 --- a/src/unicast/NServiceBus.Unicast/UnicastBus.cs +++ b/src/unicast/NServiceBus.Unicast/UnicastBus.cs @@ -1116,10 +1116,20 @@ IMessageDispatcherFactory GetDispatcherFactoryFor(Type messageHandlerTypeToInvok /// private static Exception GetInnermostException(Exception e) { + if (e.InnerException == null) + return e; + var result = e; - while (result.InnerException != null) + + do + { + if (!result.Source.ToLower().Equals("mscorlib")) + return result; + result = result.InnerException; + } while (result.InnerException != null); + return result; } @@ -1412,7 +1422,8 @@ public void RegisterMessageType(Type messageType, Address address) messageTypeToDestinationLookup[messageType] = address; messageTypeToDestinationLocker.ExitWriteLock(); - Log.Debug("Message " + messageType.FullName + " has been allocated to endpoint " + address + "."); + if(!string.IsNullOrWhiteSpace(address.Machine)) + Log.Debug("Message " + messageType.FullName + " has been allocated to endpoint " + address + "."); if (messageType.GetCustomAttributes(typeof(ExpressAttribute), true).Length == 0) recoverableMessageTypes.Add(messageType); diff --git a/src/utils/MsmqUtilities.cs b/src/utils/MsmqUtilities.cs index 77563dbaa11..4cf29a3b795 100644 --- a/src/utils/MsmqUtilities.cs +++ b/src/utils/MsmqUtilities.cs @@ -1,28 +1,28 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Messaging; -using System.Net; -using System.Net.NetworkInformation; -using System.Security.Principal; -using System.Xml.Serialization; -using Common.Logging; -using NServiceBus.Unicast.Transport; - -namespace NServiceBus.Utils -{ - /// - /// MSMQ-related utility functions - /// - public class MsmqUtilities - { - private static readonly ILog Logger = LogManager.GetLogger(typeof(MsmqUtilities)); - private static readonly string LocalAdministratorsGroupName = new SecurityIdentifier(WellKnownSidType.BuiltinAdministratorsSid, null).Translate(typeof(NTAccount)).ToString(); - private static readonly string LocalEveryoneGroupName = new SecurityIdentifier(WellKnownSidType.WorldSid, null).Translate(typeof(NTAccount)).ToString(); +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Messaging; +using System.Net; +using System.Net.NetworkInformation; +using System.Security.Principal; +using System.Xml.Serialization; +using Common.Logging; +using NServiceBus.Unicast.Transport; + +namespace NServiceBus.Utils +{ + /// + /// MSMQ-related utility functions + /// + public class MsmqUtilities + { + private static readonly ILog Logger = LogManager.GetLogger(typeof(MsmqUtilities)); + private static readonly string LocalAdministratorsGroupName = new SecurityIdentifier(WellKnownSidType.BuiltinAdministratorsSid, null).Translate(typeof(NTAccount)).ToString(); + private static readonly string LocalEveryoneGroupName = new SecurityIdentifier(WellKnownSidType.WorldSid, null).Translate(typeof(NTAccount)).ToString(); private static readonly string LocalAnonymousLogonName = new SecurityIdentifier(WellKnownSidType.AnonymousSid, null).Translate(typeof(NTAccount)).ToString(); - private static string accountToBeAssignedQueuePermissions; - + private static string accountToBeAssignedQueuePermissions; + /// /// Sets the account to be assigned queue permissions. /// @@ -30,439 +30,452 @@ public class MsmqUtilities public static void AccountToBeAssignedQueuePermissions(string account) { accountToBeAssignedQueuePermissions = account; - } - - /// - /// Utility method for creating a queue if it does not exist. - /// - /// - ///The account to be given permissions to the queue - public static void CreateQueueIfNecessary(Address address, string account) - { - if (address == null) - return; - - var q = GetFullPathWithoutPrefix(address); - - if (address.Machine != Environment.MachineName.ToLower()) - { - Logger.Debug("Queue is on remote machine."); - Logger.Debug("If this does not succeed (like if the remote machine is disconnected), processing will continue."); - } - - Logger.Debug(string.Format("Checking if queue exists: {0}.", address)); - - try - { - if (MessageQueue.Exists(q)) - { + } + + /// + /// Utility method for creating a queue if it does not exist. + /// + /// + ///The account to be given permissions to the queue + public static void CreateQueueIfNecessary(Address address, string account) + { + if (address == null) + return; + + var q = GetFullPathWithoutPrefix(address); + + if (address.Machine != Environment.MachineName.ToLower()) + { + Logger.Debug("Queue is on remote machine."); + Logger.Debug("If this does not succeed (like if the remote machine is disconnected), processing will continue."); + } + + Logger.Debug(string.Format("Checking if queue exists: {0}.", address)); + + try + { + if (MessageQueue.Exists(q)) + { Logger.Debug("Queue exists, going to set permissions."); - SetPermissionsForQueue(q, accountToBeAssignedQueuePermissions ?? account); - return; - } - - Logger.Warn("Queue " + q + " does not exist."); - Logger.Debug("Going to create queue: " + q); - - CreateQueue(q, account); - } - catch (Exception ex) - { - Logger.Error(string.Format("Could not create queue {0} or check its existence. Processing will still continue.", address), ex); - } - } - - /// - /// Create named message queue - /// - /// - ///The account to be given permissions to the queue - public static void CreateQueue(string queueName, string account) - { + SetPermissionsForQueue(q, accountToBeAssignedQueuePermissions ?? account); + return; + } + + Logger.Warn("Queue " + q + " does not exist."); + Logger.Debug("Going to create queue: " + q); + + CreateQueue(q, account); + } + catch (Exception ex) + { + Logger.Error(string.Format("Could not create queue {0} or check its existence. Processing will still continue.", address), ex); + } + } + + /// + /// Create named message queue + /// + /// + ///The account to be given permissions to the queue + public static void CreateQueue(string queueName, string account) + { MessageQueue.Create(queueName, true); - SetPermissionsForQueue(queueName, accountToBeAssignedQueuePermissions ?? account); - - Logger.Debug("Queue created: " + queueName); - } - - /// - /// Sets default permissions for queue. - /// - /// - /// - public static void SetPermissionsForQueue(string queue, string account) - { - var q = new MessageQueue(queue); - - q.SetPermissions(LocalAdministratorsGroupName, MessageQueueAccessRights.FullControl, AccessControlEntryType.Allow); - q.SetPermissions(LocalEveryoneGroupName, MessageQueueAccessRights.WriteMessage, AccessControlEntryType.Allow); - q.SetPermissions(LocalAnonymousLogonName, MessageQueueAccessRights.WriteMessage, AccessControlEntryType.Allow); - - q.SetPermissions(account, MessageQueueAccessRights.WriteMessage, AccessControlEntryType.Allow); - q.SetPermissions(account, MessageQueueAccessRights.ReceiveMessage, AccessControlEntryType.Allow); - q.SetPermissions(account, MessageQueueAccessRights.PeekMessage, AccessControlEntryType.Allow); - } - - /// - /// Turns a '@' separated value into a full path. - /// Format is 'queue@machine', or 'queue@ipaddress' - /// - /// - /// - [Obsolete("Use the overload which accepts the Address parameter instead.", true)] - public static string GetFullPath(string value) - { - return GetFullPath(Address.Parse(value)); - } - - private static string getFullPath(string value) - { - return GetFullPath(Address.Parse(value)); - } - - /// - /// Turns a '@' separated value into a full path. - /// Format is 'queue@machine', or 'queue@ipaddress' - /// - /// - /// - public static string GetFullPath(Address value) - { - IPAddress ipAddress; - if (IPAddress.TryParse(value.Machine, out ipAddress)) - return PREFIX_TCP + GetFullPathWithoutPrefix(value); - - return PREFIX + GetFullPathWithoutPrefix(value); - } - - /// - /// Gets the name of the return address from the provided value. - /// If the target includes a machine name, uses the local machine name in the returned value - /// otherwise uses the local IP address in the returned value. - /// - /// - /// - /// - public static string GetReturnAddress(string value, string target) - { - return GetReturnAddress(Address.Parse(value), Address.Parse(target)); - } - - /// - /// Gets the name of the return address from the provided value. - /// If the target includes a machine name, uses the local machine name in the returned value - /// otherwise uses the local IP address in the returned value. - /// - /// - /// - /// - public static string GetReturnAddress(Address value, Address target) - { - var machine = target.Machine; - - IPAddress ipAddress; - - //see if the target is an IP address, if so, get our own local ip address - if (IPAddress.TryParse(machine, out ipAddress)) - { - string myIp = null; - - var networkInterfaces = System.Net.NetworkInformation.NetworkInterface.GetAllNetworkInterfaces(); - foreach(var ni in networkInterfaces) - if (ni.OperationalStatus == OperationalStatus.Up && ni.NetworkInterfaceType != NetworkInterfaceType.Loopback) - { - var ipProps = ni.GetIPProperties(); - if (ipProps.UnicastAddresses.Count > 0) - { - myIp = ipProps.UnicastAddresses[1].Address.ToString(); - break; - } - } - - if (myIp == null) - myIp = "127.0.0.1"; - - return PREFIX_TCP + myIp + PRIVATE + value.Queue; - } - - return PREFIX + GetFullPathWithoutPrefix(value); - } - - /// - /// Returns the full path without Format or direct os - /// from a '@' separated path. - /// - /// - /// - public static string GetFullPathWithoutPrefix(string value) - { - return getMachineNameFromLogicalName(value) + PRIVATE + getQueueNameFromLogicalName(value); - } - - /// - /// Returns the full path without Format or direct os - /// from an address. - /// - /// - /// - public static string GetFullPathWithoutPrefix(Address address) - { - return address.Machine + PRIVATE + address.Queue; - } - - /// - /// Returns the machine name from a '@' separated full logical name, - /// or the Environment.MachineName otherwise. - /// - /// - /// - [Obsolete("Use Address.Machine instead.", true)] - public static string GetMachineNameFromLogicalName(string logicalName) - { - return getMachineNameFromLogicalName(logicalName); - } - - private static string getMachineNameFromLogicalName(string logicalName) - { - string[] arr = logicalName.Split('@'); - - string machine = Environment.MachineName; - - if (arr.Length >= 2) - if (arr[1] != "." && arr[1].ToLower() != "localhost") - machine = arr[1]; - - return machine; - } - - /// - /// Returns the queue name from a '@' separated full logical name. - /// - /// - /// - [Obsolete("Use Address.Queue instead.", true)] - public static string GetQueueNameFromLogicalName(string logicalName) - { - return getQueueNameFromLogicalName(logicalName); - } - - private static string getQueueNameFromLogicalName(string logicalName) - { - string[] arr = logicalName.Split('@'); - - if (arr.Length >= 1) - return arr[0]; - - return null; - } - - /// - /// Checks whether or not a queue is local by its path. - /// - /// The path to the queue to check. - /// true if the queue is local, otherwise false. - public static bool QueueIsLocal(string value) - { - var machineName = Environment.MachineName.ToLower(); - - value = value.ToLower().Replace(PREFIX.ToLower(), ""); - var index = value.IndexOf('\\'); - - var queueMachineName = value.Substring(0, index).ToLower(); - - return (machineName == queueMachineName || queueMachineName == "localhost" || queueMachineName == "."); - } - - /// - /// Gets an independent address for the queue in the form: - /// queue@machine. - /// - /// - /// - public static Address GetIndependentAddressForQueue(MessageQueue q) - { - if (q == null) - return null; - - string[] arr = q.FormatName.Split('\\'); - string queueName = arr[arr.Length - 1]; - - int directPrefixIndex = arr[0].IndexOf(DIRECTPREFIX); - if (directPrefixIndex >= 0) - return new Address(queueName, arr[0].Substring(directPrefixIndex + DIRECTPREFIX.Length)); - - int tcpPrefixIndex = arr[0].IndexOf(DIRECTPREFIX_TCP); - if (tcpPrefixIndex >= 0) - return new Address(queueName, arr[0].Substring(tcpPrefixIndex + DIRECTPREFIX_TCP.Length)); - - try - { - // the pessimistic approach failed, try the optimistic approach - arr = q.QueueName.Split('\\'); - queueName = arr[arr.Length - 1]; - return new Address(queueName, q.MachineName); - } - catch - { - throw new Exception("Could not translate format name to independent name: " + q.FormatName); - } - } - - /// - /// Returns the number of messages in the queue. - /// - /// - public static int GetNumberOfPendingMessages(string queueName) - { - var q = new MessageQueue(getFullPath(queueName)); - - var qMgmt = new MSMQ.MSMQManagementClass(); - object machine = Environment.MachineName; - var missing = Type.Missing; - object formatName = q.FormatName; - - qMgmt.Init(ref machine, ref missing, ref formatName); - return qMgmt.MessageCount; - } - - /// - /// Converts an MSMQ message to a TransportMessage. - /// - /// - /// - public static TransportMessage Convert(Message m) - { - var result = new TransportMessage - { - Id = m.Id, - CorrelationId = - (m.CorrelationId == "00000000-0000-0000-0000-000000000000\\0" - ? null - : m.CorrelationId), - Recoverable = m.Recoverable, - TimeToBeReceived = m.TimeToBeReceived, - TimeSent = m.SentTime, - ReplyToAddress = GetIndependentAddressForQueue(m.ResponseQueue), - MessageIntent = Enum.IsDefined(typeof(MessageIntentEnum), m.AppSpecific) ? (MessageIntentEnum)m.AppSpecific : MessageIntentEnum.Send - }; - - m.BodyStream.Position = 0; - result.Body = new byte[m.BodyStream.Length]; - m.BodyStream.Read(result.Body, 0, result.Body.Length); - - result.Headers = new Dictionary(); - if (m.Extension.Length > 0) - { - var stream = new MemoryStream(m.Extension); - var o = headerSerializer.Deserialize(stream); - - foreach (var pair in o as List) - if (pair.Key != null) - result.Headers.Add(pair.Key, pair.Value); - } - - result.Id = result.GetOriginalId(); - if(result.Headers.ContainsKey("EnclosedMessageTypes")) // This is a V2.6 message - ExtractMsmqMessageLabelInformationForBackwardCompatibility(m, result); - result.IdForCorrelation = result.GetIdForCorrelation(); - - return result; - } - - /// - /// For backward compatibility, extract the V2.6 MSMQ label content (IdForCorrelation and WindowsIdentityName) - /// into the V3.X transport message. - /// - /// Received MSMQ message - /// Transport message to be filled from MSMQ message label - private static void ExtractMsmqMessageLabelInformationForBackwardCompatibility(Message msmqMsg, TransportMessage result) - { - if(string.IsNullOrWhiteSpace(msmqMsg.Label)) - return; - - if (msmqMsg.Label.Contains(TransportHeaderKeys.IdForCorrelation)) - { - int idStartIndex = msmqMsg.Label.IndexOf(string.Format("<{0}>", TransportHeaderKeys.IdForCorrelation)) + TransportHeaderKeys.IdForCorrelation.Length + 2; - int idCount = msmqMsg.Label.IndexOf(string.Format("", TransportHeaderKeys.IdForCorrelation)) - idStartIndex; - - result.IdForCorrelation = msmqMsg.Label.Substring(idStartIndex, idCount); + SetPermissionsForQueue(queueName, accountToBeAssignedQueuePermissions ?? account); + + Logger.Debug("Queue created: " + queueName); + } + + /// + /// Sets default permissions for queue. + /// + /// + /// + public static void SetPermissionsForQueue(string queue, string account) + { + var q = new MessageQueue(queue); + + q.SetPermissions(LocalAdministratorsGroupName, MessageQueueAccessRights.FullControl, AccessControlEntryType.Allow); + q.SetPermissions(LocalEveryoneGroupName, MessageQueueAccessRights.WriteMessage, AccessControlEntryType.Allow); + q.SetPermissions(LocalAnonymousLogonName, MessageQueueAccessRights.WriteMessage, AccessControlEntryType.Allow); + + q.SetPermissions(account, MessageQueueAccessRights.WriteMessage, AccessControlEntryType.Allow); + q.SetPermissions(account, MessageQueueAccessRights.ReceiveMessage, AccessControlEntryType.Allow); + q.SetPermissions(account, MessageQueueAccessRights.PeekMessage, AccessControlEntryType.Allow); + } + + /// + /// Turns a '@' separated value into a full path. + /// Format is 'queue@machine', or 'queue@ipaddress' + /// + /// + /// + [Obsolete("Use the overload which accepts the Address parameter instead.", true)] + public static string GetFullPath(string value) + { + return GetFullPath(Address.Parse(value)); + } + + private static string getFullPath(string value) + { + return GetFullPath(Address.Parse(value)); + } + + /// + /// Turns a '@' separated value into a full path. + /// Format is 'queue@machine', or 'queue@ipaddress' + /// + /// + /// + public static string GetFullPath(Address value) + { + IPAddress ipAddress; + if (IPAddress.TryParse(value.Machine, out ipAddress)) + return PREFIX_TCP + GetFullPathWithoutPrefix(value); + + return PREFIX + GetFullPathWithoutPrefix(value); + } + + /// + /// Gets the name of the return address from the provided value. + /// If the target includes a machine name, uses the local machine name in the returned value + /// otherwise uses the local IP address in the returned value. + /// + /// + /// + /// + public static string GetReturnAddress(string value, string target) + { + return GetReturnAddress(Address.Parse(value), Address.Parse(target)); + } + + /// + /// Gets the name of the return address from the provided value. + /// If the target includes a machine name, uses the local machine name in the returned value + /// otherwise uses the local IP address in the returned value. + /// + /// + /// + /// + public static string GetReturnAddress(Address value, Address target) + { + var machine = target.Machine; + + IPAddress targetIpAddress; + + //see if the target is an IP address, if so, get our own local ip address + if (IPAddress.TryParse(machine, out targetIpAddress)) + { + if (string.IsNullOrEmpty(localIp)) + localIp = LocalIpAddress(targetIpAddress); + + return PREFIX_TCP + localIp + PRIVATE + value.Queue; } + + return PREFIX + GetFullPathWithoutPrefix(value); + } + + static string LocalIpAddress(IPAddress targetIpAddress) + { + var networkInterfaces = NetworkInterface.GetAllNetworkInterfaces(); + + var availableAddresses = + networkInterfaces.Where( + ni => + ni.OperationalStatus == OperationalStatus.Up && + ni.NetworkInterfaceType != NetworkInterfaceType.Loopback) + .SelectMany(ni=>ni.GetIPProperties().UnicastAddresses).ToList(); + + var firstWithMatchingFamily = + availableAddresses.FirstOrDefault(a => a.Address.AddressFamily == targetIpAddress.AddressFamily); + + if (firstWithMatchingFamily != null) + return firstWithMatchingFamily.Address.ToString(); + + var fallbackToDifferentFamily = availableAddresses.FirstOrDefault(); + + if (fallbackToDifferentFamily != null) + return fallbackToDifferentFamily.Address.ToString(); + + return "127.0.0.1"; + } + + static string localIp; + + /// + /// Returns the full path without Format or direct os + /// from a '@' separated path. + /// + /// + /// + public static string GetFullPathWithoutPrefix(string value) + { + return getMachineNameFromLogicalName(value) + PRIVATE + getQueueNameFromLogicalName(value); + } + + /// + /// Returns the full path without Format or direct os + /// from an address. + /// + /// + /// + public static string GetFullPathWithoutPrefix(Address address) + { + return address.Machine + PRIVATE + address.Queue; + } + + /// + /// Returns the machine name from a '@' separated full logical name, + /// or the Environment.MachineName otherwise. + /// + /// + /// + [Obsolete("Use Address.Machine instead.", true)] + public static string GetMachineNameFromLogicalName(string logicalName) + { + return getMachineNameFromLogicalName(logicalName); + } + + private static string getMachineNameFromLogicalName(string logicalName) + { + string[] arr = logicalName.Split('@'); + + string machine = Environment.MachineName; + + if (arr.Length >= 2) + if (arr[1] != "." && arr[1].ToLower() != "localhost") + machine = arr[1]; + + return machine; + } + + /// + /// Returns the queue name from a '@' separated full logical name. + /// + /// + /// + [Obsolete("Use Address.Queue instead.", true)] + public static string GetQueueNameFromLogicalName(string logicalName) + { + return getQueueNameFromLogicalName(logicalName); + } + + private static string getQueueNameFromLogicalName(string logicalName) + { + string[] arr = logicalName.Split('@'); + + if (arr.Length >= 1) + return arr[0]; + + return null; + } + + /// + /// Checks whether or not a queue is local by its path. + /// + /// The path to the queue to check. + /// true if the queue is local, otherwise false. + public static bool QueueIsLocal(string value) + { + var machineName = Environment.MachineName.ToLower(); + + value = value.ToLower().Replace(PREFIX.ToLower(), ""); + var index = value.IndexOf('\\'); + + var queueMachineName = value.Substring(0, index).ToLower(); + + return (machineName == queueMachineName || queueMachineName == "localhost" || queueMachineName == "."); + } + + /// + /// Gets an independent address for the queue in the form: + /// queue@machine. + /// + /// + /// + public static Address GetIndependentAddressForQueue(MessageQueue q) + { + if (q == null) + return null; + + string[] arr = q.FormatName.Split('\\'); + string queueName = arr[arr.Length - 1]; + + int directPrefixIndex = arr[0].IndexOf(DIRECTPREFIX); + if (directPrefixIndex >= 0) + return new Address(queueName, arr[0].Substring(directPrefixIndex + DIRECTPREFIX.Length)); + + int tcpPrefixIndex = arr[0].IndexOf(DIRECTPREFIX_TCP); + if (tcpPrefixIndex >= 0) + return new Address(queueName, arr[0].Substring(tcpPrefixIndex + DIRECTPREFIX_TCP.Length)); + + try + { + // the pessimistic approach failed, try the optimistic approach + arr = q.QueueName.Split('\\'); + queueName = arr[arr.Length - 1]; + return new Address(queueName, q.MachineName); + } + catch + { + throw new Exception("Could not translate format name to independent name: " + q.FormatName); + } + } + + /// + /// Returns the number of messages in the queue. + /// + /// + public static int GetNumberOfPendingMessages(string queueName) + { + var q = new MessageQueue(getFullPath(queueName)); + + var qMgmt = new MSMQ.MSMQManagementClass(); + object machine = Environment.MachineName; + var missing = Type.Missing; + object formatName = q.FormatName; + + qMgmt.Init(ref machine, ref missing, ref formatName); + return qMgmt.MessageCount; + } + + /// + /// Converts an MSMQ message to a TransportMessage. + /// + /// + /// + public static TransportMessage Convert(Message m) + { + var result = new TransportMessage + { + Id = m.Id, + CorrelationId = + (m.CorrelationId == "00000000-0000-0000-0000-000000000000\\0" + ? null + : m.CorrelationId), + Recoverable = m.Recoverable, + TimeToBeReceived = m.TimeToBeReceived, + TimeSent = m.SentTime, + ReplyToAddress = GetIndependentAddressForQueue(m.ResponseQueue), + MessageIntent = Enum.IsDefined(typeof(MessageIntentEnum), m.AppSpecific) ? (MessageIntentEnum)m.AppSpecific : MessageIntentEnum.Send + }; + + m.BodyStream.Position = 0; + result.Body = new byte[m.BodyStream.Length]; + m.BodyStream.Read(result.Body, 0, result.Body.Length); + + result.Headers = new Dictionary(); + if (m.Extension.Length > 0) + { + var stream = new MemoryStream(m.Extension); + var o = headerSerializer.Deserialize(stream); + + foreach (var pair in o as List) + if (pair.Key != null) + result.Headers.Add(pair.Key, pair.Value); + } + + result.Id = result.GetOriginalId(); + if (result.Headers.ContainsKey("EnclosedMessageTypes")) // This is a V2.6 message + ExtractMsmqMessageLabelInformationForBackwardCompatibility(m, result); + result.IdForCorrelation = result.GetIdForCorrelation(); + + return result; + } + + /// + /// For backward compatibility, extract the V2.6 MSMQ label content (IdForCorrelation and WindowsIdentityName) + /// into the V3.X transport message. + /// + /// Received MSMQ message + /// Transport message to be filled from MSMQ message label + private static void ExtractMsmqMessageLabelInformationForBackwardCompatibility(Message msmqMsg, TransportMessage result) + { + if (string.IsNullOrWhiteSpace(msmqMsg.Label)) + return; + + if (msmqMsg.Label.Contains(TransportHeaderKeys.IdForCorrelation)) + { + int idStartIndex = msmqMsg.Label.IndexOf(string.Format("<{0}>", TransportHeaderKeys.IdForCorrelation)) + TransportHeaderKeys.IdForCorrelation.Length + 2; + int idCount = msmqMsg.Label.IndexOf(string.Format("", TransportHeaderKeys.IdForCorrelation)) - idStartIndex; + + result.IdForCorrelation = msmqMsg.Label.Substring(idStartIndex, idCount); + } + + if (msmqMsg.Label.Contains(Headers.WindowsIdentityName) && !result.Headers.ContainsKey(Headers.WindowsIdentityName)) + { + int winStartIndex = msmqMsg.Label.IndexOf(string.Format("<{0}>", Headers.WindowsIdentityName)) + Headers.WindowsIdentityName.Length + 2; + int winCount = msmqMsg.Label.IndexOf(string.Format("", Headers.WindowsIdentityName)) - winStartIndex; + + result.Headers.Add(Headers.WindowsIdentityName, msmqMsg.Label.Substring(winStartIndex, winCount)); + } + } + + /// + /// Converts a TransportMessage to an Msmq message. + /// Doesn't set the ResponseQueue of the result. + /// + /// + /// + public static Message Convert(TransportMessage message) + { + var result = new Message(); + + if (message.Body != null) + result.BodyStream = new MemoryStream(message.Body); + + if (message.CorrelationId != null) + result.CorrelationId = message.CorrelationId; + + result.Recoverable = message.Recoverable; + + if (message.TimeToBeReceived < MessageQueue.InfiniteTimeout) + result.TimeToBeReceived = message.TimeToBeReceived; + + if (message.Headers == null) + message.Headers = new Dictionary(); + + if (!message.Headers.ContainsKey(TransportHeaderKeys.IdForCorrelation)) + message.Headers.Add(TransportHeaderKeys.IdForCorrelation, null); + + if (String.IsNullOrEmpty(message.Headers[TransportHeaderKeys.IdForCorrelation])) + message.Headers[TransportHeaderKeys.IdForCorrelation] = message.IdForCorrelation; + + using (var stream = new MemoryStream()) + { + headerSerializer.Serialize(stream, message.Headers.Select(pair => new HeaderInfo { Key = pair.Key, Value = pair.Value }).ToList()); + result.Extension = stream.GetBuffer(); + } + + result.AppSpecific = (int)message.MessageIntent; + + FillLabelForBackwardsCompatabilityWhileSending(message, result); + + return result; + } + /// + /// Fill MSMQ message's label to be compatible with NServiceBus V2.6 + /// + /// + /// + static void FillLabelForBackwardsCompatabilityWhileSending(TransportMessage transportMessage, Message msmqMessage) + { + string windowsIdentityName = + (transportMessage.Headers.ContainsKey(Headers.WindowsIdentityName) && (!string.IsNullOrWhiteSpace(transportMessage.Headers[Headers.WindowsIdentityName]))) + ? transportMessage.Headers[Headers.WindowsIdentityName] : string.Empty; + + msmqMessage.Label = + string.Format("<{0}>{2}<{1}>{3}", TransportHeaderKeys.IdForCorrelation, Headers.WindowsIdentityName, + transportMessage.IdForCorrelation, windowsIdentityName); + } + + private const string DIRECTPREFIX = "DIRECT=OS:"; + private static readonly string DIRECTPREFIX_TCP = "DIRECT=TCP:"; + private readonly static string PREFIX_TCP = "FormatName:" + DIRECTPREFIX_TCP; + private static readonly string PREFIX = "FormatName:" + DIRECTPREFIX; + private const string PRIVATE = "\\private$\\"; + + private static readonly XmlSerializer headerSerializer = new XmlSerializer(typeof(List)); - if (msmqMsg.Label.Contains(Headers.WindowsIdentityName) && !result.Headers.ContainsKey(Headers.WindowsIdentityName)) - { - int winStartIndex = msmqMsg.Label.IndexOf(string.Format("<{0}>", Headers.WindowsIdentityName)) + Headers.WindowsIdentityName.Length + 2; - int winCount = msmqMsg.Label.IndexOf(string.Format("", Headers.WindowsIdentityName)) - winStartIndex; - - result.Headers.Add(Headers.WindowsIdentityName, msmqMsg.Label.Substring(winStartIndex, winCount)); - } - } - - /// - /// Converts a TransportMessage to an Msmq message. - /// Doesn't set the ResponseQueue of the result. - /// - /// - /// - public static Message Convert(TransportMessage message) - { - var result = new Message(); - - if (message.Body != null) - result.BodyStream = new MemoryStream(message.Body); - - if (message.CorrelationId != null) - result.CorrelationId = message.CorrelationId; - - result.Recoverable = message.Recoverable; - - if (message.TimeToBeReceived < MessageQueue.InfiniteTimeout) - result.TimeToBeReceived = message.TimeToBeReceived; - - if (message.Headers == null) - message.Headers = new Dictionary(); - - if (!message.Headers.ContainsKey(TransportHeaderKeys.IdForCorrelation)) - message.Headers.Add(TransportHeaderKeys.IdForCorrelation, null); - - if (String.IsNullOrEmpty(message.Headers[TransportHeaderKeys.IdForCorrelation])) - message.Headers[TransportHeaderKeys.IdForCorrelation] = message.IdForCorrelation; - - using (var stream = new MemoryStream()) - { - headerSerializer.Serialize(stream, message.Headers.Select(pair => new HeaderInfo { Key = pair.Key, Value = pair.Value }).ToList()); - result.Extension = stream.GetBuffer(); - } - - result.AppSpecific = (int)message.MessageIntent; - - FillLabelForBackwardsCompatabilityWhileSending(message, result); - - return result; - } - /// - /// Fill MSMQ message's label to be compatible with NServiceBus V2.6 - /// - /// - /// - static void FillLabelForBackwardsCompatabilityWhileSending(TransportMessage transportMessage, Message msmqMessage) - { - string windowsIdentityName = - (transportMessage.Headers.ContainsKey(Headers.WindowsIdentityName) && (!string.IsNullOrWhiteSpace(transportMessage.Headers[Headers.WindowsIdentityName]))) - ? transportMessage.Headers[Headers.WindowsIdentityName] : string.Empty; - - msmqMessage.Label = - string.Format("<{0}>{2}<{1}>{3}", TransportHeaderKeys.IdForCorrelation, Headers.WindowsIdentityName, - transportMessage.IdForCorrelation, windowsIdentityName); - } - - private const string DIRECTPREFIX = "DIRECT=OS:"; - private static readonly string DIRECTPREFIX_TCP = "DIRECT=TCP:"; - private readonly static string PREFIX_TCP = "FormatName:" + DIRECTPREFIX_TCP; - private static readonly string PREFIX = "FormatName:" + DIRECTPREFIX; - private const string PRIVATE = "\\private$\\"; - - private static readonly XmlSerializer headerSerializer = new XmlSerializer(typeof(List)); - - } + } } \ No newline at end of file diff --git a/tests/timeout/NServiceBus.Timeout.Tests/NServiceBus.Timeout.Tests.csproj b/tests/timeout/NServiceBus.Timeout.Tests/NServiceBus.Timeout.Tests.csproj index aeafbd91453..446b1e8bceb 100644 --- a/tests/timeout/NServiceBus.Timeout.Tests/NServiceBus.Timeout.Tests.csproj +++ b/tests/timeout/NServiceBus.Timeout.Tests/NServiceBus.Timeout.Tests.csproj @@ -58,6 +58,9 @@ ..\..\..\packages\NLog.2.0.0.2000\lib\net40\NLog.dll + + ..\..\..\build\nservicebus.core\NServiceBus.Config.dll + ..\..\..\packages\NUnit.2.5.10.11092\lib\nunit.framework.dll diff --git a/tests/timeout/NServiceBus.Timeout.Tests/When_fetching_timeouts_from_storage.cs b/tests/timeout/NServiceBus.Timeout.Tests/When_fetching_timeouts_from_storage.cs index 93f2965c1eb..7ecbed31d53 100644 --- a/tests/timeout/NServiceBus.Timeout.Tests/When_fetching_timeouts_from_storage.cs +++ b/tests/timeout/NServiceBus.Timeout.Tests/When_fetching_timeouts_from_storage.cs @@ -1,6 +1,7 @@ namespace NServiceBus.Timeout.Tests { using System; + using System.Collections.Generic; using System.Linq; using Core; using NUnit.Framework; @@ -24,7 +25,38 @@ public void Should_return_the_complete_list_of_timeouts() Assert.AreEqual(numberOfTimeoutsToAdd, persister.GetAll().Count()); } + [Test] + public void Should_only_return_timeouts_for_this_specific_endpoint_and_any_ones_without_a_owner() + { + const int numberOfTimeoutsToAdd = 3; + + for (var i = 0; i < numberOfTimeoutsToAdd; i++) + { + var d = new TimeoutData + { + Time = DateTime.UtcNow.AddHours(1), + OwningTimeoutManager = Configure.EndpointName + }; + persister.Add(d); + } + + persister.Add(new TimeoutData + { + Time = DateTime.UtcNow.AddHours(1), + OwningTimeoutManager = "MyOtherTM" + }); + + + persister.Add(new TimeoutData + { + Time = DateTime.UtcNow.AddHours(1), + }); + + + Assert.AreEqual(numberOfTimeoutsToAdd+1, persister.GetAll().Count()); + } + [Test] public void Should_return_the_complete_list_of_timeouts_without_hitting_the_maximum_number_of_requests_allowed_for_this_session_has_been_reached() { diff --git a/tests/timeout/NServiceBus.Timeout.Tests/WithRavenTimeoutPersister.cs b/tests/timeout/NServiceBus.Timeout.Tests/WithRavenTimeoutPersister.cs index bd4410ed098..e4dfb2b004b 100644 --- a/tests/timeout/NServiceBus.Timeout.Tests/WithRavenTimeoutPersister.cs +++ b/tests/timeout/NServiceBus.Timeout.Tests/WithRavenTimeoutPersister.cs @@ -15,6 +15,8 @@ public class WithRavenTimeoutPersister [SetUp] public void SetupContext() { + Configure.GetEndpointNameAction = () => "MyEndpoint"; + store = new EmbeddableDocumentStore { RunInMemory = true }; //store = new DocumentStore { Url = "http://localhost:8080", DefaultDatabase = "MyServer" }; store.Conventions.DefaultQueryingConsistency = ConsistencyOptions.QueryYourWrites; //This turns on WaitForNonStaleResults() on queries globally diff --git a/tools/NuGet/packit.psm1 b/tools/NuGet/packit.psm1 index 78384656625..ba26859a947 100644 --- a/tools/NuGet/packit.psm1 +++ b/tools/NuGet/packit.psm1 @@ -7,6 +7,7 @@ $script:packit.package_owners = "Udi Dahan, Andreas Ohlund, Jonathan Matheus, Sh $script:packit.package_authors = "NServiceBus Ltd" $script:packit.package_description = "The most popular open-source service bus for .net" $script:packit.package_releaseNotes = "" +$script:packit.package_copyright = "Copyright (C) NServiceBus 2010-2012" $script:packit.package_language = "en-US" $script:packit.package_licenseUrl = "http://nservicebus.com/license.aspx" $script:packit.package_projectUrl = "http://nservicebus.com/" @@ -179,6 +180,7 @@ function Invoke-Packit $nuGetSpecContent.package.metadata.tags = $script:packit.package_tags $nuGetSpecContent.package.metadata.iconUrl = $script:packit.package_iconUrl; $nuGetSpecContent.package.metadata.releaseNotes = $script:packit.package_releaseNotes + $nuGetSpecContent.package.metadata.copyright = $script:packit.package_copyright $dependencyInnerXml = "" if($dependencies.Count -gt 0) { diff --git a/tools/psake/psake.psm1 b/tools/psake/psake.psm1 index cbeda45dc57..fadd779b060 100644 --- a/tools/psake/psake.psm1 +++ b/tools/psake/psake.psm1 @@ -177,12 +177,14 @@ function Configure-BuildEnvironment $bitness = 'Framework' if($versionPart -ne '1.0' -and $versionPart -ne '1.1') { + switch ($bitnessPart) { 'x86' { $bitness = 'Framework' } 'x64' { $bitness = 'Framework64' } - $null { + $default { $ptrSize = [System.IntPtr]::Size + switch ($ptrSize) { 4 { $bitness = 'Framework' } @@ -190,7 +192,6 @@ function Configure-BuildEnvironment default { throw "Error: Unknown pointer size ($ptrSize) returned from System.IntPtr." } } } - default { throw "Error: Unknown .NET Framework bitness, $bitnessPart, specified in $framework" } } } $frameworkDirs = $versions | foreach { "$env:windir\Microsoft.NET\$bitness\$_\" }