diff --git a/EasyCaching.sln b/EasyCaching.sln index a2d217c4..9ae4c577 100644 --- a/EasyCaching.sln +++ b/EasyCaching.sln @@ -77,7 +77,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EasyCaching.Bus.Zookeeper", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EasyCaching.FasterKv", "src\EasyCaching.FasterKv\EasyCaching.FasterKv.csproj", "{7191E567-38DF-4879-82E1-73EC618AFCAC}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EasyCaching.Serialization.MemoryPack", "serialization\EasyCaching.Serialization.MemoryPack\EasyCaching.Serialization.MemoryPack.csproj", "{EEF22C21-F380-4980-B72C-F14488369333}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EasyCaching.Bus.RabbitMQStream", "bus\EasyCaching.Bus.RabbitMQStream\EasyCaching.Bus.RabbitMQStream.csproj", "{3C9D5E40-B3A5-4649-8B40-08094644B0FB}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EasyCaching.Serialization.MemoryPack", "serialization\EasyCaching.Serialization.MemoryPack\EasyCaching.Serialization.MemoryPack.csproj", "{EEF22C21-F380-4980-B72C-F14488369333}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EasyCaching.Demo.Locks", "sample\EasyCaching.Demo.Locks\EasyCaching.Demo.Locks.csproj", "{9B15A0A0-BD6B-40B0-90D4-848BC3E4AF98}" EndProject @@ -211,6 +213,10 @@ Global {EEF22C21-F380-4980-B72C-F14488369333}.Debug|Any CPU.Build.0 = Debug|Any CPU {EEF22C21-F380-4980-B72C-F14488369333}.Release|Any CPU.ActiveCfg = Release|Any CPU {EEF22C21-F380-4980-B72C-F14488369333}.Release|Any CPU.Build.0 = Release|Any CPU + {3C9D5E40-B3A5-4649-8B40-08094644B0FB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3C9D5E40-B3A5-4649-8B40-08094644B0FB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3C9D5E40-B3A5-4649-8B40-08094644B0FB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3C9D5E40-B3A5-4649-8B40-08094644B0FB}.Release|Any CPU.Build.0 = Release|Any CPU {9B15A0A0-BD6B-40B0-90D4-848BC3E4AF98}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {9B15A0A0-BD6B-40B0-90D4-848BC3E4AF98}.Debug|Any CPU.Build.0 = Debug|Any CPU {9B15A0A0-BD6B-40B0-90D4-848BC3E4AF98}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -251,6 +257,7 @@ Global {5E488583-391E-4E15-83C1-7301B4FE79AE} = {B337509B-75F9-4851-821F-9BBE87C4E4BC} {7191E567-38DF-4879-82E1-73EC618AFCAC} = {A0F5CC7E-155F-4726-8DEB-E966950B3FE9} {EEF22C21-F380-4980-B72C-F14488369333} = {15070C49-A507-4844-BCFE-D319CFBC9A63} + {3C9D5E40-B3A5-4649-8B40-08094644B0FB} = {B337509B-75F9-4851-821F-9BBE87C4E4BC} {9B15A0A0-BD6B-40B0-90D4-848BC3E4AF98} = {F88D727A-9F9C-43D9-90B1-D4A02BF8BC98} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution diff --git a/build/version.props b/build/version.props index 5dc47ec6..00acbff7 100644 --- a/build/version.props +++ b/build/version.props @@ -16,6 +16,7 @@ 1.9.1 1.9.1 1.9.1 + 1.9.1 1.9.1 1.9.1 1.9.1 diff --git a/bus/EasyCaching.Bus.RabbitMQ/Configurations/RabbitMQBusOptions.cs b/bus/EasyCaching.Bus.RabbitMQ/Configurations/RabbitMQBusOptions.cs index f5a09bf6..359568fe 100644 --- a/bus/EasyCaching.Bus.RabbitMQ/Configurations/RabbitMQBusOptions.cs +++ b/bus/EasyCaching.Bus.RabbitMQ/Configurations/RabbitMQBusOptions.cs @@ -6,7 +6,7 @@ /// RabbitMQ Bus options. /// public class RabbitMQBusOptions : BaseRabbitMQOptions - { + { /// /// Gets or sets the name of the queue. /// diff --git a/bus/EasyCaching.Bus.RabbitMQStream/Configurations/EasyCachingOptionsExtensions.cs b/bus/EasyCaching.Bus.RabbitMQStream/Configurations/EasyCachingOptionsExtensions.cs new file mode 100644 index 00000000..9edbb383 --- /dev/null +++ b/bus/EasyCaching.Bus.RabbitMQStream/Configurations/EasyCachingOptionsExtensions.cs @@ -0,0 +1,68 @@ +namespace Microsoft.Extensions.DependencyInjection +{ + using System; + using EasyCaching.Bus.RabbitMQ; + using EasyCaching.Bus.RabbitMQStream; + using EasyCaching.Core; + using EasyCaching.Core.Configurations; + using Microsoft.Extensions.Configuration; + + /// + /// EasyCaching options extensions. + /// + public static class EasyCachingOptionsExtensions + { + /// + /// Withs the RabbitMQStream bus (specify the config via hard code). + /// + /// Options. + /// Configure bus settings. + + public static EasyCachingOptions WithRabbitMQStreamBus( + this EasyCachingOptions options + , Action configure + ) + { + ArgumentCheck.NotNull(configure, nameof(configure)); + + options.RegisterExtension(new RabbitMQStreamBusOptionsExtension(configure)); + return options; + } + + /// + /// Withs the RabbitMQStream bus (read config from configuration file). + /// + /// Options. + /// The configuration. + /// The section name in the configuration file. + public static EasyCachingOptions WithRabbitMQStreamBus( + this EasyCachingOptions options + , IConfiguration configuration + , string sectionName = EasyCachingConstValue.RabbitMQBusSection + ) + { + var dbConfig = configuration.GetSection(sectionName); + var busOptions = new RabbitMQBusOptions(); + dbConfig.Bind(busOptions); + + void configure(RabbitMQBusOptions x) + { + x.HostName = busOptions.HostName; + x.Password = busOptions.Password; + x.Port = busOptions.Port; + x.QueueMessageExpires = busOptions.QueueMessageExpires; + x.RequestedConnectionTimeout = busOptions.RequestedConnectionTimeout; + //x.RouteKey = busOptions.RouteKey; + x.SocketReadTimeout = busOptions.SocketReadTimeout; + x.SocketWriteTimeout = busOptions.SocketWriteTimeout; + x.TopicExchangeName = busOptions.TopicExchangeName; + x.UserName = busOptions.UserName; + x.VirtualHost = busOptions.VirtualHost; + x.QueueName = busOptions.QueueName; + } + + options.RegisterExtension(new RabbitMQStreamBusOptionsExtension(configure)); + return options; + } + } +} \ No newline at end of file diff --git a/bus/EasyCaching.Bus.RabbitMQStream/Configurations/RabbitMQStreamBusOptionsExtension.cs b/bus/EasyCaching.Bus.RabbitMQStream/Configurations/RabbitMQStreamBusOptionsExtension.cs new file mode 100644 index 00000000..275cf5ae --- /dev/null +++ b/bus/EasyCaching.Bus.RabbitMQStream/Configurations/RabbitMQStreamBusOptionsExtension.cs @@ -0,0 +1,43 @@ +namespace EasyCaching.Bus.RabbitMQStream +{ + using System; + using EasyCaching.Bus.RabbitMQ; + using EasyCaching.Core.Bus; + using EasyCaching.Core.Configurations; + using global::RabbitMQ.Client; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.ObjectPool; + + /// + /// RabbitMQ Bus options extension. + /// + public class RabbitMQStreamBusOptionsExtension : IEasyCachingOptionsExtension + { + /// + /// The configure. + /// + private readonly Action configure; + + /// + /// Initializes a new instance of the class. + /// + /// Configure. + public RabbitMQStreamBusOptionsExtension(Action configure) + { + this.configure = configure; + } + + /// + /// Adds the services. + /// + /// Services. + public void AddServices(IServiceCollection services) + { + services.AddOptions(); + services.Configure(configure); + + services.AddSingleton, ModelPooledObjectPolicy>(); + services.AddSingleton(); + } + } +} diff --git a/bus/EasyCaching.Bus.RabbitMQStream/DefaultRabbitMQStreamBus.cs b/bus/EasyCaching.Bus.RabbitMQStream/DefaultRabbitMQStreamBus.cs new file mode 100644 index 00000000..e4f82948 --- /dev/null +++ b/bus/EasyCaching.Bus.RabbitMQStream/DefaultRabbitMQStreamBus.cs @@ -0,0 +1,250 @@ +using Microsoft.Extensions.Logging; + +namespace EasyCaching.Bus.RabbitMQ +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + using EasyCaching.Core; + using EasyCaching.Core.Bus; + using EasyCaching.Core.Serialization; + using global::RabbitMQ.Client; + using global::RabbitMQ.Client.Events; + using Microsoft.Extensions.ObjectPool; + using Microsoft.Extensions.Options; + + /// + /// Default RabbitMQ Bus. + /// + public class DefaultRabbitMQStreamBus : EasyCachingAbstractBus + { + /// + /// The subscriber connection. + /// + private readonly IConnection _subConnection; + + /// + /// The publish channel pool. + /// + private readonly ObjectPool _pubChannelPool; + + /// + /// The rabbitMQ Bus options. + /// + private readonly RabbitMQBusOptions _options; + + /// + /// The serializer. + /// + private readonly IEasyCachingSerializer _serializer; + + /// + /// The logger. + /// + private readonly ILogger _logger; + + /// + /// The identifier. + /// + private readonly string _busId; + + private static readonly Dictionary _streamArgs = new Dictionary + { + { "x-queue-type", "stream" }, + { "x-max-age", "5m" }, + { "x-stream-max-segment-size-bytes", 4_000_000 } + }; + + /// + /// Initializes a new instance of the class. + /// + /// Object policy. + /// RabbitMQ Options. + /// Serializer. + public DefaultRabbitMQStreamBus( + IPooledObjectPolicy objectPolicy + , IOptions rabbitMQOptions + , IEasyCachingSerializer serializer + , Lazy> logger) + { + this._options = rabbitMQOptions.Value; + this._serializer = serializer; + this._logger = logger.Value; + + _logger?.LogInformation("DefaultRabbitMQStreamBus: Initializing"); + try + { + var factory = new ConnectionFactory + { + HostName = _options.HostName, + UserName = _options.UserName, + Port = _options.Port, + Password = _options.Password, + VirtualHost = _options.VirtualHost, + RequestedConnectionTimeout = System.TimeSpan.FromMilliseconds(_options.RequestedConnectionTimeout), + SocketReadTimeout = System.TimeSpan.FromMilliseconds(_options.SocketReadTimeout), + SocketWriteTimeout = System.TimeSpan.FromMilliseconds(_options.SocketWriteTimeout), + ClientProvidedName = _options.ClientProvidedName + }; + + _subConnection = factory.CreateConnection(); + + _subConnection.ConnectionShutdown += (_, e) => + { + _logger?.LogError($"ConnectionShutdown: {e.ReplyText}"); + }; + + var provider = new DefaultObjectPoolProvider(); + + _pubChannelPool = provider.Create(objectPolicy); + + _busId = Guid.NewGuid().ToString("N"); + + BusName = "easycachingbus"; + } + catch (Exception ex) + { + _logger?.LogError(ex, $"DefaultRabbitMQStreamBus: Initializing failed ({ex.Message})"); + throw; + } + } + + /// + /// Publish the specified topic and message. + /// + /// Topic. + /// Message. + public override void BasePublish(string topic, EasyCachingMessage message) + { + var channel = _pubChannelPool.Get(); + + try + { + var body = _serializer.Serialize(message); + + channel.ExchangeDeclare(_options.TopicExchangeName, ExchangeType.Topic, true, false, null); + channel.BasicPublish(_options.TopicExchangeName, topic, false, null, body); + } + catch (Exception ex) + { + Console.WriteLine(ex.Message); + } + finally + { + _pubChannelPool.Return(channel); + } + } + + /// + /// Publish the specified topic and message async. + /// + /// The async. + /// Topic. + /// Message. + /// Cancellation token. + public override Task BasePublishAsync(string topic, EasyCachingMessage message, CancellationToken cancellationToken = default(CancellationToken)) + { + var channel = _pubChannelPool.Get(); + try + { + var body = _serializer.Serialize(message); + + channel.ExchangeDeclare(_options.TopicExchangeName, ExchangeType.Topic, true, false, null); + channel.BasicPublish(_options.TopicExchangeName, topic, false, null, body); + } + catch (Exception ex) + { + Console.WriteLine(ex.Message); + } + finally + { + _pubChannelPool.Return(channel); + } + return Task.CompletedTask; + } + + /// + /// Subscribe the specified topic and action. + /// + /// Topic. + /// Action. + public override void BaseSubscribe(string topic, Action action) + { + var queueName = $"rmq.stream.easycaching.{topic}"; + + Task.Factory.StartNew( + () => StartConsumer(queueName, topic), + TaskCreationOptions.LongRunning); + } + + /// + /// Subscribe the specified topic and action async. + /// + /// Topic. + /// Action. + /// Cancellation token. + public override Task BaseSubscribeAsync(string topic, Action action, CancellationToken cancellationToken = default(CancellationToken)) + { + _logger?.LogWarning($"BaseSubscribeAsync for {topic}"); + + var queueName = $"rmq.stream.easycaching.{topic}"; + + StartConsumer(queueName, topic); + return Task.CompletedTask; + } + + private void StartConsumer(string queueName, string topic) + { + _logger?.LogWarning($"Starting Consumer for {topic} ({queueName})"); + var channel = _subConnection.CreateModel(); + + channel.ModelShutdown += (_, e) => + { + _logger?.LogError($"ModelShutdown: {e.ReplyText}"); + }; + + // Streams require Qos setup + channel.BasicQos(0, 100, false); + channel.ExchangeDeclare(_options.TopicExchangeName, ExchangeType.Topic, true, false, null); + channel.QueueDeclare(queueName, true, false, false, _streamArgs); + // bind the queue with the exchange. + channel.QueueBind(queueName, _options.TopicExchangeName, topic); + var consumer = new EventingBasicConsumer(channel); + consumer.Received += OnMessage; + + consumer.ConsumerCancelled += (sender, e) => + { + _logger?.LogError($"EasyCaching.DefaultRabbitMQStreamBus.OnConsumerCancelled: (Q: {queueName})"); + }; + + consumer.Shutdown += (sender, e) => + { + _logger?.LogError($"EasyCaching.DefaultRabbitMQStreamBus.OnConsumerShutdown: (Q: {queueName}) {e.ReplyText}"); + StartConsumer(queueName, topic); + BaseOnReconnect(); + }; + + channel.BasicConsume(queueName, false, consumer); + } + + /// + /// Ons the message. + /// + /// Sender. + /// E. + private void OnMessage(object sender, BasicDeliverEventArgs e) + { + try + { + var message = _serializer.Deserialize(e.Body.ToArray()); + _logger?.LogDebug(string.Join(",", message.CacheKeys)); + BaseOnMessage(message); + } + finally + { + (sender as EventingBasicConsumer)?.Model.BasicAck(e.DeliveryTag, false); + } + } + } +} diff --git a/bus/EasyCaching.Bus.RabbitMQStream/EasyCaching.Bus.RabbitMQStream.csproj b/bus/EasyCaching.Bus.RabbitMQStream/EasyCaching.Bus.RabbitMQStream.csproj new file mode 100644 index 00000000..4a1d9b02 --- /dev/null +++ b/bus/EasyCaching.Bus.RabbitMQStream/EasyCaching.Bus.RabbitMQStream.csproj @@ -0,0 +1,50 @@ + + + + + netstandard2.0;net6.0 + ncc;Thomas Sarmis + ncc;Thomas Sarmis + $(EasyCachingRabbitStreamBusPackageVersion) + + + A simple caching bus(message bus) based on RabbitMQ using amqp Streams. + + Bus,Hybrid,RabbitMQ,Caching,Cache + https://github.com/dotnetcore/EasyCaching + LICENSE + https://github.com/dotnetcore/EasyCaching + https://github.com/dotnetcore/EasyCaching + nuget-icon.png + + $(EasyCachingRabbitStreamBusPackageNotes) + + + + + true + $(NoWarn);1591 + + + + + + + + + + + + + + + + + + + + + + + +