From 79d84040198f8e3e05892f12093f19ccca9b3b3e Mon Sep 17 00:00:00 2001 From: Dmitry Potepalov Date: Tue, 5 Nov 2024 17:44:08 +0100 Subject: [PATCH] Reset repairedAt when receiving sstables We're not using incremental repairs, therefore we want Repaired At to be set to 0 for all sstables. Some older services still have this value set to non-zero values from the times when we were using incremental repairs. This patch should make sure that any new node we bootstrap has Repaired At set to 0 for all sstables. This patch should be removed when we release the next minor version after 4.1.7. --- .../db/streaming/CassandraCompressedStreamReader.java | 5 ++++- .../db/streaming/CassandraEntireSSTableStreamReader.java | 7 +++++-- .../cassandra/db/streaming/CassandraStreamReader.java | 5 ++++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java index f8eac9379f..59004ef75c 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java @@ -75,7 +75,10 @@ public SSTableMultiWriter read(DataInputPlus inputPlus) throws Throwable try (CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance)) { TrackedDataInputPlus in = new TrackedDataInputPlus(cis); - writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); + if (repairedAt > 0) { + logger.info("[Stream #{}] Rewriting repairedAt from {} to 0 for sstable #{}", session.planId(), repairedAt, fileSeqNum); + } + writer = createWriter(cfs, totalSize, 0, pendingRepair, format); deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()), session, writer); String filename = writer.getFilename(); String sectionName = filename + '-' + fileSeqNum; diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java index 515c85dea6..39d1b2f1fa 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java @@ -134,10 +134,13 @@ public SSTableMultiWriter read(DataInputPlus in) throws Throwable prettyPrintMemory(totalSize)); } + if (messageHeader.repairedAt > 0) { + logger.info("[Stream #{}] Rewriting repairedAt from {} to 0 for sstable #{}", session.planId(), messageHeader.repairedAt, fileSequenceNumber); + } UnaryOperator transform = stats -> stats.mutateLevel(header.sstableLevel) - .mutateRepairedMetadata(messageHeader.repairedAt, messageHeader.pendingRepair, false); + .mutateRepairedMetadata(0, messageHeader.pendingRepair, false); String description = String.format("level %s and repairedAt time %s and pendingRepair %s", - header.sstableLevel, messageHeader.repairedAt, messageHeader.pendingRepair); + header.sstableLevel, 0, messageHeader.pendingRepair); writer.descriptor.getMetadataSerializer().mutate(writer.descriptor, description, transform); return writer; } diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java index 7f7b97c930..53109735aa 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java @@ -135,7 +135,10 @@ public SSTableMultiWriter read(DataInputPlus inputPlus) throws Throwable try (StreamCompressionInputStream streamCompressionInputStream = new StreamCompressionInputStream(inputPlus, current_version)) { TrackedDataInputPlus in = new TrackedDataInputPlus(streamCompressionInputStream); - writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); + if (repairedAt > 0) { + logger.info("[Stream #{}] Rewriting repairedAt from {} to 0 for sstable #{}", session.planId(), repairedAt, fileSeqNum); + } + writer = createWriter(cfs, totalSize, 0, pendingRepair, format); deserializer = getDeserializer(cfs.metadata(), in, inputVersion, session, writer); String sequenceName = writer.getFilename() + '-' + fileSeqNum; long lastBytesRead = 0;