From 14bbda418d2b31b240d8b315519f325455a2ff60 Mon Sep 17 00:00:00 2001 From: bingtianyiyan <48984656+bingtianyiyan@users.noreply.github.com> Date: Tue, 18 Oct 2022 18:59:06 +0800 Subject: [PATCH] whether can support zookeeper bus and kafka bus #404 (#405) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat:add EasyCaching.Bus.ConfluentKafka module 1.use kafka to notify cache to change 2.testing seeing sample EasyCaching.Demo.Providers ValuesBusController * feat: 1.Add ZooKeeperBus Module, 2.kafakBus options write AutoOffsetReset value * feat:zookeeperBus * feat: 1.zk bus * fix: 1.add zk config command,zk init time compare 2.add testBus * feat: 1.Eliminate careless writing and code formatting * fix: 1.edit build/releasenotes.props and build/version.props add zookeeper/kafka bus module notes and version 2.EasyCaching.Demo.Providers/Startup.cs to original file data --- EasyCaching.sln | 18 +- build/releasenotes.props | 6 + build/version.props | 2 + .../ConfluentKafkaBusOptions.cs | 39 ++ .../ConfluentKafkaOptionsExtension.cs | 45 +++ .../EasyCachingOptionsExtensions.cs | 81 ++++ .../DefaultConfluentKafkaBus.cs | 149 +++++++ .../EasyCaching.Bus.ConfluentKafka.csproj | 42 ++ .../EasyCachingOptionsExtensions.cs | 64 +++ .../Configurations/ZkBusOptions.cs | 89 +++++ .../ZookeeperOptionsExtension.cs | 38 ++ .../DefaultZookeeperBus.cs | 376 ++++++++++++++++++ .../EasyCaching.Bus.Zookeeper.csproj | 43 ++ .../Controllers/ValuesBusController.cs | 46 +++ .../EasyCaching.Demo.Providers.csproj | 2 + sample/EasyCaching.Demo.Providers/Program.cs | 11 + sample/EasyCaching.Demo.Providers/Startup.cs | 156 ++++---- .../appsettings.json | 117 +++--- .../Internal/EasyCachingConstValue.cs | 13 +- 19 files changed, 1203 insertions(+), 134 deletions(-) create mode 100644 bus/EasyCaching.Bus.ConfluentKafka/Configurations/ConfluentKafkaBusOptions.cs create mode 100644 bus/EasyCaching.Bus.ConfluentKafka/Configurations/ConfluentKafkaOptionsExtension.cs create mode 100644 bus/EasyCaching.Bus.ConfluentKafka/Configurations/EasyCachingOptionsExtensions.cs create mode 100644 bus/EasyCaching.Bus.ConfluentKafka/DefaultConfluentKafkaBus.cs create mode 100644 bus/EasyCaching.Bus.ConfluentKafka/EasyCaching.Bus.ConfluentKafka.csproj create mode 100644 bus/EasyCaching.Bus.Zookeeper/Configurations/EasyCachingOptionsExtensions.cs create mode 100644 bus/EasyCaching.Bus.Zookeeper/Configurations/ZkBusOptions.cs create mode 100644 bus/EasyCaching.Bus.Zookeeper/Configurations/ZookeeperOptionsExtension.cs create mode 100644 bus/EasyCaching.Bus.Zookeeper/DefaultZookeeperBus.cs create mode 100644 bus/EasyCaching.Bus.Zookeeper/EasyCaching.Bus.Zookeeper.csproj create mode 100644 sample/EasyCaching.Demo.Providers/Controllers/ValuesBusController.cs diff --git a/EasyCaching.sln b/EasyCaching.sln index a595298f..ff571815 100644 --- a/EasyCaching.sln +++ b/EasyCaching.sln @@ -1,6 +1,6 @@ Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.29324.140 +# Visual Studio Version 17 +VisualStudioVersion = 17.2.32616.157 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{A0F5CC7E-155F-4726-8DEB-E966950B3FE9}" EndProject @@ -70,6 +70,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EasyCaching.LiteDB", "src\E EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EasyCaching.Serialization.SystemTextJson", "serialization\EasyCaching.Serialization.SystemTextJson\EasyCaching.Serialization.SystemTextJson.csproj", "{4FCF16BF-5E21-4B74-AB45-3C121ADF1485}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EasyCaching.Bus.ConfluentKafka", "bus\EasyCaching.Bus.ConfluentKafka\EasyCaching.Bus.ConfluentKafka.csproj", "{F7FBADEB-D766-4595-949A-07104B52692C}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EasyCaching.Bus.Zookeeper", "bus\EasyCaching.Bus.Zookeeper\EasyCaching.Bus.Zookeeper.csproj", "{5E488583-391E-4E15-83C1-7301B4FE79AE}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -184,6 +188,14 @@ Global {4FCF16BF-5E21-4B74-AB45-3C121ADF1485}.Debug|Any CPU.Build.0 = Debug|Any CPU {4FCF16BF-5E21-4B74-AB45-3C121ADF1485}.Release|Any CPU.ActiveCfg = Release|Any CPU {4FCF16BF-5E21-4B74-AB45-3C121ADF1485}.Release|Any CPU.Build.0 = Release|Any CPU + {F7FBADEB-D766-4595-949A-07104B52692C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F7FBADEB-D766-4595-949A-07104B52692C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F7FBADEB-D766-4595-949A-07104B52692C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F7FBADEB-D766-4595-949A-07104B52692C}.Release|Any CPU.Build.0 = Release|Any CPU + {5E488583-391E-4E15-83C1-7301B4FE79AE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5E488583-391E-4E15-83C1-7301B4FE79AE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5E488583-391E-4E15-83C1-7301B4FE79AE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5E488583-391E-4E15-83C1-7301B4FE79AE}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -216,6 +228,8 @@ Global {711603E1-8363-4F8D-9AA9-8C03EC8BD35F} = {B4241D34-A973-4A13-BD89-9BAE3F2BDDF6} {BA850294-3103-4540-8A27-FC768E1DC8FC} = {A0F5CC7E-155F-4726-8DEB-E966950B3FE9} {4FCF16BF-5E21-4B74-AB45-3C121ADF1485} = {15070C49-A507-4844-BCFE-D319CFBC9A63} + {F7FBADEB-D766-4595-949A-07104B52692C} = {B337509B-75F9-4851-821F-9BBE87C4E4BC} + {5E488583-391E-4E15-83C1-7301B4FE79AE} = {B337509B-75F9-4851-821F-9BBE87C4E4BC} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {63A57886-054B-476C-AAE1-8D7C8917682E} diff --git a/build/releasenotes.props b/build/releasenotes.props index 2eff3fe0..7f8f2e74 100644 --- a/build/releasenotes.props +++ b/build/releasenotes.props @@ -50,6 +50,12 @@ 1. Upgrading dependencies. + + 1. Add EasyCachingKafkaBus. + + + 1. Add EasyCachingZookeeperBus. + 1. Add CancellationToken for async methods. diff --git a/build/version.props b/build/version.props index 0bb59cda..dc769fd0 100644 --- a/build/version.props +++ b/build/version.props @@ -16,6 +16,8 @@ 1.6.1 1.6.1 1.6.1 + 1.6.1 + 1.6.1 1.6.1 1.6.1 1.6.1 diff --git a/bus/EasyCaching.Bus.ConfluentKafka/Configurations/ConfluentKafkaBusOptions.cs b/bus/EasyCaching.Bus.ConfluentKafka/Configurations/ConfluentKafkaBusOptions.cs new file mode 100644 index 00000000..27c35007 --- /dev/null +++ b/bus/EasyCaching.Bus.ConfluentKafka/Configurations/ConfluentKafkaBusOptions.cs @@ -0,0 +1,39 @@ +using Confluent.Kafka; + +namespace EasyCaching.Bus.ConfluentKafka +{ + /// + /// kafka bus options + /// + public class ConfluentKafkaBusOptions + { + /// + /// kafka address(BootstrapServers must) + /// + public string BootstrapServers { get; set; } + + + /// + /// kafka bus producer options. + /// + public ProducerConfig ProducerConfig { get; set; } + + /// + /// kafka bus consumer options.(if GroupId value below is empty,then ConsumerConfig.GroupId must ) + /// + public ConsumerConfig ConsumerConfig { get; set; } + + /// + /// kafka bus consumer options with consumer groupId + /// (if ConsumerConfig below has give GroupId value , this options can ignore) + /// import:if application is cluster,you should set this different value in application,this will make consumer can consumerdata + /// + public string GroupId { get; set; } + + /// + /// kafka bus consumer consume count + /// + public int ConsumerCount { get; set; } = 1; + } + +} diff --git a/bus/EasyCaching.Bus.ConfluentKafka/Configurations/ConfluentKafkaOptionsExtension.cs b/bus/EasyCaching.Bus.ConfluentKafka/Configurations/ConfluentKafkaOptionsExtension.cs new file mode 100644 index 00000000..aeaf8fd8 --- /dev/null +++ b/bus/EasyCaching.Bus.ConfluentKafka/Configurations/ConfluentKafkaOptionsExtension.cs @@ -0,0 +1,45 @@ +namespace EasyCaching.Bus.ConfluentKafka +{ + using System; + using EasyCaching.Core.Bus; + using EasyCaching.Core.Configurations; + using EasyCaching.Core.Serialization; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.DependencyInjection.Extensions; + + /// + /// Kafka options extension. + /// + internal sealed class ConfluentKafkaOptionsExtension : IEasyCachingOptionsExtension + { + + private readonly Action _confluentKafkaBusOptions; + + public ConfluentKafkaOptionsExtension(Action confluentKafkaBusOptions) + { + this._confluentKafkaBusOptions = confluentKafkaBusOptions; + } + + /// + /// Adds the services. + /// + /// Services. + public void AddServices(IServiceCollection services) + { + services.AddOptions(); + + services.TryAddSingleton(); + + services.AddOptions() + .Configure(_confluentKafkaBusOptions); + + + //var options = services.BuildServiceProvider() + // .GetRequiredService>() + // .Value; + + services.AddSingleton(); + + } + } +} diff --git a/bus/EasyCaching.Bus.ConfluentKafka/Configurations/EasyCachingOptionsExtensions.cs b/bus/EasyCaching.Bus.ConfluentKafka/Configurations/EasyCachingOptionsExtensions.cs new file mode 100644 index 00000000..4fd1cb98 --- /dev/null +++ b/bus/EasyCaching.Bus.ConfluentKafka/Configurations/EasyCachingOptionsExtensions.cs @@ -0,0 +1,81 @@ +namespace Microsoft.Extensions.DependencyInjection +{ + using Confluent.Kafka; + using EasyCaching.Bus.ConfluentKafka; + using EasyCaching.Core; + using EasyCaching.Core.Configurations; + using Microsoft.Extensions.Configuration; + using System; + + /// + /// EasyCaching options extensions. + /// + public static class EasyCachingOptionsExtensions + { + /// + /// Withs the ConfluentKafka bus (specify the config via hard code). + /// + /// + /// + /// + public static EasyCachingOptions WithConfluentKafkaBus( + this EasyCachingOptions options + , Action configure + ) + { + ArgumentCheck.NotNull(configure, nameof(configure)); + //option convert + ConfluentKafkaBusOptions kafkaOptions = new ConfluentKafkaBusOptions(); + configure.Invoke(kafkaOptions); + void kafkaBusConfigure(ConfluentKafkaBusOptions x) + { + x.BootstrapServers = kafkaOptions.BootstrapServers; + x.ProducerConfig = kafkaOptions.ProducerConfig ?? new ProducerConfig(); + x.ConsumerConfig = kafkaOptions.ConsumerConfig ?? new ConsumerConfig(); + //address + x.ProducerConfig.BootstrapServers = x.ProducerConfig.BootstrapServers ?? kafkaOptions.BootstrapServers; + x.ConsumerConfig.BootstrapServers = x.ConsumerConfig.BootstrapServers ?? kafkaOptions.BootstrapServers; + //consumer groupId + x.ConsumerConfig.GroupId = x.ConsumerConfig.GroupId ?? kafkaOptions.GroupId; + x.ConsumerConfig.AutoOffsetReset = kafkaOptions.ConsumerConfig.AutoOffsetReset ?? AutoOffsetReset.Latest; + } + + options.RegisterExtension(new ConfluentKafkaOptionsExtension(kafkaBusConfigure)); + return options; + } + + /// + /// Withs the ConfluentKafka bus (read config from configuration file). + /// + /// + /// + /// The section name in the configuration file. + /// + public static EasyCachingOptions WithConfluentKafkaBus( + this EasyCachingOptions options + , IConfiguration configuration + , string sectionName = EasyCachingConstValue.KafkaBusSection + ) + { + var dbConfig = configuration.GetSection(sectionName); + var kafkaOptions = new ConfluentKafkaBusOptions(); + dbConfig.Bind(kafkaOptions); + + void configure(ConfluentKafkaBusOptions x) + { + x.BootstrapServers = kafkaOptions.BootstrapServers; + x.ProducerConfig = kafkaOptions.ProducerConfig ?? new ProducerConfig(); + x.ConsumerConfig = kafkaOptions.ConsumerConfig ?? new ConsumerConfig(); + //address + x.ProducerConfig.BootstrapServers = x.ProducerConfig.BootstrapServers ?? kafkaOptions.BootstrapServers; + x.ConsumerConfig.BootstrapServers = x.ConsumerConfig.BootstrapServers ?? kafkaOptions.BootstrapServers; + //consumer groupId + x.ConsumerConfig.GroupId = x.ConsumerConfig.GroupId ?? kafkaOptions.GroupId; + x.ConsumerConfig.AutoOffsetReset = kafkaOptions.ConsumerConfig.AutoOffsetReset ?? AutoOffsetReset.Latest; + } + + options.RegisterExtension(new ConfluentKafkaOptionsExtension(configure)); + return options; + } + } +} diff --git a/bus/EasyCaching.Bus.ConfluentKafka/DefaultConfluentKafkaBus.cs b/bus/EasyCaching.Bus.ConfluentKafka/DefaultConfluentKafkaBus.cs new file mode 100644 index 00000000..296fd5b2 --- /dev/null +++ b/bus/EasyCaching.Bus.ConfluentKafka/DefaultConfluentKafkaBus.cs @@ -0,0 +1,149 @@ +namespace EasyCaching.Bus.ConfluentKafka +{ + using System; + using System.Threading; + using System.Threading.Tasks; + using Confluent.Kafka; + using EasyCaching.Core; + using EasyCaching.Core.Bus; + using EasyCaching.Core.Serialization; + using Microsoft.Extensions.Logging; + using Microsoft.Extensions.Logging.Abstractions; + using Microsoft.Extensions.Options; + + public class DefaultConfluentKafkaBus : EasyCachingAbstractBus + { + + + /// + /// The kafka Bus options. + /// + private readonly ConfluentKafkaBusOptions _kafkaBusOptions; + + /// + /// The serializer. + /// + private readonly IEasyCachingSerializer _serializer; + + /// + /// kafka producer object + /// + + private readonly IProducer _producer; + + + /// + /// log + /// + + private readonly ILogger _logger = NullLogger.Instance; + + + /// + /// Initializes a new instance of the class. + /// + /// + /// + public DefaultConfluentKafkaBus( + IOptionsMonitor kafkaBusOptions + , IEasyCachingSerializer serializer) + { + this.BusName = "easycachingbus"; + this._kafkaBusOptions = kafkaBusOptions.CurrentValue; + + this._producer = new ProducerBuilder(this._kafkaBusOptions.ProducerConfig).Build(); + + this._serializer = serializer; + } + + /// + /// Publish the specified topic and message. + /// + /// Topic. + /// Message. + public override void BasePublish(string topic, EasyCachingMessage message) + { + var msg = _serializer.Serialize(message); + + _producer.Produce(topic, new Message { Value = msg }); + } + + /// + /// Publishs the async. + /// + /// The async. + /// Topic. + /// Message. + /// Cancellation token. + public override async Task BasePublishAsync(string topic, EasyCachingMessage message, CancellationToken cancellationToken = default(CancellationToken)) + { + var msg = _serializer.Serialize(message); + + await _producer.ProduceAsync(topic, new Message { Value = msg }); + } + + /// + /// Subscribe the specified topic and action. + /// + /// Topic. + /// Action. + public override void BaseSubscribe(string topic, Action action) + { + Task.Factory.StartNew(() => + { + for (int i = 0; i < this._kafkaBusOptions.ConsumerCount; i++) + { + using (var consumer = new ConsumerBuilder(this._kafkaBusOptions.ConsumerConfig).Build()) + { + consumer.Subscribe(topic); + try + { + while (true) + { + try + { + var cr = consumer.Consume(); + if (cr.IsPartitionEOF + || cr.Message == null + || cr.Message.Value.Length == 0) + { + continue; + } + OnMessage(cr.Message.Value); + } + catch (ConsumeException ex) + { + _logger.LogError(ex, "Consumer {0} error of reason {1}.", topic, ex.Error.Reason); + } + catch (OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + _logger.LogError(ex, "Consumer {0} error.", topic); + } + } + } + catch (OperationCanceledException ex) + { + _logger.LogWarning(ex, "Consumer {0} cancel.", topic); + consumer.Close(); + } + } + } + }, TaskCreationOptions.LongRunning); + } + + /// + /// Ons the message. + /// + /// Body. + private void OnMessage(byte[] body) + { + var message = _serializer.Deserialize(body); + + BaseOnMessage(message); + } + } +} diff --git a/bus/EasyCaching.Bus.ConfluentKafka/EasyCaching.Bus.ConfluentKafka.csproj b/bus/EasyCaching.Bus.ConfluentKafka/EasyCaching.Bus.ConfluentKafka.csproj new file mode 100644 index 00000000..52779cf2 --- /dev/null +++ b/bus/EasyCaching.Bus.ConfluentKafka/EasyCaching.Bus.ConfluentKafka.csproj @@ -0,0 +1,42 @@ + + + + + netstandard2.0;net6.0 + ncc;Catcher Wong + ncc;Catcher Wong + $(EasyCachingKafkaBusPackageVersion) + + + A simple caching bus(message bus) based on Kafka. + + Bus,Hybrid,Kafka,Caching,Cache + https://github.com/dotnetcore/EasyCaching + LICENSE + https://github.com/dotnetcore/EasyCaching + https://github.com/dotnetcore/EasyCaching + nuget-icon.png + + $(EasyCachingKafkaBusPackageNotes) + + + + + true + $(NoWarn);1591 + + + + + + + + + + + + + + + + diff --git a/bus/EasyCaching.Bus.Zookeeper/Configurations/EasyCachingOptionsExtensions.cs b/bus/EasyCaching.Bus.Zookeeper/Configurations/EasyCachingOptionsExtensions.cs new file mode 100644 index 00000000..80d7452a --- /dev/null +++ b/bus/EasyCaching.Bus.Zookeeper/Configurations/EasyCachingOptionsExtensions.cs @@ -0,0 +1,64 @@ +namespace Microsoft.Extensions.DependencyInjection +{ + using EasyCaching.Bus.Zookeeper; + using EasyCaching.Core; + using EasyCaching.Core.Configurations; + using Microsoft.Extensions.Configuration; + using System; + + /// + /// EasyCaching options extensions. + /// + public static class EasyCachingOptionsExtensions + { + /// + /// Withs the Zookeeper bus (specify the config via hard code). + /// + /// + /// + /// + public static EasyCachingOptions WithZookeeeperBus( + this EasyCachingOptions options + , Action configure + ) + { + ArgumentCheck.NotNull(configure, nameof(configure)); + options.RegisterExtension(new ZookeeperOptionsExtension(configure)); + return options; + } + + /// + /// Withs the zookeeper bus (read config from configuration file). + /// + /// + /// + /// The section name in the configuration file. + /// + public static EasyCachingOptions WithConfluentKafkaBus( + this EasyCachingOptions options + , IConfiguration configuration + , string sectionName = EasyCachingConstValue.ZookeeperBusSection + ) + { + var dbConfig = configuration.GetSection(sectionName); + var zkOptions = new ZkBusOptions(); + dbConfig.Bind(zkOptions); + + void configure(ZkBusOptions x) + { + x.ConnectionString = zkOptions.ConnectionString; + x.SessionTimeout = zkOptions.SessionTimeout; + x.OperatingTimeout = zkOptions.OperatingTimeout; + x.ConnectionTimeout = zkOptions.ConnectionTimeout; + x.Digest = zkOptions.Digest; + x.BaseRoutePath = zkOptions.BaseRoutePath; + x.ReadOnly = zkOptions.ReadOnly; + x.BaseRoutePath = zkOptions.BaseRoutePath; + x.LogToFile = zkOptions.LogToFile; + } + + options.RegisterExtension(new ZookeeperOptionsExtension(configure)); + return options; + } + } +} diff --git a/bus/EasyCaching.Bus.Zookeeper/Configurations/ZkBusOptions.cs b/bus/EasyCaching.Bus.Zookeeper/Configurations/ZkBusOptions.cs new file mode 100644 index 00000000..5996a947 --- /dev/null +++ b/bus/EasyCaching.Bus.Zookeeper/Configurations/ZkBusOptions.cs @@ -0,0 +1,89 @@ +using System; + +namespace EasyCaching.Bus.Zookeeper +{ + public class ZkBusOptions + { + public ZkBusOptions() + { + this.ConnectionTimeout = 50000;//Milliseconds + this.OperatingTimeout = 10000; + this.SessionTimeout = 50000; + } + + /// + /// create ZooKeeper client + /// + /// + /// + public ZkBusOptions(string connectionString) + { + if (string.IsNullOrEmpty(connectionString)) + throw new ArgumentNullException(nameof(connectionString)); + + ConnectionString = connectionString; + } + + /// + /// create ZooKeeper client + /// + /// + /// + /// + /// + /// + public ZkBusOptions(string connectionString + , int connectionTimeout + , int operatingTimeout + , int sessionTimeout) + { + if (string.IsNullOrEmpty(connectionString)) + throw new ArgumentNullException(nameof(connectionString)); + + ConnectionString = connectionString; + this.ConnectionTimeout = connectionTimeout; + this.SessionTimeout = sessionTimeout; + this.OperatingTimeout = operatingTimeout; + } + + /// + /// connect string + /// + public string ConnectionString { get; set; } + + /// + /// readonly + /// + public bool ReadOnly { get; set; } = false; + + /// + /// point user to access + /// + public string Digest { get; set; } + + /// + /// log to file options + /// + public bool LogToFile { get; set; } = false; + + /// + /// base root path + /// + public string BaseRoutePath { get; set; } = "easyCacheBus"; + + /// + /// wait zooKeeper connect time with Milliseconds + /// + public int ConnectionTimeout { get; set; } + + /// + /// execute zooKeeper handler retry waittime with Milliseconds + /// + public int OperatingTimeout { get; set; } + + /// + /// zookeeper session timeout with Milliseconds + /// + public int SessionTimeout { get; set; } + } +} \ No newline at end of file diff --git a/bus/EasyCaching.Bus.Zookeeper/Configurations/ZookeeperOptionsExtension.cs b/bus/EasyCaching.Bus.Zookeeper/Configurations/ZookeeperOptionsExtension.cs new file mode 100644 index 00000000..e8da908a --- /dev/null +++ b/bus/EasyCaching.Bus.Zookeeper/Configurations/ZookeeperOptionsExtension.cs @@ -0,0 +1,38 @@ +namespace EasyCaching.Bus.Zookeeper +{ + using EasyCaching.Core.Bus; + using EasyCaching.Core.Configurations; + using EasyCaching.Core.Serialization; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.DependencyInjection.Extensions; + using System; + + /// + /// Zookeeper options extension. + /// + internal sealed class ZookeeperOptionsExtension : IEasyCachingOptionsExtension + { + private readonly Action _zkBusOptions; + + public ZookeeperOptionsExtension(Action zkBusOptions) + { + this._zkBusOptions = zkBusOptions; + } + + /// + /// Adds the services. + /// + /// Services. + public void AddServices(IServiceCollection services) + { + services.AddOptions(); + + services.TryAddSingleton(); + + services.AddOptions() + .Configure(_zkBusOptions); + + services.AddSingleton(); + } + } +} \ No newline at end of file diff --git a/bus/EasyCaching.Bus.Zookeeper/DefaultZookeeperBus.cs b/bus/EasyCaching.Bus.Zookeeper/DefaultZookeeperBus.cs new file mode 100644 index 00000000..c507d008 --- /dev/null +++ b/bus/EasyCaching.Bus.Zookeeper/DefaultZookeeperBus.cs @@ -0,0 +1,376 @@ +namespace EasyCaching.Bus.Zookeeper +{ + using EasyCaching.Core; + using EasyCaching.Core.Bus; + using EasyCaching.Core.Serialization; + using Microsoft.Extensions.Options; + using org.apache.zookeeper; + using org.apache.zookeeper.data; + using System; + using System.Collections.Generic; + using System.Linq; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + + public class DefaultZookeeperBus : EasyCachingAbstractBus + { + /// + /// The zookeeper Bus options. + /// + private readonly ZkBusOptions _zkBusOptions; + + /// + /// The zookeeper Client + /// + private ZooKeeper _zkClient; + + /// + /// zookeeper data chane delegate event + /// + /// + /// + + public delegate Task NodeDataChangeHandler(WatchedEvent @event); + + /// + /// event + /// + private NodeDataChangeHandler _dataChangeHandler; + + /// + /// lock + /// + private readonly object _zkEventLock = new object(); + + /// + /// The serializer. + /// + private readonly IEasyCachingSerializer _serializer; + + /// + /// Initializes a new instance of the class. + /// + /// + /// + public DefaultZookeeperBus( + IOptionsMonitor zkBusOptions + , IEasyCachingSerializer serializer) + { + this.BusName = "easycachingbus"; + this._zkBusOptions = zkBusOptions.CurrentValue; + this._zkClient = CreateClient(zkBusOptions.CurrentValue, new ZkNodeDataWatch(this)); + + this._serializer = serializer; + } + + /// + /// Publish the specified topic and message. + /// + /// Topic. + /// Message. + public override void BasePublish(string topic, EasyCachingMessage message) + { + var msg = _serializer.Serialize(message); + var path = $"{topic}"; + Task.Run(async () => + { + if (!await PathExistsAsync(path, true)) + { + await CreateRecursiveAsync(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + await SetDataAsync(path, msg); + }).ConfigureAwait(false).GetAwaiter().GetResult(); + } + + /// + /// Publishs the async. + /// + /// The async. + /// Topic. + /// Message. + /// Cancellation token. + public override async Task BasePublishAsync(string topic, EasyCachingMessage message, CancellationToken cancellationToken = default(CancellationToken)) + { + var msg = _serializer.Serialize(message); + var path = $"{topic}"; + + if (!await PathExistsAsync(path, true)) + { + await CreateRecursiveAsync(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + await SetDataAsync(path, msg); + } + + /// + /// Subscribe the specified topic and action. + /// + /// Topic. + /// Action. + public override void BaseSubscribe(string topic, Action action) + { + var path = $"{topic}"; + Task.Factory.StartNew(async () => + { + await SubscribeDataChangeAsync(path, SubscribeDataChange); + }, TaskCreationOptions.LongRunning); + } + + /// + /// Ons the message. + /// + /// Body. + private void OnMessage(byte[] body) + { + var message = _serializer.Deserialize(body); + BaseOnMessage(message); + } + + /// + /// create zk client + /// + /// + /// + /// + /// + private ZooKeeper CreateClient(ZkBusOptions options, Watcher watcher) + { + ZooKeeper.LogToFile = options.LogToFile; + var zk = new ZooKeeper(options.ConnectionString, options.SessionTimeout, watcher); + if (!string.IsNullOrEmpty(options.Digest)) + { + zk.addAuthInfo("digest", Encoding.UTF8.GetBytes(options.Digest)); + } + var operationStartTime = DateTime.Now; + while (true) + { + if (zk.getState() == ZooKeeper.States.CONNECTING) + { + Thread.Sleep(100); + } + else if (zk.getState() == ZooKeeper.States.CONNECTED + || zk.getState() == ZooKeeper.States.CONNECTEDREADONLY) + { + return zk; + } + if (DateTime.Now - operationStartTime > TimeSpan.FromMilliseconds(options.OperatingTimeout)) + { + throw new TimeoutException( + $"connect cannot be retried because of retry timeout ({options.OperatingTimeout}Milliseconds)"); + } + } + } + + /// + /// subscribe data change + /// + /// + /// + private async Task SubscribeDataChange(WatchedEvent @event) + { + var state = @event.getState(); + if (state == Watcher.Event.KeeperState.Expired) + { + await ReZkConnect(); + } + + var eventType = @event.get_Type(); + byte[] nodeData = await GetDataAsync(@event.getPath()); + + switch (eventType) + { + case Watcher.Event.EventType.NodeCreated: + break; + + case Watcher.Event.EventType.NodeDeleted: + case Watcher.Event.EventType.NodeDataChanged: + if (!nodeData.Any()) + { + return; + } + + //hander business logical + OnMessage(nodeData); + break; + } + await Task.CompletedTask; + } + + /// + /// reconnnect zk + /// + /// + private async Task ReZkConnect() + { + if (!Monitor.TryEnter(_zkEventLock, _zkBusOptions.ConnectionTimeout)) + return; + try + { + if (_zkClient != null) + { + try + { + await _zkClient.closeAsync(); + } + catch + { + // ignored + } + } + + _zkClient = CreateClient(_zkBusOptions, new ZkNodeDataWatch(this)); + } + finally + { + Monitor.Exit(_zkEventLock); + } + } + + /// + /// subscribe data change + /// + /// + /// + /// + private async Task SubscribeDataChangeAsync(string path, NodeDataChangeHandler listener) + { + _dataChangeHandler += listener; + await PathExistsAsync(path, true); + } + + /// + /// pathExists + /// + /// + /// + /// + private async Task PathExistsAsync(string path, bool watch = false) + { + path = GetZooKeeperPath(path); + var state = await _zkClient.existsAsync(path, watch); + return state != null; + } + + /// + /// set node data + /// + /// + /// + /// + /// node stat + public async Task SetDataAsync(string path, byte[] data, int version = -1) + { + path = GetZooKeeperPath(path); + var stat = await _zkClient.setDataAsync(path, data, version); + return stat; + } + + /// + /// get data + /// + /// + /// + /// + public async Task GetDataAsync(string path, bool pathCv = false) + { + if (pathCv) + { + path = GetZooKeeperPath(path); + } + var data = await _zkClient.getDataAsync(path); + return data?.Data; + } + + /// + /// recurive create + /// + /// + /// + /// + /// + /// + private async Task CreateRecursiveAsync(string path, byte[] data, List acls, CreateMode createMode) + { + path = GetZooKeeperPath(path); + var paths = path.Trim('/').Split('/'); + var cur = ""; + foreach (var item in paths) + { + if (string.IsNullOrEmpty(item)) + { + continue; + } + cur += $"/{item}"; + var existStat = await _zkClient.existsAsync(cur, null); + if (existStat != null) + { + continue; + } + + if (cur.Equals(path)) + { + await _zkClient.createAsync(cur, data, acls, createMode); + } + else + { + await _zkClient.createAsync(cur, null, acls, createMode); + } + } + return await Task.FromResult(true); + } + + /// + /// + /// + /// + /// + private string GetZooKeeperPath(string path) + { + var basePath = _zkBusOptions.BaseRoutePath ?? "/"; + + if (!basePath.StartsWith("/")) + basePath = basePath.Insert(0, "/"); + + basePath = basePath.TrimEnd('/'); + + if (!path.StartsWith("/")) + path = path.Insert(0, "/"); + + path = $"{basePath}{path.TrimEnd('/')}"; + return string.IsNullOrEmpty(path) ? "/" : path; + } + + /// + /// watch zkNode data Change + /// + private class ZkNodeDataWatch : Watcher + { + private readonly DefaultZookeeperBus _defaultZookeeperBus; + + public ZkNodeDataWatch(DefaultZookeeperBus defaultZookeeperBus) + { + _defaultZookeeperBus = defaultZookeeperBus; + } + + public override async Task process(WatchedEvent watchedEvent) + { + var path = watchedEvent.getPath(); + if (path != null) + { + var eventType = watchedEvent.get_Type(); + var dataChanged = new[] + { + Watcher.Event.EventType.NodeCreated, + Watcher.Event.EventType.NodeDataChanged, + Watcher.Event.EventType.NodeDeleted + }.Contains(eventType); + + if (dataChanged) + { + await _defaultZookeeperBus._dataChangeHandler(watchedEvent); + } + } + } + } + } +} \ No newline at end of file diff --git a/bus/EasyCaching.Bus.Zookeeper/EasyCaching.Bus.Zookeeper.csproj b/bus/EasyCaching.Bus.Zookeeper/EasyCaching.Bus.Zookeeper.csproj new file mode 100644 index 00000000..5d754db8 --- /dev/null +++ b/bus/EasyCaching.Bus.Zookeeper/EasyCaching.Bus.Zookeeper.csproj @@ -0,0 +1,43 @@ + + + + + + netstandard2.0;net6.0 + ncc;Catcher Wong + ncc;Catcher Wong + $(EasyCachingZookeeperBusPackageVersion) + + + A simple caching bus(message bus) based on Zookeeper. + + Bus,Hybrid,Zookeeper,Caching,Cache + https://github.com/dotnetcore/EasyCaching + LICENSE + https://github.com/dotnetcore/EasyCaching + https://github.com/dotnetcore/EasyCaching + nuget-icon.png + + $(EasyCachingZookeeperBusPackageNotes) + + + + + true + $(NoWarn);1591 + + + + + + + + + + + + + + + + diff --git a/sample/EasyCaching.Demo.Providers/Controllers/ValuesBusController.cs b/sample/EasyCaching.Demo.Providers/Controllers/ValuesBusController.cs new file mode 100644 index 00000000..abb76ab8 --- /dev/null +++ b/sample/EasyCaching.Demo.Providers/Controllers/ValuesBusController.cs @@ -0,0 +1,46 @@ +namespace EasyCaching.Demo.Providers.Controllers +{ + using EasyCaching.Core; + using Microsoft.AspNetCore.Mvc; + using System; + using System.Threading.Tasks; + + [Route("api/[controller]")] + public class ValuesBusController : Controller + { + //2. Hybird Cache + private readonly IHybridCachingProvider _provider; + private readonly IEasyCachingProviderFactory _factory; + + public ValuesBusController(IHybridCachingProvider provider, IEasyCachingProviderFactory factory) + { + this._provider = provider; + _factory = factory; + } + + // GET api/values + [HttpGet] + [Route("get2")] + public async Task Get2() + { + var rd = new Random(1000); + for (int i = 0; i < 5; i++) + { + var val = rd.Next().ToString(); + await _provider.SetAsync($"demo{i}", val, TimeSpan.FromSeconds(5000)); + var provider = _factory.GetCachingProvider("cus"); + var v1 = provider.Get($"demow{i}"); + //Console.WriteLine($"{i}-->{v1}"); + + await _provider.SetAsync($"demow{i}", $"changeda-{val}", TimeSpan.FromSeconds(5000)); + + //var v2 = provider.Get($"demo{i}"); + //Console.WriteLine($"after--{i}-->{v2}"); + //Console.WriteLine("------------------"); + } + return $"hybrid"; + } + + + } +} diff --git a/sample/EasyCaching.Demo.Providers/EasyCaching.Demo.Providers.csproj b/sample/EasyCaching.Demo.Providers/EasyCaching.Demo.Providers.csproj index f9afa091..0fb6da6f 100644 --- a/sample/EasyCaching.Demo.Providers/EasyCaching.Demo.Providers.csproj +++ b/sample/EasyCaching.Demo.Providers/EasyCaching.Demo.Providers.csproj @@ -5,6 +5,8 @@ + + diff --git a/sample/EasyCaching.Demo.Providers/Program.cs b/sample/EasyCaching.Demo.Providers/Program.cs index 81dd13f5..dbf64033 100644 --- a/sample/EasyCaching.Demo.Providers/Program.cs +++ b/sample/EasyCaching.Demo.Providers/Program.cs @@ -1,7 +1,9 @@ namespace EasyCaching.Demo.Providers { using Microsoft.AspNetCore.Hosting; + using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; + using System.IO; public class Program { @@ -12,6 +14,15 @@ public static void Main(string[] args) public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) + .UseContentRoot(Directory.GetCurrentDirectory()) + .ConfigureAppConfiguration((hosting, config) => + { + config.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true) + .AddJsonFile($"appsettings.{hosting.HostingEnvironment.EnvironmentName}.json", optional: true, + true); + + config.AddEnvironmentVariables(); + }) .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup(); diff --git a/sample/EasyCaching.Demo.Providers/Startup.cs b/sample/EasyCaching.Demo.Providers/Startup.cs index 84160cc2..9f9e30f5 100644 --- a/sample/EasyCaching.Demo.Providers/Startup.cs +++ b/sample/EasyCaching.Demo.Providers/Startup.cs @@ -1,87 +1,87 @@ -namespace EasyCaching.Demo.Providers +namespace EasyCaching.Demo.Providers { using EasyCaching.Core.Configurations; using EasyCaching.SQLite; - using Microsoft.AspNetCore.Builder; - using Microsoft.AspNetCore.Hosting; - using Microsoft.Extensions.Configuration; - using Microsoft.Extensions.DependencyInjection; - using Microsoft.Extensions.Hosting; - using Microsoft.Extensions.Logging; - - public class Startup - { - public Startup(IConfiguration configuration) - { - Configuration = configuration; - } - - public IConfiguration Configuration { get; } - - public void ConfigureServices(IServiceCollection services) - { - services.AddControllers(); - - //new configuration + using Microsoft.AspNetCore.Builder; + using Microsoft.AspNetCore.Hosting; + using Microsoft.Extensions.Configuration; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.Hosting; + using Microsoft.Extensions.Logging; + + public class Startup + { + public Startup(IConfiguration configuration) + { + Configuration = configuration; + } + + public IConfiguration Configuration { get; } + + public void ConfigureServices(IServiceCollection services) + { + services.AddControllers(); + + //new configuration services.AddEasyCaching(option => - { - //use memory cache - option.UseInMemory("default"); - - //use memory cache - option.UseInMemory("cus"); - - //use redis cache + { + //use memory cache + option.UseInMemory("default"); + + //use memory cache + option.UseInMemory("cus"); + + //use redis cache option.UseRedis(config => - { - config.DBConfig.Endpoints.Add(new ServerEndPoint("127.0.0.1", 6379)); - config.DBConfig.SyncTimeout = 10000; - config.DBConfig.AsyncTimeout = 10000; - config.SerializerName = "mymsgpack"; - }, "redis1") - .WithMessagePack("mymsgpack")//with messagepack serialization - ; - - //use redis cache + { + config.DBConfig.Endpoints.Add(new ServerEndPoint("127.0.0.1", 6379)); + config.DBConfig.SyncTimeout = 10000; + config.DBConfig.AsyncTimeout = 10000; + config.SerializerName = "mymsgpack"; + }, "redis1") + .WithMessagePack("mymsgpack")//with messagepack serialization + ; + + //use redis cache option.UseRedis(config => - { - config.DBConfig.Endpoints.Add(new ServerEndPoint("127.0.0.1", 6380)); - }, "redis2"); - - //use sqlite cache - option.UseSQLite(config => - { - config.DBConfig = new SQLiteDBOptions { FileName = "my.db" }; - }); - - //use memcached cached + { + config.DBConfig.Endpoints.Add(new ServerEndPoint("127.0.0.1", 6380)); + }, "redis2"); + + //use sqlite cache + option.UseSQLite(config => + { + config.DBConfig = new SQLiteDBOptions { FileName = "my.db" }; + }); + + //use memcached cached option.UseMemcached(config => - { - config.DBConfig.AddServer("127.0.0.1", 11211); - }); - - option.UseMemcached(Configuration); + { + config.DBConfig.AddServer("127.0.0.1", 11211); + }); + + option.UseMemcached(Configuration); }); - } - - public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILoggerFactory loggerFactory) - { - if (env.IsDevelopment()) - { - app.UseDeveloperExceptionPage(); - } - - //loggerFactory.AddConsole(Configuration.GetSection("Logging")); - - // Important step for using Memcached Cache or SQLite Cache - //app.UseEasyCaching(); - - app.UseRouting(); - - app.UseEndpoints(endpoints => - { - endpoints.MapControllers(); - }); - } - } + } + + public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILoggerFactory loggerFactory) + { + if (env.IsDevelopment()) + { + app.UseDeveloperExceptionPage(); + } + + //loggerFactory.AddConsole(Configuration.GetSection("Logging")); + + // Important step for using Memcached Cache or SQLite Cache + //app.UseEasyCaching(); + + app.UseRouting(); + + app.UseEndpoints(endpoints => + { + endpoints.MapControllers(); + }); + } + } } diff --git a/sample/EasyCaching.Demo.Providers/appsettings.json b/sample/EasyCaching.Demo.Providers/appsettings.json index 8f4acb34..85b08a94 100644 --- a/sample/EasyCaching.Demo.Providers/appsettings.json +++ b/sample/EasyCaching.Demo.Providers/appsettings.json @@ -12,59 +12,70 @@ } } }, - "easycaching": { - "inmemory": { - "CachingProviderType": 1, - "MaxRdSecond": 120, - "Order": 2, - }, - "sqlite": { - "CachingProviderType": 3, - "MaxRdSecond": 120, - "Order": 2, - "dbconfig": { - "FileName": "my.db" - } - }, - "redis": { - "CachingProviderType": 2, - "MaxRdSecond": 120, - "Order": 2, - "dbconfig": { - "Password": null, - "IsSsl": false, - "SslHost": null, - "ConnectionTimeout": 5000, - "AllowAdmin": true, - "Endpoints": [ - { - "Host": "localhost", - "Port": 6739 - } - ], - "Database": 0 - } - }, - "memcached": { - "CachingProviderType": 4, - "MaxRdSecond": 120, - "Order": 2, - "dbconfig": { - "Servers": [ - { - "Address": "127.0.0.1", - "Port": 11211 - } - ], - "socketPool": { - "minPoolSize": "5", - "maxPoolSize": "25", - "connectionTimeout": "00:00:15", - "receiveTimeout": "00:00:15", - "deadTimeout": "00:00:15", - "queueTimeout": "00:00:00.150" - } - } + "easycaching": { + "inmemory": { + "CachingProviderType": 1, + "MaxRdSecond": 120, + "Order": 2 + }, + "sqlite": { + "CachingProviderType": 3, + "MaxRdSecond": 120, + "Order": 2, + "dbconfig": { + "FileName": "my.db" + } + }, + "redis": { + "CachingProviderType": 2, + "MaxRdSecond": 120, + "Order": 2, + "dbconfig": { + "Password": null, + "IsSsl": false, + "SslHost": null, + "ConnectionTimeout": 5000, + "AllowAdmin": true, + "Endpoints": [ + { + "Host": "127.0.0.1", + "Port": 6739 + } + ], + "Database": 0 + } + }, + "memcached": { + "CachingProviderType": 4, + "MaxRdSecond": 120, + "Order": 2, + "dbconfig": { + "Servers": [ + { + "Address": "127.0.0.1", + "Port": 11211 + } + ], + "socketPool": { + "minPoolSize": "5", + "maxPoolSize": "25", + "connectionTimeout": "00:00:15", + "receiveTimeout": "00:00:15", + "deadTimeout": "00:00:15", + "queueTimeout": "00:00:00.150" } + } + }, + "kafkabus": { + // "BootstrapServers": "127.0.0.1:9093", + "ProducerConfig": { + "BootstrapServers": "127.0.0.1:9093" + }, + "ConsumerConfig": { + "BootstrapServers": "127.0.0.1:9093", + "GroupId": "MyGroupId" + }, + "ConsumerCount":2 } + } } diff --git a/src/EasyCaching.Core/Internal/EasyCachingConstValue.cs b/src/EasyCaching.Core/Internal/EasyCachingConstValue.cs index 04fb045c..4e471456 100644 --- a/src/EasyCaching.Core/Internal/EasyCachingConstValue.cs +++ b/src/EasyCaching.Core/Internal/EasyCachingConstValue.cs @@ -53,7 +53,18 @@ public class EasyCachingConstValue /// /// The rabbitMQ Bus section. /// - public const string RabbitMQBusSection = "easycaching:rabbitmqbus"; + public const string RabbitMQBusSection = "easycaching:rabbitmqbus"; + + /// + /// The kafka bus section. + /// + public const string KafkaBusSection = "easycaching:kafkabus"; + + /// + /// The zookeeper bus section. + /// + public const string ZookeeperBusSection = "easycaching:zookeeperbus"; + /// /// The default name of the in-memory.