Skip to content

Commit

Permalink
Fix Codacy issues
Browse files Browse the repository at this point in the history
  • Loading branch information
golanbz authored and brmagadutra committed Feb 4, 2025
1 parent ad77f89 commit 0683df5
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public void ConfigurationBuild_CallBuild_WithSticky_EnableAutoCommit_True()
{
// Arrange
var consumerConfigurationBuilder = _fixture.Create<ConsumerConfigurationBuilder>();
consumerConfigurationBuilder.WithConsumerConfig(new ConsumerConfig()
{
consumerConfigurationBuilder.WithConsumerConfig(new ConsumerConfig
{
PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
GroupId = "Test",
}).WithAutoCommitIntervalMs(500)
Expand All @@ -47,7 +47,7 @@ public void ConfigurationBuild_CallBuild_WithSRoundRobin_EnableAutoCommit_False(
{
// Arrange
var consumerConfigurationBuilder = new ConsumerConfigurationBuilder(Mock.Of<IDependencyConfigurator>());
consumerConfigurationBuilder.WithConsumerConfig(new ConsumerConfig()
consumerConfigurationBuilder.WithConsumerConfig(new ConsumerConfig
{
PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin,
GroupId = "Test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void Setup()
(Action<IDependencyResolver, Confluent.Kafka.IConsumer<byte[], byte[]>, List<Confluent.Kafka.TopicPartitionOffset>> value) =>
_onPartitionRevokedHandler = value);

var configurationMock= new Mock<IConsumerConfiguration>();
var configurationMock = new Mock<IConsumerConfiguration>();

configurationMock
.Setup(x => x.GetKafkaConfig())
Expand All @@ -69,11 +69,11 @@ public void Setup()
_consumerMock
.SetupGet(x => x.Configuration)
.Returns(configurationMock.Object);

_consumerMock
.SetupGet(x => x.Assignment)
.Returns(Array.Empty<TopicPartition>());

_target = new ConsumerManager(
_consumerMock.Object,
_workerPoolMock.Object,
Expand Down Expand Up @@ -134,19 +134,19 @@ public void OnPartitionsAssigned_StartWorkerPool()
var currentPartitions = _fixture.Create<List<Confluent.Kafka.TopicPartition>>();
var newAssignedPartitions = _fixture.Create<List<Confluent.Kafka.TopicPartition>>();
var allPartitions = currentPartitions.Concat(newAssignedPartitions).ToArray();

_workerPoolMock
.Setup(x => x.StopAsync())
.Returns(Task.CompletedTask);

_workerPoolMock
.Setup(x => x.StartAsync(allPartitions, It.IsAny<int>()))
.Returns(Task.CompletedTask);

_logHandlerMock
.Setup(x => x.Info(It.IsAny<string>(), It.IsAny<object>()));
_consumerMock.SetupGet(x=>x.Assignment).Returns(allPartitions.ToArray());

_consumerMock.SetupGet(x => x.Assignment).Returns(allPartitions.ToArray());

// Act
_onPartitionAssignedHandler(_dependencyResolver.Object, Mock.Of<Confluent.Kafka.IConsumer<byte[], byte[]>>(), newAssignedPartitions);
Expand All @@ -164,13 +164,11 @@ public void OnPartitionsRevoked_StopWorkerPool()
var currentPartitions = _fixture.CreateMany<Confluent.Kafka.TopicPartition>(6).ToList();
var revokedPartitions = currentPartitions.Take(3).ToArray();
var leftPartitions = currentPartitions.Except(revokedPartitions).ToArray();

var partitions = _fixture.Create<List<Confluent.Kafka.TopicPartitionOffset>>();

_workerPoolMock
.Setup(x => x.StopAsync())
.Returns(Task.CompletedTask);

_workerPoolMock
.Setup(x => x.StartAsync(leftPartitions, It.IsAny<int>()))
.Returns(Task.CompletedTask);
Expand All @@ -181,11 +179,11 @@ public void OnPartitionsRevoked_StopWorkerPool()

_logHandlerMock
.Setup(x => x.Warning(It.IsAny<string>(), It.IsAny<object>()));
_consumerMock.SetupGet(x=>x.Assignment).Returns(leftPartitions);

_consumerMock.SetupGet(x => x.Assignment).Returns(leftPartitions);

// Act
_onPartitionRevokedHandler(_dependencyResolver.Object, Mock.Of<Confluent.Kafka.IConsumer<byte[], byte[]>>(), revokedPartitions.Select(x=>new Confluent.Kafka.TopicPartitionOffset(x,123)).ToList());
_onPartitionRevokedHandler(_dependencyResolver.Object, Mock.Of<Confluent.Kafka.IConsumer<byte[], byte[]>>(), revokedPartitions.Select(x => new Confluent.Kafka.TopicPartitionOffset(x, 123)).ToList());

// Assert
_workerPoolMock.VerifyAll();
Expand Down

0 comments on commit 0683df5

Please sign in to comment.