DescribeCluster() returns random broker ID as the controller ID under KRaft mode #2521

Open
panyuenlau opened this issue Jul 25, 2023 · 8 comments
Labels
stale/exempt Issues and pull requests that should never be closed as stale

panyuenlau commented Jul 25, 2023

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

| Sarama | Kafka | Go |
| --- | --- | --- |
| 1.40.0 | 3.4.1 | 1.19 |

Main logic:

func (r *Reconciler) determineControllerId(log logr.Logger) (int32, error) {
	kClient, close, err := r.kafkaClientProvider.NewFromCluster(r.Client, r.KafkaCluster)
	if err != nil {
		return -1, errors.WrapIf(err, "could not create Kafka client, thus could not determine controller")
	}
	defer close()

	brokers, controllerID, err := kClient.DescribeCluster()
	if err != nil {
		return -1, errors.WrapIf(err, "could not find controller broker")
	}

	log.Info("TESTING", "brokers", brokers, "controllerID", controllerID)

	return controllerID, nil
}

How kafka admin client is set up:

type Provider interface {
	NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error)
}

func (dp *defaultProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
	return NewFromCluster(client, cluster)
}

// NewFromCluster is a convenience wrapper around New() and ClusterConfig()
func NewFromCluster(k8sclient client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
	var client KafkaClient
	var err error
	opts, err := ClusterConfig(k8sclient, cluster)
	if err != nil {
		return nil, nil, err
	}
	client = New(opts)
	err = client.Open()
	close := func() {
		if err := client.Close(); err != nil {
			log.Error(err, "Error closing Kafka client")
		} else {
			log.Info("Kafka client closed cleanly")
		}
	}
	return client, close, err
}

There is no error while setting up the admin client, but kClient.DescribeCluster() returns what looks like a random broker ID as the controller ID. My Kafka cluster has 3 quorum voters and 3 regular brokers. The returned brokers list also shows up in the logs as a list of empty structs. Example logs from my program above:

{"level":"info","ts":"2023-07-25T02:38:42.144Z","msg":"DEBUGGING","controller":"KafkaCluster","controllerGroup":"kafka.banzaicloud.io","controllerKind":"KafkaCluster","KafkaCluster":{"name":"kafka","namespace":"default"},"namespace":"default","name":"kafka","reconcileID":"05fba694-8699-4483-ac81-eb6b27b8dc76","component":"kafka","clusterName":"kafka","clusterNamespace":"default","brokers":[{},{},{},{}],"controllerID":2}
{"level":"info","ts":"2023-07-25T02:38:11.556Z","msg":"DEBUGGING","controller":"KafkaCluster","controllerGroup":"kafka.banzaicloud.io","controllerKind":"KafkaCluster","KafkaCluster":{"name":"kafka","namespace":"default"},"namespace":"default","name":"kafka","reconcileID":"6d89d18c-f341-444b-a062-9210dee5888d","component":"kafka","clusterName":"kafka","clusterNamespace":"default","brokers":[{},{},{},{}],"controllerID":0}

Actual state of my Kafka cluster:

./opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server kafka-headless:29092 describe --status
ClusterId:              x_NfbXESRDqP6kP4cwv-xQ
LeaderId:               5
LeaderEpoch:            20
HighWatermark:          88266
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   164
CurrentVoters:          [3,4,5]
CurrentObservers:       [0,1,2]
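
Until a client library exposes the quorum state directly, one workaround is to read the active controller from the tool output above. The following is a minimal sketch that parses the `describe --status` text, assuming the output keeps the exact `Key: value` layout shown here (other Kafka versions may print it differently). The `quorumStatus` type and both function names are hypothetical.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// quorumStatus holds the fields of interest from
// `kafka-metadata-quorum.sh describe --status` (hypothetical type).
type quorumStatus struct {
	LeaderID  int32
	Voters    []int32
	Observers []int32
}

// parseIDList turns a string such as "[3,4,5]" into a slice of node IDs.
func parseIDList(s string) ([]int32, error) {
	s = strings.Trim(strings.TrimSpace(s), "[]")
	if s == "" {
		return nil, nil
	}
	var ids []int32
	for _, part := range strings.Split(s, ",") {
		n, err := strconv.ParseInt(strings.TrimSpace(part), 10, 32)
		if err != nil {
			return nil, fmt.Errorf("node ID %q: %w", part, err)
		}
		ids = append(ids, int32(n))
	}
	return ids, nil
}

// parseQuorumStatus extracts LeaderId, CurrentVoters and CurrentObservers
// and ignores every other line.
func parseQuorumStatus(output string) (quorumStatus, error) {
	var st quorumStatus
	foundLeader := false
	sc := bufio.NewScanner(strings.NewReader(output))
	for sc.Scan() {
		key, value, ok := strings.Cut(sc.Text(), ":")
		if !ok {
			continue // not a "Key: value" line
		}
		key, value = strings.TrimSpace(key), strings.TrimSpace(value)
		var err error
		switch key {
		case "LeaderId":
			var n int64
			n, err = strconv.ParseInt(value, 10, 32)
			st.LeaderID, foundLeader = int32(n), err == nil
		case "CurrentVoters":
			st.Voters, err = parseIDList(value)
		case "CurrentObservers":
			st.Observers, err = parseIDList(value)
		}
		if err != nil {
			return quorumStatus{}, fmt.Errorf("parsing %s: %w", key, err)
		}
	}
	if err := sc.Err(); err != nil {
		return quorumStatus{}, err
	}
	if !foundLeader {
		return quorumStatus{}, errors.New("LeaderId not found in output")
	}
	return st, nil
}

func main() {
	const sample = `ClusterId:              x_NfbXESRDqP6kP4cwv-xQ
LeaderId:               5
LeaderEpoch:            20
HighWatermark:          88266
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   164
CurrentVoters:          [3,4,5]
CurrentObservers:       [0,1,2]`

	st, err := parseQuorumStatus(sample)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("leader=%d voters=%v observers=%v\n", st.LeaderID, st.Voters, st.Observers)
	// prints: leader=5 voters=[3 4 5] observers=[0 1 2]
}
```

In this cluster the active controller is node 5, while the controller IDs in the logs above (2 and 0) are both observers, that is, regular brokers.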

@panyuenlau (Author)

Note that everything works fine in ZooKeeper mode, so it looks like sarama is just not compatible with KRaft yet.

I'd appreciate any insights from the community.

dnwe (Collaborator) commented Jul 25, 2023

@panyuenlau this is actually intentional behaviour in the server-side KRaft code. See https://github.com/apache/kafka/blob/b2dea17041157ceee741041d23783ff993b88ef1/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala#L290-L305

When using KRaft, a cluster no longer has a single controller. Instead, all nodes in the cluster that are running with the "controller" role take part in the controller metadata quorum.

Kafka just expects you to send your requests to any broker in the cluster, and the load will be balanced accordingly.

@panyuenlau (Author)

Hey @dnwe - thanks for the info. I understand that Kafka itself doesn't want clients to have direct access to the quorum controllers. However, shouldn't an admin have the right to know about the quorum controllers? For example, can sarama leverage this https://github.com/apache/kafka/blob/3.4.1/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java#L1482-L1509?

dnwe (Collaborator) commented Jul 25, 2023

@panyuenlau sure. In the future we will of course add the DescribeQuorum request and response protocol, and we can expose it for the metadata topic in the admin.go code in the same way as it is in Java. But afaik this is just informative rather than something the client actually uses to make any choices.

panyuenlau (Author) commented Jul 25, 2023

Alrighty, thank you for the response, @dnwe!

Shall I keep the issue open? If not, please feel free to close it.

panyuenlau (Author) commented Jul 25, 2023

Actually - I just noticed that there is a KIP currently under discussion that ties into what we are discussing here: KIP-919.

dnwe (Collaborator) commented Jul 26, 2023

Good find. Yes, let's keep this issue open for now. For one, I'd like to add an FVT matrix that uses KRaft mode anyway, and two, we should at least document in the admin client that controllerID is random in KRaft mode.

Adding the metadata quorum protocol will happen over time as part of our general bring-up of new protocols, which is work in progress in the background.

@github-actions github-actions bot added the stale Issues and pull requests without any recent activity label Oct 24, 2023
@dnwe dnwe added stale/exempt Issues and pull requests that should never be closed as stale and removed stale Issues and pull requests without any recent activity labels Nov 2, 2023
cuiwin commented Feb 22, 2024

[image]
I'm using sarama v1.42.2 and ran into the same confusion. I'll keep following this issue.

@IBM IBM deleted a comment from github-actions bot Feb 22, 2024