[Bug] Error when number of partition changed using Cooperative rebalance protocol #3056

Open
jsyauideagen opened this issue Apr 16, 2024 · 6 comments
@jsyauideagen

Describe the bug

After we increase the number of partitions of a topic (e.g. from 2 to 5), the consumer hits a rebalance-related error.

To Reproduce

Increase the number of partitions of any existing topic and you will hit this error.
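
For a scripted reproduction, the partition count can be raised with the Confluent.Kafka admin client. This is only a sketch: the `PartitionRepro` class, the topic name and the broker address are placeholders rather than anything from this report, and it assumes a reachable broker plus a consumer configured with cooperative sticky already running against the topic.

```csharp
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Hypothetical helper, not part of Brighter.
public static class PartitionRepro
{
    // Describes the change: grow `topic` to `newTotal` partitions in total.
    public static PartitionsSpecification GrowTo(string topic, int newTotal) =>
        new PartitionsSpecification { Topic = topic, IncreaseTo = newTotal };

    // Sends the request to the broker. A running consumer group should
    // rebalance once it notices the new partitions.
    public static async Task IncreasePartitionsAsync(string bootstrapServers, string topic, int newTotal)
    {
        var config = new AdminClientConfig { BootstrapServers = bootstrapServers };
        using var admin = new AdminClientBuilder(config).Build();
        await admin.CreatePartitionsAsync(new[] { GrowTo(topic, newTotal) });
    }
}
```

Usage would be something like `await PartitionRepro.IncreasePartitionsAsync("localhost:9092", "my.topic", 5);` while the consumer is running.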

Exceptions (if any)

%4|1713152471.109|ASSIGN|icsa.notification#consumer-6| [thrd:main]: Group "ICSA.Notification.Services.NotificationProcessor": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE
Unhandled exception. Confluent.Kafka.KafkaException: Local: Erroneous state
at Confluent.Kafka.Impl.SafeKafkaHandle.AssignImpl(IEnumerable`1 partitions, Func`3 assignMethodErr, Func`3 assignMethodError)
at Confluent.Kafka.Impl.SafeKafkaHandle.Assign(IEnumerable`1 partitions)
at Confluent.Kafka.Consumer`2.Unassign()
at Confluent.Kafka.Consumer`2.RebalanceCallback(IntPtr rk, ErrorCode err, IntPtr partitionsPtr, IntPtr opaque)

Further technical details

  • Brighter version - 9.7.6
  • .NET SDK:
    Version: 8.0.202
    Commit: 25674bb2f4
    Workload version: 8.0.200-manifests.8cf8de6d

Runtime Environment:
OS Name: Windows
OS Version: 10.0.22621
OS Platform: Windows
RID: win-x64
Base Path: C:\Program Files\dotnet\sdk\8.0.202\

.NET workloads installed:
There are no installed workloads to display.

Host:
Version: 8.0.3
Architecture: x64
Commit: 9f4b1f5d66

.NET SDKs installed:
2.1.818 [C:\Program Files\dotnet\sdk]
3.1.426 [C:\Program Files\dotnet\sdk]
5.0.408 [C:\Program Files\dotnet\sdk]
6.0.203 [C:\Program Files\dotnet\sdk]
8.0.103 [C:\Program Files\dotnet\sdk]
8.0.202 [C:\Program Files\dotnet\sdk]

.NET runtimes installed:
Microsoft.AspNetCore.All 2.1.30 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.All]
Microsoft.AspNetCore.App 2.1.30 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
Microsoft.AspNetCore.App 3.1.32 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
Microsoft.AspNetCore.App 5.0.17 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
Microsoft.AspNetCore.App 6.0.5 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
Microsoft.AspNetCore.App 6.0.28 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
Microsoft.AspNetCore.App 7.0.17 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
Microsoft.AspNetCore.App 7.0.18 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
Microsoft.AspNetCore.App 8.0.3 [C:\Program Files\dotnet\shared\Microsoft.AspNetCore.App]
Microsoft.NETCore.App 2.0.9 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
Microsoft.NETCore.App 2.1.30 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
Microsoft.NETCore.App 3.1.32 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
Microsoft.NETCore.App 5.0.17 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
Microsoft.NETCore.App 6.0.28 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
Microsoft.NETCore.App 6.0.29 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
Microsoft.NETCore.App 7.0.17 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
Microsoft.NETCore.App 7.0.18 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
Microsoft.NETCore.App 8.0.3 [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
Microsoft.WindowsDesktop.App 3.1.32 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
Microsoft.WindowsDesktop.App 5.0.17 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
Microsoft.WindowsDesktop.App 6.0.5 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
Microsoft.WindowsDesktop.App 6.0.15 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
Microsoft.WindowsDesktop.App 6.0.28 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
Microsoft.WindowsDesktop.App 6.0.29 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
Microsoft.WindowsDesktop.App 7.0.17 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
Microsoft.WindowsDesktop.App 7.0.18 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]
Microsoft.WindowsDesktop.App 8.0.3 [C:\Program Files\dotnet\shared\Microsoft.WindowsDesktop.App]

Other architectures found:
x86 [C:\Program Files (x86)\dotnet]
registered at [HKLM\SOFTWARE\dotnet\Setup\InstalledVersions\x86\InstallLocation]

Environment variables:
Not set

global.json file:
Not found

  • The OS - Windows
  • Potential fix in Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs
     _consumer = new ConsumerBuilder<string, byte[]>(_consumerConfig)
         .SetPartitionsAssignedHandler((consumer, partitions) =>
         {
             // Log information about newly assigned partitions
             var partitionInfo = partitions.Select(p => $"{p.Topic} : {p.Partition.Value}");
             s_logger.LogInformation("Partition Assigned {Channels}", string.Join(",", partitionInfo));

             // Determine strategy and act accordingly
             if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
             {
                 consumer.IncrementalAssign(partitions);
             }
             else
             {
                 consumer.Assign(partitions);
             }
         })
         .SetPartitionsRevokedHandler((consumer, partitions) =>
         {
             // Log information about revoked partitions
             var revokedPartitions = partitions.Select(tpo => $"{tpo.Topic} : {tpo.Partition}");
             s_logger.LogInformation("Partitions Revoked {Channels}", string.Join(", ", revokedPartitions));

             // Determine strategy and act accordingly
             if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
             {
                 consumer.IncrementalUnassign(partitions);
             }
             else
             {
                 consumer.Unassign();
             }
         })
         .SetPartitionsLostHandler((consumer, partitions) =>
         {
             // Log information about lost partitions
             var lostPartitions = partitions.Select(tpo => $"{tpo.Topic} : {tpo.Partition}");
             s_logger.LogInformation("Partitions Lost {Channels}", string.Join(", ", lostPartitions));

             // This is typically treated the same as revocation
             consumer.IncrementalUnassign(partitions);
         })
         .SetErrorHandler((consumer, error) =>
         {
             s_logger.LogError("Error in Kafka Consumer: {ErrorCode}, Reason: {ErrorMessage}, Fatal: {FatalError}",
                 error.Code, error.Reason, error.IsFatal);
         })
         .Build();

iancooper self-assigned this Apr 16, 2024
iancooper changed the title from "Error when number of partition changed using Cooperative rebalance protocol" to "[BuG] Error when number of partition changed using Cooperative rebalance protocol" Apr 16, 2024
iancooper added a commit that referenced this issue Apr 16, 2024
@iancooper
Member

iancooper commented Apr 16, 2024

So we can't call assign/unassign from within the partition revoked handler; see https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/ConsumerBuilder.cs, lines 292-398. So I don't think the above code is right.
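
To illustrate the point about the handlers: as far as I can tell from the doc comments in the linked ConsumerBuilder.cs, the client applies the assignment itself once a handler returns, so a handler only needs to observe the change. A minimal sketch under that assumption (the `RebalanceLogging` class and the `log` callback are made up for illustration and are not Brighter code):

```csharp
using System;
using System.Linq;
using Confluent.Kafka;

// Hypothetical helper: registers rebalance handlers that only log.
public static class RebalanceLogging
{
    public static ConsumerBuilder<string, byte[]> AddLoggingHandlers(
        ConsumerBuilder<string, byte[]> builder, Action<string> log)
    {
        return builder
            // No Assign/IncrementalAssign here: the handlers must not call them.
            .SetPartitionsAssignedHandler((consumer, partitions) =>
            {
                var names = partitions.Select(p => $"{p.Topic} : {p.Partition.Value}");
                log("Partitions assigned: " + string.Join(", ", names));
            })
            // No Unassign/IncrementalUnassign here either.
            .SetPartitionsRevokedHandler((consumer, partitions) =>
            {
                var names = partitions.Select(p => $"{p.Topic} : {p.Partition.Value}");
                log("Partitions revoked: " + string.Join(", ", names));
            })
            .SetPartitionsLostHandler((consumer, partitions) =>
            {
                var names = partitions.Select(p => $"{p.Topic} : {p.Partition.Value}");
                log("Partitions lost: " + string.Join(", ", names));
            });
    }
}
```

This only removes the failing calls from the handlers; it does not address the manual commit problem.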

Cooperative Sticky appears to be problematic for manual commits: confluentinc/confluent-kafka-dotnet#2206. There is also an issue with when the partition revoked handler is called during a cooperative rebalance: we could be mid-rebalance whilst committing offsets, which can trigger another rebalance and lead to an endless rebalance loop.

So this needs some thought. I am going to temporarily flag an error on attempts to set cooperative rebalance, as it does not work with manual commits. I think I'll need to review how Java handles this: https://github.com/apache/kafka/blob/fa10e213bfea306bb5d4590dbdc5e54854192782/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1304
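
The temporary guard described above could look roughly like this. The class name, exception type and message are assumptions for illustration; the actual Brighter change may differ.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical guard, not the actual Brighter implementation.
public static class CooperativeRebalanceGuard
{
    // Throws if the consumer is configured for the cooperative sticky assignor.
    public static void EnsureSupported(ConsumerConfig config)
    {
        if (config.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
        {
            throw new NotSupportedException(
                "CooperativeSticky is not supported with manual commits; use Range or RoundRobin.");
        }
    }
}
```

Calling `CooperativeRebalanceGuard.EnsureSupported(_consumerConfig)` before building the consumer would fail fast at startup instead of during a rebalance.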

@iancooper
Member

After disabling Cooperative Sticky we see this log entry: Error Committing Offsets During Partition Revoke: "Local: No offset stored" Code: Local_NoOffset, Reason: "Local: No offset stored", Fatal: False

As Local_NoOffset is a soft error, it may just be that there are no new offsets to store. But it is noisy, and we should suppress it if we don't need to see it: confluentinc/confluent-kafka-python#71 (comment)
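
One way to quieten this, sketched under the assumption that Local_NoOffset really does mean "nothing new to commit" here: catch only that error code around the commit and let everything else propagate. `QuietCommit` and its parameters are hypothetical names, not existing Brighter code.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical wrapper around a commit call.
public static class QuietCommit
{
    // Returns true if the commit ran, false if there was nothing to commit.
    // Any other KafkaException is left to propagate to the caller.
    public static bool TryCommit(Action commit, Action<string> log)
    {
        try
        {
            commit();
            return true;
        }
        catch (KafkaException e) when (e.Error.Code == ErrorCode.Local_NoOffset)
        {
            log("No new offsets to commit; ignoring Local_NoOffset.");
            return false;
        }
    }
}
```

With a real consumer the call would be `QuietCommit.TryCommit(() => consumer.Commit(), message => s_logger.LogDebug(message));`.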

@MaorDavidzon

@iancooper Is supporting cooperative sticky with manual commits on the roadmap? This is a common use case, and it has already been implemented for other clients.

@iancooper
Member

@MaorDavidzon Still working on it. Because librdkafka does not support it, we will need to write code equivalent to the Java client to support it. So it's a bit of a longer fix.

@iancooper
Member

> After disabling Cooperative Sticky we see this log entry: Error Committing Offsets During Partition Revoke: "Local: No offset stored" Code: Local_NoOffset, Reason: "Local: No offset stored", Fatal: False
>
> As Local_NoOffset is a soft error, it may just be that there are no new offsets to store. But it is noisy, and we should suppress it if we don't need to see it: confluentinc/confluent-kafka-python#71 (comment)

This was fixed in #3059. The issue is still open for the cooperative sticky fixes.

@iancooper
Member

iancooper commented Apr 22, 2024

iancooper changed the title from "[BuG] Error when number of partition changed using Cooperative rebalance protocol" to "[BUG] Error when number of partition changed using Cooperative rebalance protocol" May 6, 2024
iancooper changed the title from "[BUG] Error when number of partition changed using Cooperative rebalance protocol" to "[Bug] Error when number of partition changed using Cooperative rebalance protocol" May 6, 2024