Skip to content

Commit

Permalink
whether can support zookeeper bus and kafka bus #404 (#405)
Browse files Browse the repository at this point in the history
* feat:add EasyCaching.Bus.ConfluentKafka module
1.use kafka to notify cache to change
2.testing seeing sample EasyCaching.Demo.Providers ValuesBusController

* feat:
1.Add ZooKeeperBus Module,
2.kafakBus options write AutoOffsetReset value

* feat:zookeeperBus

* feat:
1.zk bus

* fix:
1.add zk config command,zk init time compare
2.add testBus

* feat:
1.Eliminate careless writing and code formatting

* fix:
1.edit build/releasenotes.props and build/version.props add zookeeper/kafka bus module notes and version
2.EasyCaching.Demo.Providers/Startup.cs to original file data
  • Loading branch information
bingtianyiyan authored Oct 18, 2022
1 parent c32dae1 commit 14bbda4
Show file tree
Hide file tree
Showing 19 changed files with 1,203 additions and 134 deletions.
18 changes: 16 additions & 2 deletions EasyCaching.sln
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29324.140
# Visual Studio Version 17
VisualStudioVersion = 17.2.32616.157
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{A0F5CC7E-155F-4726-8DEB-E966950B3FE9}"
EndProject
Expand Down Expand Up @@ -70,6 +70,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EasyCaching.LiteDB", "src\E
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EasyCaching.Serialization.SystemTextJson", "serialization\EasyCaching.Serialization.SystemTextJson\EasyCaching.Serialization.SystemTextJson.csproj", "{4FCF16BF-5E21-4B74-AB45-3C121ADF1485}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EasyCaching.Bus.ConfluentKafka", "bus\EasyCaching.Bus.ConfluentKafka\EasyCaching.Bus.ConfluentKafka.csproj", "{F7FBADEB-D766-4595-949A-07104B52692C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EasyCaching.Bus.Zookeeper", "bus\EasyCaching.Bus.Zookeeper\EasyCaching.Bus.Zookeeper.csproj", "{5E488583-391E-4E15-83C1-7301B4FE79AE}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -184,6 +188,14 @@ Global
{4FCF16BF-5E21-4B74-AB45-3C121ADF1485}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4FCF16BF-5E21-4B74-AB45-3C121ADF1485}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4FCF16BF-5E21-4B74-AB45-3C121ADF1485}.Release|Any CPU.Build.0 = Release|Any CPU
{F7FBADEB-D766-4595-949A-07104B52692C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F7FBADEB-D766-4595-949A-07104B52692C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F7FBADEB-D766-4595-949A-07104B52692C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F7FBADEB-D766-4595-949A-07104B52692C}.Release|Any CPU.Build.0 = Release|Any CPU
{5E488583-391E-4E15-83C1-7301B4FE79AE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5E488583-391E-4E15-83C1-7301B4FE79AE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5E488583-391E-4E15-83C1-7301B4FE79AE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5E488583-391E-4E15-83C1-7301B4FE79AE}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -216,6 +228,8 @@ Global
{711603E1-8363-4F8D-9AA9-8C03EC8BD35F} = {B4241D34-A973-4A13-BD89-9BAE3F2BDDF6}
{BA850294-3103-4540-8A27-FC768E1DC8FC} = {A0F5CC7E-155F-4726-8DEB-E966950B3FE9}
{4FCF16BF-5E21-4B74-AB45-3C121ADF1485} = {15070C49-A507-4844-BCFE-D319CFBC9A63}
{F7FBADEB-D766-4595-949A-07104B52692C} = {B337509B-75F9-4851-821F-9BBE87C4E4BC}
{5E488583-391E-4E15-83C1-7301B4FE79AE} = {B337509B-75F9-4851-821F-9BBE87C4E4BC}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {63A57886-054B-476C-AAE1-8D7C8917682E}
Expand Down
6 changes: 6 additions & 0 deletions build/releasenotes.props
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
<EasyCachingRabbitBusPackageNotes>
1. Upgrading dependencies.
</EasyCachingRabbitBusPackageNotes>
<EasyCachingKafkaBusPackageNotes>
1. Add EasyCachingKafkaBus.
</EasyCachingKafkaBusPackageNotes>
<EasyCachingZookeeperBusPackageNotes>
1. Add EasyCachingZookeeperBus.
</EasyCachingZookeeperBusPackageNotes>
<EasyCachingDiskPackageNotes>
1. Add CancellationToken for async methods.
</EasyCachingDiskPackageNotes>
Expand Down
2 changes: 2 additions & 0 deletions build/version.props
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
<EasyCachingRedisBusPackageVersion>1.6.1</EasyCachingRedisBusPackageVersion>
<EasyCachingCSRedisBusPackageVersion>1.6.1</EasyCachingCSRedisBusPackageVersion>
<EasyCachingRabbitBusPackageVersion>1.6.1</EasyCachingRabbitBusPackageVersion>
<EasyCachingKafkaBusPackageVersion>1.6.1</EasyCachingKafkaBusPackageVersion>
<EasyCachingZookeeperBusPackageVersion>1.6.1</EasyCachingZookeeperBusPackageVersion>
<EasyCachingDiskPackageVersion>1.6.1</EasyCachingDiskPackageVersion>
<EasyCachingMsExtPackageVersion>1.6.1</EasyCachingMsExtPackageVersion>
<EasyCachingLiteDBPackageVersion>1.6.1</EasyCachingLiteDBPackageVersion>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using Confluent.Kafka;

