Deduplicate topic declaration before trying to create them #383

Open
wants to merge 1 commit into
base: master
8 changes: 8 additions & 0 deletions src/KafkaFlow/Clusters/ClusterManager.cs
@@ -30,6 +30,7 @@ public async Task CreateIfNotExistsAsync(IEnumerable<TopicConfiguration> configu
try
{
var topics = configurations
.Distinct(new DuplicateTopicConfigurationEqualityComparer())
.Select(
topicConfiguration => new TopicSpecification
{
@@ -75,5 +76,12 @@ public async Task CreateIfNotExistsAsync(IEnumerable<TopicConfiguration> configu
}
}
}

private class DuplicateTopicConfigurationEqualityComparer : IEqualityComparer<TopicConfiguration>
{
public bool Equals(TopicConfiguration x, TopicConfiguration y) => x?.Name == y?.Name;

public int GetHashCode(TopicConfiguration obj) => obj.Name.GetHashCode();
}
Contributor

Maybe throwing an exception would be better. What do you think?

Contributor Author

That's what happens now. The problem with the exception is that it forces us to call createtopics only once per topic.
In our case a topic has many handlers, and by default each one calls create topic. The registration happens in the bounded context of the handler and is then used in the app. Each bounded context is unaware of how the registration was done in the other bounded contexts.

Contributor

Since KafkaFlow doesn't throw an exception if the topic already exists, I think it makes sense not to throw one for a duplicated configuration either. As a consumer, I would expect the first attempt to succeed and the second to be ignored.

Contributor

Hi @kYann,
We discussed this issue and @filipeesch raised a scenario that concerns us.
If we end up with a duplicate topic declaration that has different parameters (number of partitions and replicas), a silent deduplication may cause confusion.
So we believe we should at least log a warning stating that the declaration is duplicated, that we are deduplicating it, and that settings X and Y will be used to try to create the topic.

What do you think?

Contributor Author

Good point. I think the warning is a good idea. There is no other way to resolve this.
It would be a nice touch if the warning were logged only when the parameters differ.
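
The behavior discussed in this thread (keep the first declaration of a topic, warn only when a later declaration uses different settings) could be sketched roughly as follows. This is a minimal standalone illustration, not KafkaFlow code: `TopicConfig`, `TopicDeduplicator`, and the `logWarning` callback are hypothetical names, and the `Partitions` and `Replicas` properties are assumed stand-ins for the partition and replica settings mentioned above. It uses `GroupBy` rather than `Distinct` so that the "first declaration wins" rule is explicit, and it requires C# 9 or later for the record syntax.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a topic declaration; property names are assumptions.
public sealed record TopicConfig(string Name, int Partitions, short Replicas);

public static class TopicDeduplicator
{
    // Keeps the first declaration of each topic name and reports a warning
    // only when a later declaration of the same name has different settings.
    public static List<TopicConfig> Deduplicate(
        IEnumerable<TopicConfig> configurations,
        Action<string> logWarning)
    {
        var result = new List<TopicConfig>();

        // GroupBy yields groups in order of first key appearance and keeps
        // the elements of each group in source order.
        foreach (var group in configurations.GroupBy(c => c.Name))
        {
            var first = group.First();
            result.Add(first);

            // Identical repeats are dropped silently; only mismatches warn.
            var hasConflict = group
                .Skip(1)
                .Any(c => c.Partitions != first.Partitions || c.Replicas != first.Replicas);

            if (hasConflict)
            {
                logWarning(
                    $"Topic '{first.Name}' is declared more than once with different settings; " +
                    $"using {first.Partitions} partition(s) and {first.Replicas} replica(s).");
            }
        }

        return result;
    }
}
```

With this shape, two bounded contexts that declare the same topic with identical settings produce no log output, while a mismatch produces a single warning per topic that names the settings actually used.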

}
}