Skip to content

Commit

Permalink
Merge pull request #209 from alfa-laboratory/feature/kafka
Browse files Browse the repository at this point in the history
Feature/kafka
  • Loading branch information
egorsh0 authored Dec 13, 2021
2 parents ffc36d0 + 861fae5 commit 5cb899e
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 4 deletions.
5 changes: 4 additions & 1 deletion examples/Molder.Kafka.Example/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
"AutoOffsetReset": "Earliest",
"AutoCommitIntervalMs": 5000,
"SessionTimeoutMs": 6000,
"EnableAutoCommit": true
"EnableAutoCommit": true,
"SaslUsername": "username",
"SaslPassword": "password",
"SaslMechanism": "OAuthBearer"
},
"Topic": "test-topic",
"Name": "test"
Expand Down
2 changes: 1 addition & 1 deletion src/Molder.Kafka/Molder.Kafka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<IsPackable>true</IsPackable>
<Description>Library for work with Kafka</Description>
<LangVersion>9</LangVersion>
<PackageVersion>1.0.0</PackageVersion>
<PackageVersion>1.0.1</PackageVersion>
<AssemblyVersion>$(PackageVersion)</AssemblyVersion>
<Nullable>disable</Nullable>
</PropertyGroup>
Expand Down
42 changes: 40 additions & 2 deletions src/Molder.Kafka/Steps/KafkaSteps.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using FluentAssertions;
using Confluent.Kafka;
using FluentAssertions;
using Molder.Controllers;
using Molder.Extensions;
using Molder.Kafka.Models;
using TechTalk.SpecFlow;
using TechTalk.SpecFlow.Assist;

namespace Molder.Kafka.Steps
{
Expand All @@ -16,7 +19,42 @@ public KafkaSteps(VariableController variableController)
{
this.variableController = variableController;
}


#region Transformations
/// <summary>
/// Трансформация параметров подключения к Kafka.
/// </summary>
/// <param name="table">Параметры подключения.</param>
/// <returns>Параметры подключения к Kafka.</returns>
[StepArgumentTransformation]
public ConsumerConfig GetKafkaParametersFromTable(Table table)
{
return table.ReplaceWith(variableController).CreateInstance<ConsumerConfig>();
}

#endregion
#region Connections
/// <summary>
/// Подключение к Kafka.
/// </summary>
/// <param name="name">Название подключения.</param>
/// <param name="topic">Название топика.</param>
/// <param name="consumerConfig">Параметры подключения.</param>
[Given(@"я создаю параметры для подключения к kafka c именем ""(.+)"" и топиком ""(.+)"":")]
public void ConnectToKafka_Kafka(string name, string topic, ConsumerConfig consumerConfig)
{
KafkaSettings.Settings.Should().NotContainKey(name,
$"параметров подлючения к kafka с именем \"{name}\" не существует");

KafkaSettings.Settings.Add(name, new Settings()
{
Config = consumerConfig,
Name = name,
Topic = topic
});
}
#endregion

[StepDefinition(@"запустить обработчик очереди kafka c именем \""(.+)\""")]
public void Run(string name)
{
Expand Down

0 comments on commit 5cb899e

Please sign in to comment.