Skip to content

Commit

Permalink
MINOR: fix kafka-metadata-shell.sh (#10226)
Browse files Browse the repository at this point in the history
* Fix CLASSPATH issues in the startup script

* Fix overly verbose log messages during loading

* Update to use the new MetadataRecordSerde (this is needed now that we
  have a frame version)

* Fix initialization

Reviewers: Jason Gustafson <jason@confluent.io>
  • Loading branch information
cmccabe committed Feb 27, 2021
1 parent a61f309 commit 3f76293
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 15 deletions.
12 changes: 12 additions & 0 deletions bin/kafka-run-class.sh
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ do
CLASSPATH="$CLASSPATH":"$file"
done

for file in "$base_dir"/shell/build/libs/kafka-shell*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done

for dir in "$base_dir"/shell/build/dependant-libs-${SCALA_VERSION}*;
do
CLASSPATH="$CLASSPATH:$dir/*"
done

for file in "$base_dir"/tools/build/libs/kafka-tools*.jar;
do
if should_include_file "$file"; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
@Override
public void handleCommits(long lastOffset, List<ApiMessage> messages) {
appendEvent("handleCommits", () -> {
log.error("handleCommits " + messages + " at offset " + lastOffset);
log.debug("handleCommits " + messages + " at offset " + lastOffset);
DirectoryNode dir = data.root.mkdirs("metadataQuorum");
dir.create("offset").setContents(String.valueOf(lastOffset));
for (ApiMessage message : messages) {
Expand All @@ -108,7 +108,7 @@ public void handleCommits(long lastOffset, List<ApiMessage> messages) {
@Override
public void handleNewLeader(MetaLogLeader leader) {
appendEvent("handleNewLeader", () -> {
log.error("handleNewLeader " + leader);
log.debug("handleNewLeader " + leader);
DirectoryNode dir = data.root.mkdirs("metadataQuorum");
dir.create("leader").setContents(leader.toString());
}, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public Builder setSnapshotPath(String snapshotPath) {
}

public MetadataShell build() throws Exception {
if (snapshotPath == null) {
throw new RuntimeException("You must supply the log path via --snapshot");
}
MetadataNodeManager nodeManager = null;
SnapshotFileReader reader = null;
try {
Expand Down Expand Up @@ -99,11 +102,15 @@ public void run(List<String> args) throws Exception {
}
if (args == null || args.isEmpty()) {
// Interactive mode.
System.out.println("Loading...");
waitUntilCaughtUp();
System.out.println("Starting...");
try (InteractiveShell shell = new InteractiveShell(nodeManager)) {
shell.runMainLoop();
}
} else {
// Non-interactive mode.
waitUntilCaughtUp();
Commands commands = new Commands(false);
try (PrintWriter writer = new PrintWriter(new BufferedWriter(
new OutputStreamWriter(System.out, StandardCharsets.UTF_8)))) {
Expand Down Expand Up @@ -150,7 +157,6 @@ public static void main(String[] args) throws Exception {
}
});
MetadataShell shell = builder.build();
shell.waitUntilCaughtUp();
try {
shell.run(res.getList("command"));
} finally {
Expand Down
17 changes: 5 additions & 12 deletions shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.kafka.shell;

import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.record.ControlRecordType;
Expand All @@ -27,8 +26,10 @@
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.metalog.MetaLogLeader;
import org.apache.kafka.metalog.MetaLogListener;
import org.apache.kafka.raft.metadata.MetadataRecordSerde;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.slf4j.Logger;
Expand All @@ -53,6 +54,7 @@ public final class SnapshotFileReader implements AutoCloseable {
private final CompletableFuture<Void> caughtUpFuture;
private FileRecords fileRecords;
private Iterator<FileChannelRecordBatch> batchIterator;
private final MetadataRecordSerde serde = new MetadataRecordSerde();

public SnapshotFileReader(String snapshotPath, MetaLogListener listener) {
this.snapshotPath = snapshotPath;
Expand Down Expand Up @@ -140,17 +142,8 @@ private void handleMetadataBatch(FileChannelRecordBatch batch) {
Record record = iter.next();
ByteBufferAccessor accessor = new ByteBufferAccessor(record.value());
try {
int apiKey = accessor.readUnsignedVarint();
if (apiKey > Short.MAX_VALUE || apiKey < 0) {
throw new RuntimeException("Invalid apiKey value " + apiKey);
}
int apiVersion = accessor.readUnsignedVarint();
if (apiVersion > Short.MAX_VALUE || apiVersion < 0) {
throw new RuntimeException("Invalid apiVersion value " + apiVersion);
}
ApiMessage message = MetadataRecordType.fromId((short) apiKey).newMetadataRecord();
message.read(accessor, (short) apiVersion);
messages.add(message);
ApiMessageAndVersion messageAndVersion = serde.read(accessor, record.valueSize());
messages.add(messageAndVersion.message());
} catch (Throwable e) {
log.error("unable to read metadata record at offset {}", record.offset(), e);
}
Expand Down

0 comments on commit 3f76293

Please sign in to comment.