Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka 3.5.1 and confluent 7.4.1 #1554

Merged
merged 1 commit into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
CONFLUENT_VERSION=7.3.3
CONFLUENT_VERSION=7.4.1
CONFLUENT_KAFKACAT_VERSION=7.1.7 # seems there is not a release with each version
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ dependencies {
implementation 'org.sourcelab:kafka-connect-client:4.0.3'

// strimzi
implementation group: 'io.strimzi', name: 'kafka-oauth-common', version: '0.12.0'
implementation group: 'io.strimzi', name: 'kafka-oauth-client', version: '0.12.0'
implementation group: 'io.strimzi', name: 'kafka-oauth-common', version: '0.13.0'
implementation group: 'io.strimzi', name: 'kafka-oauth-client', version: '0.13.0'

// log
implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.0-alpha5'
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
micronautVersion=3.9.4
confluentVersion=7.3.3
kafkaVersion=3.4.0
confluentVersion=7.4.1
kafkaVersion=3.5.1
kafkaScalaVersion=2.13
lombokVersion=1.18.28
6 changes: 3 additions & 3 deletions src/main/java/org/akhq/models/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import kafka.coordinator.group.GroupMetadataManager;
import kafka.coordinator.transaction.BaseKey;
import kafka.coordinator.transaction.TransactionLog;
import kafka.coordinator.transaction.TxnKey;
import lombok.*;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.utils.AvroToJsonDeserializer;
Expand Down Expand Up @@ -231,12 +231,12 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
} else if (topic.isInternalTopic() && topic.getName().equals("__transaction_state")) {
try {
if (isKey) {
TxnKey txnKey = TransactionLog.readTxnRecordKey(ByteBuffer.wrap(payload));
BaseKey txnKey = TransactionLog.readTxnRecordKey(ByteBuffer.wrap(payload));
return avroToJsonSerializer.getMapper().writeValueAsString(
Map.of("transactionalId", txnKey.transactionalId(), "version", txnKey.version())
);
} else {
TxnKey txnKey = TransactionLog.readTxnRecordKey(ByteBuffer.wrap(this.bytesKey));
BaseKey txnKey = TransactionLog.readTxnRecordKey(ByteBuffer.wrap(this.bytesKey));
return avroToJsonSerializer.getMapper().writeValueAsString(TransactionLog.readTxnRecordValue(txnKey.transactionalId(), ByteBuffer.wrap(payload)));
}
} catch (Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
Expand Down Expand Up @@ -179,14 +180,14 @@ public Schema register(String clusterId, String subject, String schema, List<Sch
}

public Schema register(String clusterId, String subject, String type, String schema, List<SchemaReference> references) throws IOException, RestClientException {
int id = this.kafkaModule
RegisterSchemaResponse registerSchemaResponse = this.kafkaModule
.getRegistryRestClient(clusterId)
.registerSchema(schema, type != null? type: "AVRO", references, subject);

Schema latestVersion = getLatestVersion(clusterId, subject);

if (latestVersion.getId() != id) {
throw new IllegalArgumentException("Invalid id from registry expect " + id + " got last version " + latestVersion.getId());
if (latestVersion.getId() != registerSchemaResponse.getId()) {
throw new IllegalArgumentException("Invalid id from registry expect " + registerSchemaResponse.getId() + " got last version " + latestVersion.getId());
}

return latestVersion;
Expand Down
146 changes: 146 additions & 0 deletions src/test/java/kafka/utils/ShutdownableThread.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.utils;

import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * A {@link Thread} base class that supports an orderly, two-phase shutdown:
 * callers first {@link #initiateShutdown()} (which flips the running flag and
 * optionally interrupts the thread), then {@link #awaitShutdown()} to block
 * until the run loop has fully exited.
 *
 * <p>Subclasses implement {@link #doWork()}, which is invoked repeatedly by
 * {@link #run()} until shutdown is initiated or an exception escapes.
 *
 * <p>NOTE(review): this appears to be a Java port of Apache Kafka's internal
 * {@code kafka.utils.ShutdownableThread}, placed under the {@code kafka.utils}
 * package in the test tree — presumably to satisfy test dependencies after the
 * Kafka 3.5 upgrade; confirm against the upstream class if behavior differs.
 */
public abstract class ShutdownableThread extends Thread {

    /** Prefix prepended to every log line emitted by this thread's logger. */
    public final String logPrefix;

    private final Logger log;

    /** When true, {@link #initiateShutdown()} also interrupts the thread. */
    private final boolean isInterruptible;

    // Two latches model the two shutdown phases:
    //   shutdownInitiated -> initiateShutdown() was called (or a fatal error occurred)
    //   shutdownComplete  -> run() has finished (normally or exceptionally)
    private final CountDownLatch shutdownInitiated = new CountDownLatch(1);
    private final CountDownLatch shutdownComplete = new CountDownLatch(1);

    // Set by run(); awaitShutdown() only blocks on shutdownComplete if the
    // thread actually started, otherwise it would wait forever.
    private volatile boolean isStarted = false;

    /** Creates an interruptible shutdownable thread with the given name. */
    public ShutdownableThread(String name) {
        this(name, true);
    }

    /** Creates a thread with the default log prefix {@code "[name]: "}. */
    public ShutdownableThread(String name, boolean isInterruptible) {
        this(name, isInterruptible, "[" + name + "]: ");
    }

    /**
     * @param name            thread name
     * @param isInterruptible whether initiating shutdown should interrupt the thread
     * @param logPrefix       prefix used by this thread's contextual logger
     */
    public ShutdownableThread(String name, boolean isInterruptible, String logPrefix) {
        super(name);
        this.isInterruptible = isInterruptible;
        this.logPrefix = logPrefix;
        log = new LogContext(logPrefix).logger(this.getClass());
        // Non-daemon: the JVM waits for this thread unless shutdown() is called.
        this.setDaemon(false);
    }

    /** Convenience: initiate shutdown and block until it completes. */
    public void shutdown() throws InterruptedException {
        initiateShutdown();
        awaitShutdown();
    }

    /** @return true once {@link #initiateShutdown()} has been called (or a fatal error tripped the latch). */
    public boolean isShutdownInitiated() {
        return shutdownInitiated.getCount() == 0;
    }

    /** @return true once {@link #run()} has exited. */
    public boolean isShutdownComplete() {
        return shutdownComplete.getCount() == 0;
    }

    /**
     * @return true if there has been an unexpected error and the thread shut down
     */
    // mind that run() might set both when we're shutting down the broker
    // but the return value of this function at that point wouldn't matter
    public boolean isThreadFailed() {
        return isShutdownComplete() && !isShutdownInitiated();
    }

    /**
     * Flags the thread to stop after the current {@link #doWork()} iteration.
     *
     * @return true if this call initiated shutdown; false if shutdown was
     *         already in progress (the method is idempotent).
     */
    public boolean initiateShutdown() {
        // Synchronized so concurrent callers see a consistent "who initiated" answer.
        synchronized (this) {
            if (isRunning()) {
                log.info("Shutting down");
                shutdownInitiated.countDown();
                if (isInterruptible)
                    interrupt();
                return true;
            } else
                return false;
        }
    }

    /**
     * After calling initiateShutdown(), use this API to wait until the shutdown is complete.
     */
    public void awaitShutdown() throws InterruptedException {
        if (!isShutdownInitiated())
            throw new IllegalStateException("initiateShutdown() was not called before awaitShutdown()");
        else {
            // Only wait if the thread ever started; a never-started thread will
            // never count down shutdownComplete.
            if (isStarted)
                shutdownComplete.await();
            log.info("Shutdown completed");
        }
    }

    /**
     * Causes the current thread to wait until the shutdown is initiated,
     * or the specified waiting time elapses.
     *
     * @param timeout wait time in units.
     * @param unit TimeUnit value for the wait time.
     */
    public void pause(long timeout, TimeUnit unit) throws InterruptedException {
        if (shutdownInitiated.await(timeout, unit))
            log.trace("shutdownInitiated latch count reached zero. Shutdown called.");
    }

    /**
     * This method is repeatedly invoked until the thread shuts down or this method throws an exception
     */
    public abstract void doWork();

    public void run() {
        isStarted = true;
        log.info("Starting");
        try {
            while (isRunning())
                doWork();
        } catch (FatalExitError e) {
            // Fatal error: trip BOTH latches before exiting the JVM so any
            // waiter in awaitShutdown() is released. Order matters here.
            shutdownInitiated.countDown();
            shutdownComplete.countDown();
            log.info("Stopped");
            Exit.exit(e.statusCode());
        } catch (Throwable e) {
            // Only log as an error if shutdown wasn't already requested;
            // exceptions during an intentional shutdown are expected noise.
            if (isRunning())
                log.error("Error due to", e);
        } finally {
            // Always mark completion, even on unexpected Throwable.
            shutdownComplete.countDown();
        }
        log.info("Stopped");
    }

    /** @return true while shutdown has not yet been initiated. */
    public boolean isRunning() {
        return !isShutdownInitiated();
    }

}
7 changes: 4 additions & 3 deletions src/test/java/org/akhq/clusters/ConnectEmbedded.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.storage.*;
import org.apache.kafka.connect.util.ConnectUtils;

import java.net.URI;
import java.util.Map;
Expand All @@ -31,8 +31,8 @@ public ConnectEmbedded(final Properties properties) {
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig config = new DistributedConfig(workerProps);


RestServer rest = new RestServer(config);
RestClient restClient = new RestClient(config);
RestServer rest = new RestServer(config, restClient);
rest.initializeServer();

URI advertisedUrl = rest.advertisedUrl();
Expand Down Expand Up @@ -61,6 +61,7 @@ public ConnectEmbedded(final Properties properties) {
statusBackingStore,
configBackingStore,
advertisedUrl.toString(),
restClient,
new NoneConnectorClientConfigOverridePolicy()
);

Expand Down
39 changes: 39 additions & 0 deletions src/test/java/org/akhq/clusters/ConnectUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.akhq.clusters;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;

import java.util.concurrent.ExecutionException;

@Slf4j
public class ConnectUtils {
    /**
     * Looks up the Kafka cluster ID by building an {@link Admin} client from the
     * worker's original configuration. The admin client is always closed via
     * try-with-resources, even when the lookup fails.
     *
     * @param config distributed/standalone worker configuration supplying broker
     *               connection and security properties
     * @return the cluster ID, or {@code null} if the broker is too old to report one
     * @throws ConnectException if the cluster cannot be reached or described
     */
    public static String lookupKafkaClusterId(WorkerConfig config) {
        log.info("Creating Kafka admin client");
        try (Admin adminClient = Admin.create(config.originals())) {
            return lookupKafkaClusterId(adminClient);
        }
    }

    /**
     * Fetches the cluster ID using an already-constructed admin client.
     * Package-private so tests can inject a mock {@link Admin}.
     *
     * @param adminClient admin client to query; not closed by this method
     * @return the cluster ID, or {@code null} when the broker predates cluster IDs
     * @throws ConnectException on interruption or on failure to describe the cluster
     */
    static String lookupKafkaClusterId(Admin adminClient) {
        log.debug("Looking up Kafka cluster ID");
        try {
            KafkaFuture<String> clusterIdFuture = adminClient.describeCluster().clusterId();
            if (clusterIdFuture == null) {
                // Pre-0.10.1 brokers don't expose a cluster ID at all.
                log.info("Kafka cluster version is too old to return cluster ID");
                return null;
            }
            log.debug("Fetching Kafka cluster ID");
            String kafkaClusterId = clusterIdFuture.get();
            log.info("Kafka cluster ID: {}", kafkaClusterId);
            return kafkaClusterId;
        } catch (InterruptedException e) {
            // Restore the interrupt status before translating the exception, so
            // callers up the stack can still observe that the thread was interrupted.
            Thread.currentThread().interrupt();
            throw new ConnectException("Unexpectedly interrupted when looking up Kafka cluster info", e);
        } catch (ExecutionException e) {
            throw new ConnectException("Failed to connect to and describe Kafka cluster. "
                + "Check worker's broker connection and security properties.", e);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class KsqlDbControllerTest extends AbstractTest {
void info() {
KsqlDbServerInfo serverInfo = this.retrieve(HttpRequest.GET(BASE_URL + "/info"), KsqlDbServerInfo.class);
assertNotNull(serverInfo.getKafkaClusterId());
assertEquals("7.3.3", serverInfo.getServerVersion());
assertEquals("7.4.1", serverInfo.getServerVersion());
assertEquals("ksql", serverInfo.getKsqlServiceId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void cleanup() {
void getServerInfo() {
KsqlDbServerInfo serverInfo = repository.getServerInfo(KafkaTestCluster.CLUSTER_ID, "ksqldb");
assertNotNull(serverInfo.getKafkaClusterId());
assertEquals("7.3.3", serverInfo.getServerVersion());
assertEquals("7.4.1", serverInfo.getServerVersion());
assertEquals("ksql", serverInfo.getKsqlServiceId());
}

Expand Down