namespace EasyCaching.Bus.ConfluentKafka
{
/// <summary>
/// kafka bus options
/// </summary>
public class ConfluentKafkaBusOptions
{
/// <summary>
/// kafka address(BootstrapServers must)
/// </summary>
public string BootstrapServers { get; set; }


/// <summary>
/// kafka bus producer options.
/// </summary>
public ProducerConfig ProducerConfig { get; set; }

/// <summary>
/// kafka bus consumer options.(if GroupId value below is empty,then ConsumerConfig.GroupId must )
/// </summary>
public ConsumerConfig ConsumerConfig { get; set; }

/// <summary>
/// kafka bus consumer options with consumer groupId
/// (if ConsumerConfig below has give GroupId value , this options can ignore)
/// import:if application is cluster,you should set this different value in application,this will make consumer can consumerdata
/// </summary>
public string GroupId { get; set; }

/// <summary>
/// kafka bus consumer consume count
/// </summary>
public int ConsumerCount { get; set; } = 1;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
namespace EasyCaching.Bus.ConfluentKafka
{
using System;
using EasyCaching.Core.Bus;
using EasyCaching.Core.Configurations;
using EasyCaching.Core.Serialization;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

/// <summary>
/// Kafka options extension.
/// </summary>
internal sealed class ConfluentKafkaOptionsExtension : IEasyCachingOptionsExtension
{

private readonly Action<ConfluentKafkaBusOptions> _confluentKafkaBusOptions;

public ConfluentKafkaOptionsExtension(Action<ConfluentKafkaBusOptions> confluentKafkaBusOptions)
{
this._confluentKafkaBusOptions = confluentKafkaBusOptions;
}

/// <summary>
/// Adds the services.
/// </summary>
/// <param name="services">Services.</param>
public void AddServices(IServiceCollection services)
{
services.AddOptions();

services.TryAddSingleton<IEasyCachingSerializer, DefaultBinaryFormatterSerializer>();

services.AddOptions<ConfluentKafkaBusOptions>()
.Configure(_confluentKafkaBusOptions);


//var options = services.BuildServiceProvider()
// .GetRequiredService<IOptions<ConfluentKafkaBusOptions>>()
// .Value;

services.AddSingleton<IEasyCachingBus, DefaultConfluentKafkaBus>();

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
namespace Microsoft.Extensions.DependencyInjection
{
using Confluent.Kafka;
using EasyCaching.Bus.ConfluentKafka;
using EasyCaching.Core;
using EasyCaching.Core.Configurations;
using Microsoft.Extensions.Configuration;
using System;

/// <summary>
/// EasyCaching options extensions.
/// </summary>
public static class EasyCachingOptionsExtensions
{
/// <summary>
/// Withs the ConfluentKafka bus (specify the config via hard code).
/// </summary>
/// <param name="options"></param>
/// <param name="configure"></param>
/// <returns></returns>
public static EasyCachingOptions WithConfluentKafkaBus(
this EasyCachingOptions options
, Action<ConfluentKafkaBusOptions> configure
)
{
ArgumentCheck.NotNull(configure, nameof(configure));
//option convert
ConfluentKafkaBusOptions kafkaOptions = new ConfluentKafkaBusOptions();
configure.Invoke(kafkaOptions);
void kafkaBusConfigure(ConfluentKafkaBusOptions x)
{
x.BootstrapServers = kafkaOptions.BootstrapServers;
x.ProducerConfig = kafkaOptions.ProducerConfig ?? new ProducerConfig();
x.ConsumerConfig = kafkaOptions.ConsumerConfig ?? new ConsumerConfig();
//address
x.ProducerConfig.BootstrapServers = x.ProducerConfig.BootstrapServers ?? kafkaOptions.BootstrapServers;
x.ConsumerConfig.BootstrapServers = x.ConsumerConfig.BootstrapServers ?? kafkaOptions.BootstrapServers;
//consumer groupId
x.ConsumerConfig.GroupId = x.ConsumerConfig.GroupId ?? kafkaOptions.GroupId;
x.ConsumerConfig.AutoOffsetReset = kafkaOptions.ConsumerConfig.AutoOffsetReset ?? AutoOffsetReset.Latest;
}

options.RegisterExtension(new ConfluentKafkaOptionsExtension(kafkaBusConfigure));
return options;
}

/// <summary>
/// Withs the ConfluentKafka bus (read config from configuration file).
/// </summary>
/// <param name="options"></param>
/// <param name="configuration"></param>
/// <param name="sectionName">The section name in the configuration file.</param>
/// <returns></returns>
public static EasyCachingOptions WithConfluentKafkaBus(
this EasyCachingOptions options
, IConfiguration configuration
, string sectionName = EasyCachingConstValue.KafkaBusSection
)
{
var dbConfig = configuration.GetSection(sectionName);
var kafkaOptions = new ConfluentKafkaBusOptions();
dbConfig.Bind(kafkaOptions);

void configure(ConfluentKafkaBusOptions x)
{
x.BootstrapServers = kafkaOptions.BootstrapServers;
x.ProducerConfig = kafkaOptions.ProducerConfig ?? new ProducerConfig();
x.ConsumerConfig = kafkaOptions.ConsumerConfig ?? new ConsumerConfig();
//address
x.ProducerConfig.BootstrapServers = x.ProducerConfig.BootstrapServers ?? kafkaOptions.BootstrapServers;
x.ConsumerConfig.BootstrapServers = x.ConsumerConfig.BootstrapServers ?? kafkaOptions.BootstrapServers;
//consumer groupId
x.ConsumerConfig.GroupId = x.ConsumerConfig.GroupId ?? kafkaOptions.GroupId;
x.ConsumerConfig.AutoOffsetReset = kafkaOptions.ConsumerConfig.AutoOffsetReset ?? AutoOffsetReset.Latest;
}

options.RegisterExtension(new ConfluentKafkaOptionsExtension(configure));
return options;
}
}
}
149 changes: 149 additions & 0 deletions bus/EasyCaching.Bus.ConfluentKafka/DefaultConfluentKafkaBus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
namespace EasyCaching.Bus.ConfluentKafka
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using EasyCaching.Core;
using EasyCaching.Core.Bus;
using EasyCaching.Core.Serialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;

public class DefaultConfluentKafkaBus : EasyCachingAbstractBus
{


/// <summary>
/// The kafka Bus options.
/// </summary>
private readonly ConfluentKafkaBusOptions _kafkaBusOptions;

/// <summary>
/// The serializer.
/// </summary>
private readonly IEasyCachingSerializer _serializer;

/// <summary>
/// kafka producer object
/// </summary>

private readonly IProducer<Null, byte[]> _producer;


/// <summary>
/// log
/// </summary>

private readonly ILogger _logger = NullLogger<DefaultConfluentKafkaBus>.Instance;


/// <summary>
/// Initializes a new instance of the <see cref="T:EasyCaching.Bus.ConfluentKafka.DefaultConfluentKafkaBus"/> class.
/// </summary>
/// <param name="kafkaBusOptions"></param>
/// <param name="serializer"></param>
public DefaultConfluentKafkaBus(
IOptionsMonitor<ConfluentKafkaBusOptions> kafkaBusOptions
, IEasyCachingSerializer serializer)
{
this.BusName = "easycachingbus";
this._kafkaBusOptions = kafkaBusOptions.CurrentValue;

this._producer = new ProducerBuilder<Null, byte[]>(this._kafkaBusOptions.ProducerConfig).Build();

this._serializer = serializer;
}

/// <summary>
/// Publish the specified topic and message.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="message">Message.</param>
public override void BasePublish(string topic, EasyCachingMessage message)
{
var msg = _serializer.Serialize(message);

_producer.Produce(topic, new Message<Null, byte[]> { Value = msg });
}

/// <summary>
/// Publishs the async.
/// </summary>
/// <returns>The async.</returns>
/// <param name="topic">Topic.</param>
/// <param name="message">Message.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public override async Task BasePublishAsync(string topic, EasyCachingMessage message, CancellationToken cancellationToken = default(CancellationToken))
{
var msg = _serializer.Serialize(message);

await _producer.ProduceAsync(topic, new Message<Null, byte[]> { Value = msg });
}

/// <summary>
/// Subscribe the specified topic and action.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
public override void BaseSubscribe(string topic, Action<EasyCachingMessage> action)
{
Task.Factory.StartNew(() =>
{
for (int i = 0; i < this._kafkaBusOptions.ConsumerCount; i++)
{
using (var consumer = new ConsumerBuilder<Null, byte[]>(this._kafkaBusOptions.ConsumerConfig).Build())
{
consumer.Subscribe(topic);
try
{
while (true)
{
try
{
var cr = consumer.Consume();
if (cr.IsPartitionEOF
|| cr.Message == null
|| cr.Message.Value.Length == 0)
{
continue;
}
OnMessage(cr.Message.Value);
}
catch (ConsumeException ex)
{
_logger.LogError(ex, "Consumer {0} error of reason {1}.", topic, ex.Error.Reason);
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
_logger.LogError(ex, "Consumer {0} error.", topic);
}
}
}
catch (OperationCanceledException ex)
{
_logger.LogWarning(ex, "Consumer {0} cancel.", topic);
consumer.Close();
}
}
}
}, TaskCreationOptions.LongRunning);
}

/// <summary>
/// Ons the message.
/// </summary>
/// <param name="body">Body.</param>
private void OnMessage(byte[] body)
{
var message = _serializer.Deserialize<EasyCachingMessage>(body);

BaseOnMessage(message);
}
}
}
Loading

0 comments on commit 14bbda4

Please sign in to comment.