diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 2729154bab4ac..f9486bb3dbd97 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -962,8 +962,11 @@ class Log(@volatile private var _dir: File, producerStateManager.takeSnapshot() } } else { + info(s"Reloading from producer snapshot and rebuilding producer state from offset $lastOffset") val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset + val producerStateLoadStart = time.milliseconds() producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds()) + val segmentRecoveryStart = time.milliseconds() // Only do the potentially expensive reloading if the last snapshot offset is lower than the log end // offset (which would be the case on first startup) and there were active producers prior to truncation @@ -999,6 +1002,8 @@ class Log(@volatile private var _dir: File, } producerStateManager.updateMapEndOffset(lastOffset) producerStateManager.takeSnapshot() + info(s"Producer state recovery took ${producerStateLoadStart - segmentRecoveryStart}ms for snapshot load " + + s"and ${time.milliseconds() - segmentRecoveryStart}ms for segment recovery from offset $lastOffset") } }