From 30a68a5689ac478e350a9e853fd6bddfd2135c69 Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Fri, 2 Sep 2022 14:32:06 +0200 Subject: [PATCH] Add gzip compression --- .../kafka/backup/s3/BackupClient.scala | 16 +- .../MockedKafkaClientBackupClientSpec.scala | 2 +- .../s3/MockedS3BackupClientInterface.scala | 6 +- .../backup/s3/RealS3BackupClientSpec.scala | 749 +----------------- .../backup/s3/RealS3BackupClientTest.scala | 736 +++++++++++++++++ ...ealS3GzipCompressionBackupClientSpec.scala | 12 + build.sbt | 4 +- .../io/aiven/guardian/kafka/backup/Main.scala | 21 +- .../aiven/guardian/kafka/backup/CliSpec.scala | 10 +- .../kafka/backup/BackupClientInterface.scala | 168 +++- .../kafka/backup/configs/Backup.scala | 5 +- .../kafka/backup/configs/Compression.scala | 5 + .../backup/BackupClientInterfaceSpec.scala | 352 +------- .../backup/BackupClientInterfaceTest.scala | 365 +++++++++ .../kafka/backup/CompressionSpec.scala | 46 ++ .../ConfigurationChangeRestartSpec.scala | 181 +++++ ...CompressionBackupClientInterfaceSpec.scala | 12 + .../backup/MockedBackupClientInterface.scala | 59 +- .../restore/RestoreClientInterface.scala | 12 +- ...ompressionRestoreClientInterfaceSpec.scala | 12 + .../restore/RestoreClientInterfaceSpec.scala | 140 +--- .../restore/RestoreClientInterfaceTest.scala | 150 ++++ .../scala/io/aiven/guardian/kafka/Utils.scala | 18 +- .../kafka/models/BackupObjectMetadata.scala | 11 + .../kafka/models/CompressionType.scala | 9 + docs/src/main/paradox/backup/configuration.md | 6 + ...alS3GzipCompressionRestoreClientSpec.scala | 12 + .../restore/s3/RealS3RestoreClientSpec.scala | 157 +--- .../restore/s3/RealS3RestoreClientTest.scala | 162 ++++ 29 files changed, 1988 insertions(+), 1450 deletions(-) create mode 100644 backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientTest.scala create mode 100644 backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3GzipCompressionBackupClientSpec.scala create mode 100644 core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Compression.scala create mode 100644 core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceTest.scala create mode 100644 core-backup/src/test/scala/io/aiven/guardian/kafka/backup/CompressionSpec.scala create mode 100644 core-backup/src/test/scala/io/aiven/guardian/kafka/backup/ConfigurationChangeRestartSpec.scala create mode 100644 core-backup/src/test/scala/io/aiven/guardian/kafka/backup/GzipCompressionBackupClientInterfaceSpec.scala create mode 100644 core-restore/src/test/scala/io/aiven/guardian/kafka/restore/GzipCompressionRestoreClientInterfaceSpec.scala create mode 100644 core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceTest.scala create mode 100644 core/src/main/scala/io/aiven/guardian/kafka/models/BackupObjectMetadata.scala create mode 100644 core/src/main/scala/io/aiven/guardian/kafka/models/CompressionType.scala create mode 100644 restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3GzipCompressionRestoreClientSpec.scala create mode 100644 restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientTest.scala diff --git a/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala b/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala index a3b24c36..354da966 100644 --- a/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala +++ b/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala @@ -12,6 +12,7 @@ import com.typesafe.scalalogging.LazyLogging import io.aiven.guardian.kafka.backup.BackupClientInterface import io.aiven.guardian.kafka.backup.KafkaClientInterface import io.aiven.guardian.kafka.backup.configs.Backup +import io.aiven.guardian.kafka.models.BackupObjectMetadata import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} import scala.collection.immutable @@ -167,8 +168,11 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings else Future.successful(None) - } yield UploadStateResult(current.map(_._1), - previous.map { case (state, previousKey) => PreviousState(state, previousKey) } + } yield UploadStateResult( + current.map { case (state, key) => StateDetails(state, BackupObjectMetadata.fromKey(key)) }, + previous.map { case (previousState, previousKey) => + PreviousState(StateDetails(previousState, BackupObjectMetadata.fromKey(previousKey)), previousKey) + } ) } @@ -238,19 +242,19 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings // the same key that means that in fact the upload has already been completed so in this case lets not do anything if (exists) { logger.debug( - s"Previous upload with uploadId: ${previousState.state.uploadId} and key: ${previousState.previousKey} doesn't actually exist, skipping terminating" + s"Previous upload with uploadId: ${previousState.stateDetails.state.uploadId} and key: ${previousState.previousKey} doesn't actually exist, skipping terminating" ) Sink.ignore } else { logger.info( - s"Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId: ${previousState.state.uploadId}" + s"Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId: ${previousState.stateDetails.state.uploadId}" ) val sink = S3 .resumeMultipartUploadWithHeaders( s3Config.dataBucket, previousState.previousKey, - previousState.state.uploadId, - previousState.state.parts, + previousState.stateDetails.state.uploadId, + previousState.stateDetails.state.parts, s3Headers = s3Headers, chunkingParallelism = 1 ) diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedKafkaClientBackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedKafkaClientBackupClientSpec.scala index 124c61ab..f4c12ef2 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedKafkaClientBackupClientSpec.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedKafkaClientBackupClientSpec.scala @@ -57,7 +57,7 @@ class MockedKafkaClientBackupClientSpec implicit val config: S3Config = s3Config implicit val backupConfig: Backup = - Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 second), 10 seconds) + Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 second), 10 seconds, None) val backupClient = new BackupClient(Some(s3Settings))(new MockedKafkaClientInterface(Source(data)), diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala index 6fb388dd..4d8f32bd 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala @@ -21,9 +21,11 @@ class MockedS3BackupClientInterface( s3Config: S3Config, maybeS3Settings: Option[S3Settings] )(implicit val s3Headers: S3Headers, system: ActorSystem) - extends BackupClient(maybeS3Settings)( + extends BackupClient( + maybeS3Settings + )( new MockedKafkaClientInterface(kafkaData), - Backup(MockedBackupClientInterface.KafkaGroupId, timeConfiguration, 10 seconds), + Backup(MockedBackupClientInterface.KafkaGroupId, timeConfiguration, 10 seconds, None), implicitly, s3Config, implicitly diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala index 6d5eeb2c..9c2526b0 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala @@ -1,752 +1,9 @@ package io.aiven.guardian.kafka.backup.s3 import akka.actor.ActorSystem -import akka.kafka.scaladsl.Producer -import akka.stream.KillSwitches -import akka.stream.SharedKillSwitch -import akka.stream.alpakka.s3.S3Settings -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.scaladsl.Sink -import com.softwaremill.diffx.scalatest.DiffMustMatcher._ import io.aiven.guardian.akka.AnyPropTestKit -import io.aiven.guardian.kafka.Generators._ -import io.aiven.guardian.kafka.KafkaClusterTest -import io.aiven.guardian.kafka.TestUtils._ -import io.aiven.guardian.kafka.Utils -import io.aiven.guardian.kafka.backup.BackupClientControlWrapper -import io.aiven.guardian.kafka.backup.KafkaClient -import io.aiven.guardian.kafka.backup.configs.Backup -import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice -import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst -import io.aiven.guardian.kafka.codecs.Circe._ -import io.aiven.guardian.kafka.configs.KafkaCluster -import io.aiven.guardian.kafka.models.ReducedConsumerRecord -import io.aiven.guardian.kafka.s3.Generators.s3ConfigGen -import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} -import org.mdedetrich.akka.stream.support.CirceStreamSupport - -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.language.postfixOps - -import java.time.temporal.ChronoUnit - -class RealS3BackupClientSpec - extends AnyPropTestKit(ActorSystem("RealS3BackupClientSpec")) - with KafkaClusterTest - with BackupClientSpec { - override lazy val s3Settings: S3Settings = S3Settings() - - /** Virtual Dot Host in bucket names are disabled because you need an actual DNS certificate otherwise AWS will fail - * on bucket creation - */ - override lazy val useVirtualDotHost: Boolean = false - override lazy val bucketPrefix: Option[String] = Some("guardian-") - override lazy val enableCleanup: Option[FiniteDuration] = Some(5 seconds) - - def createKafkaClient( - killSwitch: SharedKillSwitch - )(implicit kafkaClusterConfig: KafkaCluster, backupConfig: Backup): KafkaClientWithKillSwitch = - new KafkaClientWithKillSwitch( - configureConsumer = baseKafkaConfig, - killSwitch = killSwitch - ) - - def getKeyFromSingleDownload(dataBucket: String): Future[String] = waitForS3Download( - dataBucket, - { - case Seq(single) => single.key - case rest => - throw DownloadNotReady(rest) - } - ) - - def getKeysFromTwoDownloads(dataBucket: String): Future[(String, String)] = waitForS3Download( - dataBucket, - { - case Seq(first, second) => (first.key, second.key) - case rest => - throw DownloadNotReady(rest) - } - ) - - def waitUntilBackupClientHasCommitted(backupClient: BackupClientChunkState[_], - step: FiniteDuration = 100 millis, - delay: FiniteDuration = 5 seconds - ): Future[Unit] = - if (backupClient.processedChunks.size() > 0) - akka.pattern.after(delay)(Future.successful(())) - else - akka.pattern.after(step)(waitUntilBackupClientHasCommitted(backupClient, step, delay)) - - property("basic flow without interruptions using PeriodFromFirst works correctly", RealS3Available) { - forAll(kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), - s3ConfigGen(useVirtualDotHost, bucketPrefix), - kafkaConsumerGroupGen - ) { - (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, - s3Config: S3Config, - kafkaConsumerGroup: String - ) => - logger.info(s"Data bucket is ${s3Config.dataBucket}") - - val data = kafkaDataInChunksWithTimePeriod.data.flatten - - val topics = data.map(_.topic).toSet - - val asProducerRecords = toProducerRecords(data) - val baseSource = toSource(asProducerRecords, 30 seconds) - - implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) - - implicit val config: S3Config = s3Config - implicit val backupConfig: Backup = - Backup(kafkaConsumerGroup, PeriodFromFirst(1 minute), 10 seconds) - - val producerSettings = createProducer() - - val backupClientWrapped = - new BackupClientControlWrapper( - new BackupClient(Some(s3Settings))(new KafkaClient(configureConsumer = baseKafkaConfig), - implicitly, - implicitly, - implicitly, - implicitly - ) - ) - - val calculatedFuture = for { - _ <- createTopics(topics) - _ <- createBucket(s3Config.dataBucket) - _ = backupClientWrapped.run() - _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( - baseSource - .runWith(Producer.plainSink(producerSettings)) - ) - _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) - key <- getKeyFromSingleDownload(s3Config.dataBucket) - downloaded <- - S3.getObject(s3Config.dataBucket, key) - .withAttributes(s3Attrs) - .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) - .runWith(Sink.seq) - } yield downloaded.toList.flatten.collect { case Some(reducedConsumerRecord) => - reducedConsumerRecord - } - - calculatedFuture.onComplete { _ => - cleanTopics(topics) - backupClientWrapped.shutdown() - } - val downloaded = calculatedFuture.futureValue - - val downloadedGroupedAsKey = downloaded - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - val inputAsKey = data - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - downloadedGroupedAsKey mustMatchTo inputAsKey - } - } - - property("suspend/resume using PeriodFromFirst creates separate object after resume point", RealS3Available) { - forAll(kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), - s3ConfigGen(useVirtualDotHost, bucketPrefix), - kafkaConsumerGroupGen - ) { - (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, - s3Config: S3Config, - kafkaConsumerGroup: String - ) => - logger.info(s"Data bucket is ${s3Config.dataBucket}") - - val data = kafkaDataInChunksWithTimePeriod.data.flatten - - val topics = data.map(_.topic).toSet - - implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) - - implicit val config: S3Config = s3Config - implicit val backupConfig: Backup = - Backup(kafkaConsumerGroup, PeriodFromFirst(1 minute), 10 seconds) - - val producerSettings = createProducer() - - val killSwitch = KillSwitches.shared("kill-switch") - - val backupClient = - new BackupClientChunkState(Some(s3Settings))(createKafkaClient(killSwitch), - implicitly, - implicitly, - implicitly, - implicitly - ) - val backupClientWrapped = new BackupClientControlWrapper(backupClient) - - val secondBackupClient = new BackupClient(Some(s3Settings))( - new KafkaClient(configureConsumer = baseKafkaConfig), - implicitly, - implicitly, - implicitly, - implicitly - ) - val secondBackupClientWrapped = new BackupClientControlWrapper(secondBackupClient) - - val asProducerRecords = toProducerRecords(data) - val baseSource = toSource(asProducerRecords, 30 seconds) - - val calculatedFuture = for { - _ <- createTopics(topics) - _ <- createBucket(s3Config.dataBucket) - _ = backupClientWrapped.run() - _ = baseSource.runWith(Producer.plainSink(producerSettings)) - _ <- waitUntilBackupClientHasCommitted(backupClient) - _ = killSwitch.abort(TerminationException) - _ <- akka.pattern.after(2 seconds) { - Future { - secondBackupClientWrapped.run() - } - } - _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) - (firstKey, secondKey) <- getKeysFromTwoDownloads(s3Config.dataBucket) - firstDownloaded <- S3.getObject(s3Config.dataBucket, firstKey) - .withAttributes(s3Attrs) - .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) - .runWith(Sink.seq) - secondDownloaded <- S3.getObject(s3Config.dataBucket, secondKey) - .withAttributes(s3Attrs) - .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) - .runWith(Sink.seq) - } yield { - val first = firstDownloaded.toList.flatten.collect { case Some(reducedConsumerRecord) => - reducedConsumerRecord - } - - val second = secondDownloaded.toList.flatten.collect { case Some(reducedConsumerRecord) => - reducedConsumerRecord - } - (first, second) - } - - calculatedFuture.onComplete { _ => - cleanTopics(topics) - backupClientWrapped.shutdown() - secondBackupClientWrapped.shutdown() - } - val (firstDownloaded, secondDownloaded) = calculatedFuture.futureValue - - // Only care about ordering when it comes to key - val firstDownloadedGroupedAsKey = firstDownloaded - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - val secondDownloadedGroupedAsKey = secondDownloaded - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - val inputAsKey = data - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - val downloaded = (firstDownloadedGroupedAsKey.keySet ++ secondDownloadedGroupedAsKey.keySet).map { key => - (key, - firstDownloadedGroupedAsKey.getOrElse(key, List.empty) ++ secondDownloadedGroupedAsKey.getOrElse(key, - List.empty - ) - ) - }.toMap - - downloaded mustMatchTo inputAsKey - - } - } - - property("suspend/resume for same object using ChronoUnitSlice works correctly", RealS3Available) { - forAll(kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), - s3ConfigGen(useVirtualDotHost, bucketPrefix), - kafkaConsumerGroupGen - ) { - (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, - s3Config: S3Config, - kafkaConsumerGroup: String - ) => - logger.info(s"Data bucket is ${s3Config.dataBucket}") - - val data = kafkaDataInChunksWithTimePeriod.data.flatten - - val topics = data.map(_.topic).toSet - - implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) - - implicit val config: S3Config = s3Config - implicit val backupConfig: Backup = - Backup(kafkaConsumerGroup, ChronoUnitSlice(ChronoUnit.MINUTES), 10 seconds) - - val producerSettings = createProducer() - - val killSwitch = KillSwitches.shared("kill-switch") - - val backupClient = - new BackupClientChunkState(Some(s3Settings))(createKafkaClient(killSwitch), - implicitly, - implicitly, - implicitly, - implicitly - ) - val backupClientWrapped = new BackupClientControlWrapper(backupClient) - - val secondBackupClient = - new BackupClient(Some(s3Settings))( - new KafkaClient(configureConsumer = baseKafkaConfig), - implicitly, - implicitly, - implicitly, - implicitly - ) - val secondBackupClientWrapped = new BackupClientControlWrapper(secondBackupClient) - - val asProducerRecords = toProducerRecords(data) - val baseSource = toSource(asProducerRecords, 30 seconds) - - val calculatedFuture = for { - _ <- createTopics(topics) - _ <- createBucket(s3Config.dataBucket) - _ = backupClientWrapped.run() - _ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES) - _ = baseSource.runWith(Producer.plainSink(producerSettings)) - _ <- waitUntilBackupClientHasCommitted(backupClient) - _ = killSwitch.abort(TerminationException) - _ <- akka.pattern.after(2 seconds) { - Future { - secondBackupClientWrapped.run() - } - } - _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) - key <- getKeyFromSingleDownload(s3Config.dataBucket) - downloaded <- S3.getObject(s3Config.dataBucket, key) - .withAttributes(s3Attrs) - .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) - .runWith(Sink.seq) - } yield downloaded.toList.flatten.collect { case Some(reducedConsumerRecord) => - reducedConsumerRecord - } - - calculatedFuture.onComplete { _ => - cleanTopics(topics) - backupClientWrapped.shutdown() - secondBackupClientWrapped.shutdown() - } - val downloaded = calculatedFuture.futureValue - - // Only care about ordering when it comes to key - val downloadedGroupedAsKey = downloaded - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - val inputAsKey = data - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - downloadedGroupedAsKey mustMatchTo inputAsKey - } - } - - property( - "Backup works with multiple keys", - RealS3Available - ) { - forAll(kafkaDataWithTimePeriodsGen(min = 30000, max = 30000), - s3ConfigGen(useVirtualDotHost, bucketPrefix), - kafkaConsumerGroupGen - ) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, s3Config: S3Config, kafkaConsumerGroup: String) => - logger.info(s"Data bucket is ${s3Config.dataBucket}") - val data = kafkaDataWithTimePeriod.data - - val topics = data.map(_.topic).toSet - - val asProducerRecords = toProducerRecords(data) - val baseSource = toSource(asProducerRecords, 30 seconds) - - implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) - - val producerSettings = createProducer() - - implicit val config: S3Config = s3Config - - implicit val backupConfig: Backup = - Backup(kafkaConsumerGroup, PeriodFromFirst(1 second), 10 seconds) - - val backupClientWrapped = - new BackupClientControlWrapper( - new BackupClient(Some(s3Settings))(new KafkaClient(configureConsumer = baseKafkaConfig), - implicitly, - implicitly, - implicitly, - implicitly - ) - ) - - val calculatedFuture = for { - _ <- createTopics(topics) - _ <- createBucket(s3Config.dataBucket) - _ = backupClientWrapped.run() - _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( - baseSource - .runWith(Producer.plainSink(producerSettings)) - ) - - _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) - bucketContents <- akka.pattern.after(10 seconds)( - S3.listBucket(s3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq) - ) - keysSorted = bucketContents.map(_.key).sortBy(Utils.keyToOffsetDateTime) - downloaded <- - Future - .sequence(keysSorted.map { key => - S3.getObject(s3Config.dataBucket, key) - .withAttributes(s3Attrs) - .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) - .runWith(Sink.seq) - }) - .map(_.flatten)(ExecutionContext.parasitic) - } yield downloaded.flatten.collect { case Some(reducedConsumerRecord) => - reducedConsumerRecord - } - - calculatedFuture.onComplete { _ => - cleanTopics(topics) - backupClientWrapped.shutdown() - } - val downloaded = calculatedFuture.futureValue - - // Only care about ordering when it comes to key - val downloadedGroupedAsKey = downloaded - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - val inputAsKey = data - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - downloadedGroupedAsKey mustMatchTo inputAsKey - } - } - - property( - "Concurrent backups using real Kafka cluster with a single key", - RealS3Available - ) { - forAll( - kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), - s3ConfigGen(useVirtualDotHost, bucketPrefix), - s3ConfigGen(useVirtualDotHost, bucketPrefix), - kafkaConsumerGroupGen, - kafkaConsumerGroupGen - ) { - (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, - firstS3Config: S3Config, - secondS3Config: S3Config, - firstKafkaConsumerGroup: String, - secondKafkaConsumerGroup: String - ) => - whenever( - firstS3Config.dataBucket != secondS3Config.dataBucket && firstKafkaConsumerGroup != secondKafkaConsumerGroup - ) { - logger.info(s"Data bucket are ${firstS3Config.dataBucket} and ${secondS3Config.dataBucket}") - - val data = kafkaDataInChunksWithTimePeriod.data.flatten - - val topics = data.map(_.topic).toSet - - val asProducerRecords = toProducerRecords(data) - val baseSource = toSource(asProducerRecords, 30 seconds) - - implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) - - val producerSettings = createProducer() - - val backupClientOneWrapped = { - implicit val backupConfig: Backup = Backup(firstKafkaConsumerGroup, PeriodFromFirst(1 minute), 10 seconds) - - new BackupClientControlWrapper( - new BackupClient(Some(s3Settings))( - new KafkaClient(configureConsumer = baseKafkaConfig), - implicitly, - implicitly, - firstS3Config, - implicitly - ) - ) - } - - val backupClientTwoWrapped = { - implicit val backupConfig: Backup = Backup(secondKafkaConsumerGroup, PeriodFromFirst(1 minute), 10 seconds) - - new BackupClientControlWrapper( - new BackupClient(Some(s3Settings))( - new KafkaClient(configureConsumer = baseKafkaConfig), - implicitly, - implicitly, - secondS3Config, - implicitly - ) - ) - } - - val calculatedFuture = for { - _ <- createTopics(topics) - _ <- createBucket(firstS3Config.dataBucket) - _ <- createBucket(secondS3Config.dataBucket) - _ = backupClientOneWrapped.run() - _ = backupClientTwoWrapped.run() - _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( - baseSource - .runWith(Producer.plainSink(producerSettings)) - ) - _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) - keyOne <- getKeyFromSingleDownload(firstS3Config.dataBucket) - keyTwo <- getKeyFromSingleDownload(secondS3Config.dataBucket) - downloadedOne <- - S3.getObject(firstS3Config.dataBucket, keyOne) - .withAttributes(s3Attrs) - .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) - .runWith(Sink.seq) - downloadedTwo <- - S3.getObject(secondS3Config.dataBucket, keyTwo) - .withAttributes(s3Attrs) - .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) - .runWith(Sink.seq) - } yield (downloadedOne.toList.flatten.collect { case Some(reducedConsumerRecord) => - reducedConsumerRecord - }, - downloadedTwo.toList.flatten.collect { case Some(reducedConsumerRecord) => - reducedConsumerRecord - } - ) - - calculatedFuture.onComplete { _ => - cleanTopics(topics) - backupClientOneWrapped.shutdown() - backupClientTwoWrapped.shutdown() - } - val (downloadedOne, downloadedTwo) = calculatedFuture.futureValue - - val downloadedOneGroupedAsKey = downloadedOne - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - val downloadedTwoGroupedAsKey = downloadedTwo - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - val inputAsKey = data - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - downloadedOneGroupedAsKey mustMatchTo inputAsKey - downloadedTwoGroupedAsKey mustMatchTo inputAsKey - } - } - } - - property( - "Concurrent backups using real Kafka cluster with a multiple keys", - RealS3Available - ) { - forAll( - kafkaDataWithTimePeriodsGen(min = 30000, max = 30000), - s3ConfigGen(useVirtualDotHost, bucketPrefix), - s3ConfigGen(useVirtualDotHost, bucketPrefix), - kafkaConsumerGroupGen, - kafkaConsumerGroupGen - ) { - (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, - firstS3Config: S3Config, - secondS3Config: S3Config, - firstKafkaConsumerGroup: String, - secondKafkaConsumerGroup: String - ) => - whenever( - firstS3Config.dataBucket != secondS3Config.dataBucket && firstKafkaConsumerGroup != secondKafkaConsumerGroup - ) { - logger.info(s"Data bucket are ${firstS3Config.dataBucket} and ${secondS3Config.dataBucket}") - - val data = kafkaDataWithTimePeriod.data - - val topics = data.map(_.topic).toSet - - val asProducerRecords = toProducerRecords(data) - val baseSource = toSource(asProducerRecords, 30 seconds) - - implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) - - val producerSettings = createProducer() - - val backupClientOneWrapped = { - implicit val backupConfig: Backup = Backup(firstKafkaConsumerGroup, PeriodFromFirst(1 second), 10 seconds) - - new BackupClientControlWrapper( - new BackupClient(Some(s3Settings))( - new KafkaClient(configureConsumer = baseKafkaConfig), - implicitly, - implicitly, - firstS3Config, - implicitly - ) - ) - } - - val backupClientTwoWrapped = { - implicit val backupConfig: Backup = Backup(secondKafkaConsumerGroup, PeriodFromFirst(1 second), 10 seconds) - - new BackupClientControlWrapper( - new BackupClient(Some(s3Settings))( - new KafkaClient(configureConsumer = baseKafkaConfig), - implicitly, - implicitly, - secondS3Config, - implicitly - ) - ) - } - - val calculatedFuture = for { - _ <- createTopics(topics) - _ <- createBucket(firstS3Config.dataBucket) - _ <- createBucket(secondS3Config.dataBucket) - _ = backupClientOneWrapped.run() - _ = backupClientTwoWrapped.run() - _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( - baseSource - .runWith(Producer.plainSink(producerSettings)) - ) - - _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) - (bucketContentsOne, bucketContentsTwo) <- - akka.pattern.after(10 seconds)(for { - bucketContentsOne <- - S3.listBucket(firstS3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq) - bucketContentsTwo <- - S3.listBucket(secondS3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq) - - } yield (bucketContentsOne, bucketContentsTwo)) - keysSortedOne = bucketContentsOne.map(_.key).sortBy(Utils.keyToOffsetDateTime) - keysSortedTwo = bucketContentsTwo.map(_.key).sortBy(Utils.keyToOffsetDateTime) - downloadedOne <- - Future - .sequence(keysSortedOne.map { key => - S3.getObject(firstS3Config.dataBucket, key) - .withAttributes(s3Attrs) - .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) - .runWith(Sink.seq) - }) - .map(_.flatten)(ExecutionContext.parasitic) - - downloadedTwo <- - Future - .sequence(keysSortedTwo.map { key => - S3.getObject(secondS3Config.dataBucket, key) - .withAttributes(s3Attrs) - .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) - .runWith(Sink.seq) - }) - .map(_.flatten)(ExecutionContext.parasitic) - } yield (downloadedOne.flatten.collect { case Some(reducedConsumerRecord) => - reducedConsumerRecord - }, - downloadedTwo.flatten.collect { case Some(reducedConsumerRecord) => - reducedConsumerRecord - } - ) - - calculatedFuture.onComplete { _ => - cleanTopics(topics) - backupClientOneWrapped.shutdown() - backupClientTwoWrapped.shutdown() - } - val (downloadedOne, downloadedTwo) = calculatedFuture.futureValue - - // Only care about ordering when it comes to key - val downloadedGroupedAsKeyOne = downloadedOne - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - val downloadedGroupedAsKeyTwo = downloadedTwo - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - val inputAsKey = data - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - downloadedGroupedAsKeyOne mustMatchTo inputAsKey - downloadedGroupedAsKeyTwo mustMatchTo inputAsKey - } - } - } +import io.aiven.guardian.kafka.backup.configs.Compression +class RealS3BackupClientSpec extends AnyPropTestKit(ActorSystem("RealS3BackupClientSpec")) with RealS3BackupClientTest { + override val compression: Option[Compression] = None } diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientTest.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientTest.scala new file mode 100644 index 00000000..b2a2adf8 --- /dev/null +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientTest.scala @@ -0,0 +1,736 @@ +package io.aiven.guardian.kafka.backup.s3 + +import akka.kafka.scaladsl.Producer +import akka.stream.KillSwitches +import akka.stream.SharedKillSwitch +import akka.stream.alpakka.s3.S3Settings +import akka.stream.alpakka.s3.scaladsl.S3 +import akka.stream.scaladsl.Compression +import akka.stream.scaladsl.Sink +import com.softwaremill.diffx.scalatest.DiffMustMatcher._ +import io.aiven.guardian.kafka.Generators._ +import io.aiven.guardian.kafka.KafkaClusterTest +import io.aiven.guardian.kafka.TestUtils._ +import io.aiven.guardian.kafka.Utils +import io.aiven.guardian.kafka.backup.BackupClientControlWrapper +import io.aiven.guardian.kafka.backup.KafkaClient +import io.aiven.guardian.kafka.backup.configs.Backup +import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice +import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst +import io.aiven.guardian.kafka.backup.configs.{Compression => CompressionConfig} +import io.aiven.guardian.kafka.codecs.Circe._ +import io.aiven.guardian.kafka.configs.KafkaCluster +import io.aiven.guardian.kafka.models.Gzip +import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import io.aiven.guardian.kafka.s3.Generators.s3ConfigGen +import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} +import org.mdedetrich.akka.stream.support.CirceStreamSupport +import org.scalatest.propspec.AnyPropSpecLike + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.language.postfixOps + +import java.time.temporal.ChronoUnit + +trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with BackupClientSpec { + def compression: Option[CompressionConfig] + + override lazy val s3Settings: S3Settings = S3Settings() + + /** Virtual Dot Host in bucket names are disabled because you need an actual DNS certificate otherwise AWS will fail + * on bucket creation + */ + override lazy val useVirtualDotHost: Boolean = false + override lazy val bucketPrefix: Option[String] = Some("guardian-") + override lazy val enableCleanup: Option[FiniteDuration] = Some(5 seconds) + + def createKafkaClient( + killSwitch: SharedKillSwitch + )(implicit kafkaClusterConfig: KafkaCluster, backupConfig: Backup): KafkaClientWithKillSwitch = + new KafkaClientWithKillSwitch( + configureConsumer = baseKafkaConfig, + killSwitch = killSwitch + ) + + def getKeyFromSingleDownload(dataBucket: String): Future[String] = waitForS3Download( + dataBucket, + { + case Seq(single) => single.key + case rest => + throw DownloadNotReady(rest) + } + ) + + def getKeysFromTwoDownloads(dataBucket: String): Future[(String, String)] = waitForS3Download( + dataBucket, + { + case Seq(first, second) => (first.key, second.key) + case rest => + throw DownloadNotReady(rest) + } + ) + + def waitUntilBackupClientHasCommitted(backupClient: BackupClientChunkState[_], + step: FiniteDuration = 100 millis, + delay: FiniteDuration = 5 seconds + ): Future[Unit] = + if (backupClient.processedChunks.size() > 0) + akka.pattern.after(delay)(Future.successful(())) + else + akka.pattern.after(step)(waitUntilBackupClientHasCommitted(backupClient, step, delay)) + + private def downloadObject(dataBucket: String, key: String) = { + val downloadSource = S3 + .getObject(dataBucket, key) + .withAttributes(s3Attrs) + + val decode = CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]] + (compression match { + case Some(CompressionConfig(Gzip, _)) => downloadSource.via(Compression.gunzip()).via(decode) + case None => downloadSource.via(decode) + }).runWith(Sink.seq) + } + + property("basic flow without interruptions using PeriodFromFirst works correctly", RealS3Available) { + forAll(kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), + s3ConfigGen(useVirtualDotHost, bucketPrefix), + kafkaConsumerGroupGen + ) { + (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, + s3Config: S3Config, + kafkaConsumerGroup: String + ) => + logger.info(s"Data bucket is ${s3Config.dataBucket}") + + val data = kafkaDataInChunksWithTimePeriod.data.flatten + + val topics = data.map(_.topic).toSet + + val asProducerRecords = toProducerRecords(data) + val baseSource = toSource(asProducerRecords, 30 seconds) + + implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) + + implicit val config: S3Config = s3Config + implicit val backupConfig: Backup = + Backup(kafkaConsumerGroup, PeriodFromFirst(1 minute), 10 seconds, compression) + + val producerSettings = createProducer() + + val backupClientWrapped = + new BackupClientControlWrapper( + new BackupClient(Some(s3Settings))(new KafkaClient(configureConsumer = baseKafkaConfig), + implicitly, + implicitly, + implicitly, + implicitly + ) + ) + + val calculatedFuture = for { + _ <- createTopics(topics) + _ <- createBucket(s3Config.dataBucket) + _ = backupClientWrapped.run() + _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( + baseSource + .runWith(Producer.plainSink(producerSettings)) + ) + _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) + key <- getKeyFromSingleDownload(s3Config.dataBucket) + downloaded <- downloadObject(s3Config.dataBucket, key) + } yield downloaded.toList.flatten.collect { case Some(reducedConsumerRecord) => + reducedConsumerRecord + } + + calculatedFuture.onComplete { _ => + cleanTopics(topics) + backupClientWrapped.shutdown() + } + val downloaded = calculatedFuture.futureValue + + val downloadedGroupedAsKey = downloaded + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val inputAsKey = data + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + downloadedGroupedAsKey mustMatchTo inputAsKey + } + } + + property("suspend/resume using PeriodFromFirst creates separate object after resume point", RealS3Available) { + forAll(kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), + s3ConfigGen(useVirtualDotHost, bucketPrefix), + kafkaConsumerGroupGen + ) { + (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, + s3Config: S3Config, + kafkaConsumerGroup: String + ) => + logger.info(s"Data bucket is ${s3Config.dataBucket}") + + val data = kafkaDataInChunksWithTimePeriod.data.flatten + + val topics = data.map(_.topic).toSet + + implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) + + implicit val config: S3Config = s3Config + implicit val backupConfig: Backup = + Backup(kafkaConsumerGroup, PeriodFromFirst(1 minute), 10 seconds, compression) + + val producerSettings = createProducer() + + val killSwitch = KillSwitches.shared("kill-switch") + + val backupClient = + new BackupClientChunkState(Some(s3Settings))(createKafkaClient(killSwitch), + implicitly, + implicitly, + implicitly, + implicitly + ) + val backupClientWrapped = new BackupClientControlWrapper(backupClient) + + val secondBackupClient = new BackupClient(Some(s3Settings))( + new KafkaClient(configureConsumer = baseKafkaConfig), + implicitly, + implicitly, + implicitly, + implicitly + ) + val secondBackupClientWrapped = new BackupClientControlWrapper(secondBackupClient) + + val asProducerRecords = toProducerRecords(data) + val baseSource = toSource(asProducerRecords, 30 seconds) + + val calculatedFuture = for { + _ <- createTopics(topics) + _ <- createBucket(s3Config.dataBucket) + _ = backupClientWrapped.run() + _ = baseSource.runWith(Producer.plainSink(producerSettings)) + _ <- waitUntilBackupClientHasCommitted(backupClient) + _ = killSwitch.abort(TerminationException) + _ <- akka.pattern.after(2 seconds) { + Future { + secondBackupClientWrapped.run() + } + } + _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) + (firstKey, secondKey) <- getKeysFromTwoDownloads(s3Config.dataBucket) + firstDownloadedFuture = downloadObject(s3Config.dataBucket, firstKey) + secondDownloadedFuture = downloadObject(s3Config.dataBucket, secondKey) + firstDownloaded <- firstDownloadedFuture + secondDownloaded <- secondDownloadedFuture + } yield { + val first = firstDownloaded.toList.flatten.collect { case Some(reducedConsumerRecord) => + reducedConsumerRecord + } + + val second = secondDownloaded.toList.flatten.collect { case Some(reducedConsumerRecord) => + reducedConsumerRecord + } + (first, second) + } + + calculatedFuture.onComplete { _ => + cleanTopics(topics) + backupClientWrapped.shutdown() + secondBackupClientWrapped.shutdown() + } + val (firstDownloaded, secondDownloaded) = calculatedFuture.futureValue + + // Only care about ordering when it comes to key + val firstDownloadedGroupedAsKey = firstDownloaded + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val secondDownloadedGroupedAsKey = secondDownloaded + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val inputAsKey = data + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val downloaded = (firstDownloadedGroupedAsKey.keySet ++ secondDownloadedGroupedAsKey.keySet).map { key => + (key, + firstDownloadedGroupedAsKey.getOrElse(key, List.empty) ++ secondDownloadedGroupedAsKey.getOrElse(key, + List.empty + ) + ) + }.toMap + + downloaded mustMatchTo inputAsKey + } + } + + property("suspend/resume for same object using ChronoUnitSlice works correctly", RealS3Available) { + forAll(kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), + s3ConfigGen(useVirtualDotHost, bucketPrefix), + kafkaConsumerGroupGen + ) { + (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, + s3Config: S3Config, + kafkaConsumerGroup: String + ) => + logger.info(s"Data bucket is ${s3Config.dataBucket}") + + val data = kafkaDataInChunksWithTimePeriod.data.flatten + + val topics = data.map(_.topic).toSet + + implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) + + implicit val config: S3Config = s3Config + implicit val backupConfig: Backup = + Backup(kafkaConsumerGroup, ChronoUnitSlice(ChronoUnit.MINUTES), 10 seconds, compression) + + val producerSettings = createProducer() + + val killSwitch = KillSwitches.shared("kill-switch") + + val backupClient = + new BackupClientChunkState(Some(s3Settings))(createKafkaClient(killSwitch), + implicitly, + implicitly, + implicitly, + implicitly + ) + val backupClientWrapped = new BackupClientControlWrapper(backupClient) + + val secondBackupClient = + new BackupClient(Some(s3Settings))( + new KafkaClient(configureConsumer = baseKafkaConfig), + implicitly, + implicitly, + implicitly, + implicitly + ) + val secondBackupClientWrapped = new BackupClientControlWrapper(secondBackupClient) + + val asProducerRecords = toProducerRecords(data) + val baseSource = toSource(asProducerRecords, 30 seconds) + + val calculatedFuture = for { + _ <- createTopics(topics) + _ <- createBucket(s3Config.dataBucket) + _ = backupClientWrapped.run() + _ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES) + _ = baseSource.runWith(Producer.plainSink(producerSettings)) + _ <- waitUntilBackupClientHasCommitted(backupClient) + _ = killSwitch.abort(TerminationException) + _ <- akka.pattern.after(2 seconds) { + Future { + secondBackupClientWrapped.run() + } + } + _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) + key <- getKeyFromSingleDownload(s3Config.dataBucket) + downloaded <- downloadObject(s3Config.dataBucket, key) + } yield downloaded.toList.flatten.collect { case Some(reducedConsumerRecord) => + reducedConsumerRecord + } + + calculatedFuture.onComplete { _ => + cleanTopics(topics) + backupClientWrapped.shutdown() + secondBackupClientWrapped.shutdown() + } + val downloaded = calculatedFuture.futureValue + + // Only care about ordering when it comes to key + val downloadedGroupedAsKey = downloaded + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val inputAsKey = data + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + downloadedGroupedAsKey mustMatchTo inputAsKey + } + } + + property( + "Backup works with multiple keys", + RealS3Available + ) { + forAll(kafkaDataWithTimePeriodsGen(min = 30000, max = 30000), + s3ConfigGen(useVirtualDotHost, bucketPrefix), + kafkaConsumerGroupGen + ) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, s3Config: S3Config, kafkaConsumerGroup: String) => + logger.info(s"Data bucket is ${s3Config.dataBucket}") + val data = kafkaDataWithTimePeriod.data + + val topics = data.map(_.topic).toSet + + val asProducerRecords = toProducerRecords(data) + val baseSource = toSource(asProducerRecords, 30 seconds) + + implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) + + val producerSettings = createProducer() + + implicit val config: S3Config = s3Config + + implicit val backupConfig: Backup = + Backup(kafkaConsumerGroup, PeriodFromFirst(1 second), 10 seconds, compression) + val backupClientWrapped = + new BackupClientControlWrapper( + new BackupClient(Some(s3Settings))(new KafkaClient(configureConsumer = baseKafkaConfig), + implicitly, + implicitly, + implicitly, + implicitly + ) + ) + + val calculatedFuture = for { + _ <- createTopics(topics) + _ <- createBucket(s3Config.dataBucket) + _ = backupClientWrapped.run() + _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( + baseSource + .runWith(Producer.plainSink(producerSettings)) + ) + + _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) + bucketContents <- akka.pattern.after(10 seconds)( + S3.listBucket(s3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq) + ) + keysSorted = bucketContents.map(_.key).sortBy(Utils.keyToOffsetDateTime) + downloaded <- + Future + .sequence(keysSorted.map(key => downloadObject(s3Config.dataBucket, key))) + .map(_.flatten) + } yield downloaded.flatten.collect { case Some(reducedConsumerRecord) => + reducedConsumerRecord + } + + calculatedFuture.onComplete { _ => + cleanTopics(topics) + backupClientWrapped.shutdown() + } + val downloaded = calculatedFuture.futureValue + + // Only care about ordering when it comes to key + val downloadedGroupedAsKey = downloaded + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val inputAsKey = data + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + downloadedGroupedAsKey mustMatchTo inputAsKey + } + } + + property( + "Concurrent backups using real Kafka cluster with a single key", + RealS3Available + ) { + forAll( + kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), + s3ConfigGen(useVirtualDotHost, bucketPrefix), + s3ConfigGen(useVirtualDotHost, bucketPrefix), + kafkaConsumerGroupGen, + kafkaConsumerGroupGen + ) { + (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, + firstS3Config: S3Config, + secondS3Config: S3Config, + firstKafkaConsumerGroup: String, + secondKafkaConsumerGroup: String + ) => + whenever( + firstS3Config.dataBucket != secondS3Config.dataBucket && firstKafkaConsumerGroup != secondKafkaConsumerGroup + ) { + logger.info(s"Data bucket are ${firstS3Config.dataBucket} and ${secondS3Config.dataBucket}") + + val data = kafkaDataInChunksWithTimePeriod.data.flatten + + val topics = data.map(_.topic).toSet + + val asProducerRecords = toProducerRecords(data) + val baseSource = toSource(asProducerRecords, 30 seconds) + + implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) + + val producerSettings = createProducer() + + val backupClientOneWrapped = { + implicit val backupConfig: Backup = + Backup(firstKafkaConsumerGroup, PeriodFromFirst(1 minute), 10 seconds, compression) + + new BackupClientControlWrapper( + new BackupClient(Some(s3Settings))( + new KafkaClient(configureConsumer = baseKafkaConfig), + implicitly, + implicitly, + firstS3Config, + implicitly + ) + ) + } + + val backupClientTwoWrapped = { + implicit val backupConfig: Backup = + Backup(secondKafkaConsumerGroup, PeriodFromFirst(1 minute), 10 seconds, compression) + + new BackupClientControlWrapper( + new BackupClient(Some(s3Settings))( + new KafkaClient(configureConsumer = baseKafkaConfig), + implicitly, + implicitly, + secondS3Config, + implicitly + ) + ) + } + + val calculatedFuture = for { + _ <- createTopics(topics) + _ <- createBucket(firstS3Config.dataBucket) + _ <- createBucket(secondS3Config.dataBucket) + _ = backupClientOneWrapped.run() + _ = backupClientTwoWrapped.run() + _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( + baseSource + .runWith(Producer.plainSink(producerSettings)) + ) + _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) + keyOne <- getKeyFromSingleDownload(firstS3Config.dataBucket) + keyTwo <- getKeyFromSingleDownload(secondS3Config.dataBucket) + downloadedOneFuture = downloadObject(firstS3Config.dataBucket, keyOne) + downloadedTwoFuture = downloadObject(secondS3Config.dataBucket, keyTwo) + downloadedOne <- downloadedOneFuture + downloadedTwo <- downloadedTwoFuture + } yield (downloadedOne.toList.flatten.collect { case Some(reducedConsumerRecord) => + reducedConsumerRecord + }, + downloadedTwo.toList.flatten.collect { case Some(reducedConsumerRecord) => + reducedConsumerRecord + } + ) + + calculatedFuture.onComplete { _ => + cleanTopics(topics) + backupClientOneWrapped.shutdown() + backupClientTwoWrapped.shutdown() + } + val (downloadedOne, downloadedTwo) = calculatedFuture.futureValue + + val downloadedOneGroupedAsKey = downloadedOne + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val downloadedTwoGroupedAsKey = downloadedTwo + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val inputAsKey = data + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + downloadedOneGroupedAsKey mustMatchTo inputAsKey + downloadedTwoGroupedAsKey mustMatchTo inputAsKey + } + } + } + + property( + "Concurrent backups using real Kafka cluster with a multiple keys", + RealS3Available + ) { + forAll( + kafkaDataWithTimePeriodsGen(min = 30000, max = 30000), + s3ConfigGen(useVirtualDotHost, bucketPrefix), + s3ConfigGen(useVirtualDotHost, bucketPrefix), + kafkaConsumerGroupGen, + kafkaConsumerGroupGen + ) { + (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, + firstS3Config: S3Config, + secondS3Config: S3Config, + firstKafkaConsumerGroup: String, + secondKafkaConsumerGroup: String + ) => + whenever( + firstS3Config.dataBucket != secondS3Config.dataBucket && firstKafkaConsumerGroup != secondKafkaConsumerGroup + ) { + logger.info(s"Data bucket are ${firstS3Config.dataBucket} and ${secondS3Config.dataBucket}") + + val data = kafkaDataWithTimePeriod.data + + val topics = data.map(_.topic).toSet + + val asProducerRecords = toProducerRecords(data) + val baseSource = toSource(asProducerRecords, 30 seconds) + + implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) + + val producerSettings = createProducer() + + val backupClientOneWrapped = { + implicit val backupConfig: Backup = + Backup(firstKafkaConsumerGroup, PeriodFromFirst(1 second), 10 seconds, compression) + + new BackupClientControlWrapper( + new BackupClient(Some(s3Settings))( + new KafkaClient(configureConsumer = baseKafkaConfig), + implicitly, + implicitly, + firstS3Config, + implicitly + ) + ) + } + + val backupClientTwoWrapped = { + implicit val backupConfig: Backup = + Backup(secondKafkaConsumerGroup, PeriodFromFirst(1 second), 10 seconds, compression) + + new BackupClientControlWrapper( + new BackupClient(Some(s3Settings))( + new KafkaClient(configureConsumer = baseKafkaConfig), + implicitly, + implicitly, + secondS3Config, + implicitly + ) + ) + } + + val calculatedFuture = for { + _ <- createTopics(topics) + _ <- createBucket(firstS3Config.dataBucket) + _ <- createBucket(secondS3Config.dataBucket) + _ = backupClientOneWrapped.run() + _ = backupClientTwoWrapped.run() + _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( + baseSource + .runWith(Producer.plainSink(producerSettings)) + ) + + _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) + (bucketContentsOne, bucketContentsTwo) <- + akka.pattern.after(10 seconds)(for { + bucketContentsOne <- + S3.listBucket(firstS3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq) + bucketContentsTwo <- + S3.listBucket(secondS3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq) + + } yield (bucketContentsOne, bucketContentsTwo)) + keysSortedOne = bucketContentsOne.map(_.key).sortBy(Utils.keyToOffsetDateTime) + keysSortedTwo = bucketContentsTwo.map(_.key).sortBy(Utils.keyToOffsetDateTime) + downloadedOneFuture = + Future + .sequence(keysSortedOne.map(key => downloadObject(firstS3Config.dataBucket, key))) + .map(_.flatten) + + downloadedTwoFuture = + Future + .sequence(keysSortedTwo.map(key => downloadObject(secondS3Config.dataBucket, key))) + .map(_.flatten) + downloadedOne <- downloadedOneFuture + downloadedTwo <- downloadedTwoFuture + } yield (downloadedOne.flatten.collect { case Some(reducedConsumerRecord) => + reducedConsumerRecord + }, + downloadedTwo.flatten.collect { case Some(reducedConsumerRecord) => + reducedConsumerRecord + } + ) + + calculatedFuture.onComplete { _ => + cleanTopics(topics) + backupClientOneWrapped.shutdown() + backupClientTwoWrapped.shutdown() + } + val (downloadedOne, downloadedTwo) = calculatedFuture.futureValue + + // Only care about ordering when it comes to key + val downloadedGroupedAsKeyOne = downloadedOne + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val downloadedGroupedAsKeyTwo = downloadedTwo + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val inputAsKey = data + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + downloadedGroupedAsKeyOne mustMatchTo inputAsKey + downloadedGroupedAsKeyTwo mustMatchTo inputAsKey + } + } + } + +} diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3GzipCompressionBackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3GzipCompressionBackupClientSpec.scala new file mode 100644 index 00000000..b60c693e --- /dev/null +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3GzipCompressionBackupClientSpec.scala @@ -0,0 +1,12 @@ +package io.aiven.guardian.kafka.backup.s3 + +import akka.actor.ActorSystem +import io.aiven.guardian.akka.AnyPropTestKit +import io.aiven.guardian.kafka.backup.configs.Compression +import io.aiven.guardian.kafka.models.Gzip + +class RealS3GzipCompressionBackupClientSpec + extends AnyPropTestKit(ActorSystem("RealS3GzipCompressionBackupClientSpec")) + with RealS3BackupClientTest { + override val compression: Option[Compression] = Some(Compression(Gzip, None)) +} diff --git a/build.sbt b/build.sbt index d47ff64a..61fcd6c4 100644 --- a/build.sbt +++ b/build.sbt @@ -116,8 +116,8 @@ lazy val core = project librarySettings, name := s"$baseName-core", libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-actor" % akkaVersion % Provided, - "com.typesafe.akka" %% "akka-stream" % akkaVersion % Provided, + "com.typesafe.akka" %% "akka-actor" % akkaVersion, + "com.typesafe.akka" %% "akka-stream" % akkaVersion, "com.typesafe.akka" %% "akka-stream-kafka" % alpakkaKafkaVersion, // Ideally we shouldn't be explicitly providing a kafka-clients version and instead getting the version // transitively from akka-streams-kafka however there isn't a nice way to extract a transitive dependency diff --git a/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/Main.scala b/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/Main.scala index 2925c0c8..5e88e420 100644 --- a/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/Main.scala +++ b/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/Main.scala @@ -8,11 +8,9 @@ import io.aiven.guardian.cli.MainUtils import io.aiven.guardian.cli.arguments.PropertiesOpt._ import io.aiven.guardian.cli.arguments.StorageOpt import io.aiven.guardian.cli.options.Options -import io.aiven.guardian.kafka.backup.configs.Backup -import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice -import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst -import io.aiven.guardian.kafka.backup.configs.TimeConfiguration +import io.aiven.guardian.kafka.backup.configs._ import io.aiven.guardian.kafka.configs.KafkaCluster +import io.aiven.guardian.kafka.models.Gzip import io.aiven.guardian.kafka.s3.configs.S3 import org.slf4j.LoggerFactory import pureconfig.ConfigSource @@ -56,13 +54,22 @@ class Entry(val initializedApp: AtomicReference[Option[(App[_], Promise[Unit])]] ) .withDefault(10 seconds) + val compressionLevelOpt = + Opts.option[Int]("compression-level", help = "Level of compression to use if enabled").orNone + + val gzipOpt = Opts.subcommand("gzip", help = "Enable gzip compression") { + compressionLevelOpt.map(level => Compression(Gzip, level)) + } + + val compressionOpt = gzipOpt.orNone + val backupOpt = - (groupIdOpt, timeConfigurationOpt, commitTimeoutBufferOpt).tupled.mapValidated { - case (maybeGroupId, maybeTimeConfiguration, commitTimeoutBuffer) => + (groupIdOpt, timeConfigurationOpt, commitTimeoutBufferOpt, compressionOpt).tupled.mapValidated { + case (maybeGroupId, maybeTimeConfiguration, commitTimeoutBuffer, maybeCompression) => import io.aiven.guardian.kafka.backup.Config.backupConfig (maybeGroupId, maybeTimeConfiguration) match { case (Some(groupId), Some(timeConfiguration)) => - Backup(groupId, timeConfiguration, commitTimeoutBuffer).validNel + Backup(groupId, timeConfiguration, commitTimeoutBuffer, maybeCompression).validNel case _ => Options .optionalPureConfigValue(() => backupConfig) diff --git a/cli-backup/src/test/scala/io/aiven/guardian/kafka/backup/CliSpec.scala b/cli-backup/src/test/scala/io/aiven/guardian/kafka/backup/CliSpec.scala index ca864604..542de7c6 100644 --- a/cli-backup/src/test/scala/io/aiven/guardian/kafka/backup/CliSpec.scala +++ b/cli-backup/src/test/scala/io/aiven/guardian/kafka/backup/CliSpec.scala @@ -4,8 +4,10 @@ import akka.actor.ActorSystem import akka.testkit.TestKit import com.typesafe.scalalogging.StrictLogging import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice +import io.aiven.guardian.kafka.backup.configs.Compression import io.aiven.guardian.kafka.backup.configs.{Backup => BackupConfig} import io.aiven.guardian.kafka.configs.{KafkaCluster => KafkaClusterConfig} +import io.aiven.guardian.kafka.models.Gzip import markatta.futiles.CancellableFuture import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.must.Matchers @@ -51,7 +53,10 @@ class CliSpec "--chrono-unit-slice", "hours", "--commit-timeout-buffer-window", - "1 second" + "1 second", + "gzip", + "--compression-level", + "5" ) val cancellable = CancellableFuture { @@ -73,7 +78,8 @@ class CliSpec case s3App: S3App => s3App.backupConfig mustEqual BackupConfig(groupId, ChronoUnitSlice(ChronoUnit.HOURS), - FiniteDuration(1, TimeUnit.SECONDS) + FiniteDuration(1, TimeUnit.SECONDS), + Some(Compression(Gzip, Some(5))) ) s3App.kafkaClusterConfig mustEqual KafkaClusterConfig(Set(topic)) s3App.kafkaClient.consumerSettings.getProperty("bootstrap.servers") mustEqual bootstrapServer diff --git a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala index c15b89fc..e821c982 100644 --- a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala +++ b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala @@ -7,11 +7,11 @@ import akka.stream.scaladsl._ import akka.util.ByteString import com.typesafe.scalalogging.LazyLogging import io.aiven.guardian.kafka.Errors -import io.aiven.guardian.kafka.backup.configs.Backup -import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice -import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst -import io.aiven.guardian.kafka.backup.configs.TimeConfiguration +import io.aiven.guardian.kafka.backup.configs.{Compression => CompressionConfig, _} import io.aiven.guardian.kafka.codecs.Circe._ +import io.aiven.guardian.kafka.models.BackupObjectMetadata +import io.aiven.guardian.kafka.models.CompressionType +import io.aiven.guardian.kafka.models.Gzip import io.aiven.guardian.kafka.models.ReducedConsumerRecord import io.circe.syntax._ @@ -51,8 +51,9 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging { private[backup] case class Tail(override val context: kafkaClientInterface.CursorContext) extends ByteStringContext - case class PreviousState(state: State, previousKey: String) - case class UploadStateResult(current: Option[State], previous: Option[PreviousState]) + case class StateDetails(state: State, backupObjectMetadata: BackupObjectMetadata) + case class PreviousState(stateDetails: StateDetails, previousKey: String) + case class UploadStateResult(current: Option[StateDetails], previous: Option[PreviousState]) object UploadStateResult { val empty: UploadStateResult = UploadStateResult(None, None) } @@ -296,8 +297,64 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging { } } - private[backup] val terminateSource: Source[ByteString, NotUsed] = - Source.single(ByteString("null]")) + private[backup] def compressContextFlow[CtxIn, CtxOut, Mat]( + flowWithContext: FlowWithContext[ByteString, CtxIn, ByteString, CtxOut, Mat] + ) = + backupConfig.compression match { + case Some(compression) => + flowWithContext.unsafeDataVia(compressionFlow(compression)) + case None => flowWithContext + } + + private[backup] def compressContextSink[Ctx, Mat](sink: Sink[(ByteString, Ctx), Mat]) = + backupConfig.compression match { + case Some(compression) => + FlowWithContext[ByteString, Ctx] + .unsafeDataVia(compressionFlow(compression)) + .asFlow + .toMat( + sink + )(Keep.right) + case None => sink + } + + private[backup] def skipCompressionOnAlreadyExistingUpload(start: Start, previousState: PreviousState) = + (backupConfig.compression, previousState.stateDetails.backupObjectMetadata.compression) match { + case (Some(compression), None) => + val whichCompression = compression.`type`.pretty + logger.info( + s"Configured compression $whichCompression will apply on next upload, skipping compressing current upload" + ) + backupToStorageSink(start.key, None) + .contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => + (byteString, byteStringContext.context) + } + case (None, None) => + backupToStorageSink(start.key, None) + .contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => + (byteString, byteStringContext.context) + } + case (None, Some(_)) => + logger.info(s"Compression has been configured to be disabled, this will apply on next upload") + FlowWithContext[ByteString, ByteStringContext] + // Since we don't persist any details on what the compression level is for a previously + // initiated upload lets just use the default compression level + .unsafeDataVia(compressionFlow(CompressionConfig(Gzip, None))) + .asFlow + .toMat( + backupToStorageSink(start.key, None) + .contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => + (byteString, byteStringContext.context) + } + )(Keep.right) + case (Some(CompressionConfig(Gzip, _)), Some(Gzip)) => + compressContextSink( + backupToStorageSink(start.key, None) + .contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => + (byteString, byteStringContext.context) + } + ) + } /** Prepares the sink before it gets handed to `backupToStorageSink` */ @@ -308,25 +365,19 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging { case (Some(previous), None) => backupConfig.timeConfiguration match { case _: PeriodFromFirst => - backupToStorageSink(start.key, None) - .contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => - (byteString, byteStringContext.context) - } + skipCompressionOnAlreadyExistingUpload(start, previous) case _: ChronoUnitSlice => logger.warn( s"Detected previous backup using PeriodFromFirst however current configuration is now changed to ChronoUnitSlice. Object/file with an older key: ${start.key} may contain newer events than object/file with newer key: ${previous.previousKey}" ) - backupToStorageSink(start.key, None) - .contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => - (byteString, byteStringContext.context) - } + skipCompressionOnAlreadyExistingUpload(start, previous) } case (None, Some(current)) => backupConfig.timeConfiguration match { case _: PeriodFromFirst => throw Errors.UnhandledStreamCase(List(current)) case _: ChronoUnitSlice => - FlowWithContext + val baseFlow = FlowWithContext .fromTuples( Flow[(ByteString, ByteStringContext)] .flatMapPrefix(1) { @@ -338,19 +389,22 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging { case _ => throw Errors.ExpectedStartOfSource } ) - .asFlow - .toMat(backupToStorageSink(start.key, Some(current)).contramap[(ByteString, ByteStringContext)] { + + compressContextFlow(baseFlow).asFlow + .toMat(backupToStorageSink(start.key, Some(current.state)).contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => (byteString, byteStringContext.context) })(Keep.right) } case (None, None) => - backupToStorageSink(start.key, None) - .contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => - (byteString, byteStringContext.context) - } + compressContextSink( + backupToStorageSink(start.key, None) + .contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => + (byteString, byteStringContext.context) + } + ) case (Some(previous), Some(current)) => - throw Errors.UnhandledStreamCase(List(previous.state, current)) + throw Errors.UnhandledStreamCase(List(previous.stateDetails, current)) } /** The entire flow that involves reading from Kafka, transforming the data into JSON and then backing it up into a @@ -374,10 +428,16 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging { case (Seq(only: Element, End), _) => // This case only occurs when you have a single element in a timeslice. // We have to terminate immediately to create a JSON array with a single element - val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration) + val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime, + backupConfig.timeConfiguration, + backupConfig.compression + ) transformFirstElement(only, key, terminate = true) case (Seq(first: Element, second: Element), restOfReducedConsumerRecords) => - val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration) + val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime, + backupConfig.timeConfiguration, + backupConfig.compression + ) val firstSource = transformFirstElement(first, key, terminate = false) val rest = Source.combine( @@ -402,7 +462,10 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging { ) case (Seq(only: Element), _) => // This case can also occur when user terminates the stream - val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration) + val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime, + backupConfig.timeConfiguration, + backupConfig.compression + ) transformFirstElement(only, key, terminate = false) case (rest, _) => throw Errors.UnhandledStreamCase(rest) @@ -422,7 +485,7 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging { _ = logger.debug(s"Received $uploadStateResult from getCurrentUploadState with key:${start.key}") _ <- (uploadStateResult.previous, uploadStateResult.current) match { case (Some(previous), None) => - terminateSource + terminateSource(previous.stateDetails.backupObjectMetadata.compression) .runWith(backupToStorageTerminateSink(previous)) .map(Some.apply)(ExecutionContext.parasitic) case _ => Future.successful(None) @@ -464,13 +527,21 @@ object BackupClientInterface { * @return * A `String` that can be used either as some object key or a filename */ - def calculateKey(offsetDateTime: OffsetDateTime, timeConfiguration: TimeConfiguration): String = { + def calculateKey(offsetDateTime: OffsetDateTime, + timeConfiguration: TimeConfiguration, + maybeCompression: Option[CompressionConfig] + ): String = { val finalTime = timeConfiguration match { case ChronoUnitSlice(chronoUnit) => offsetDateTime.truncatedTo(chronoUnit) case _ => offsetDateTime } - s"${BackupClientInterface.formatOffsetDateTime(finalTime)}.json" + val extension = maybeCompression match { + case Some(CompressionConfig(Gzip, _)) => "json.gz" + case None => "json" + } + + s"${BackupClientInterface.formatOffsetDateTime(finalTime)}.$extension" } /** Calculates whether we have rolled over a time period given number of divided periods. @@ -503,4 +574,41 @@ object BackupClientInterface { // TODO handle overflow? ChronoUnit.MICROS.between(finalInitialTime, reducedConsumerRecord.toOffsetDateTime) / period.toMicros } + + /** Flattens an existing flow so that for each incoming element there is exactly one outputting element. Typically + * this is used in combination with `unsafeDataVia` so that your `FlowWithContext`/`SourceWithContext` doesn't get + * corrupted. + * @param flow + * An existing flow that you want to flatten + * @param zero + * A zero value which is used if the `flow` doesn't produce any elements + * @param foldFunc + * A function that determines how to concatenate a sequence + * @return + * A flow that will always produce exactly single output element for a given input element + */ + private[backup] def flattenFlow[T](flow: Flow[T, T, NotUsed], zero: T, foldFunc: (T, T) => T): Flow[T, T, NotUsed] = + Flow[T].flatMapConcat { single => + Source + .fromMaterializer { case (mat, _) => + Source.future( + Source.single(single).via(flow).runFold(zero)(foldFunc)(mat) + ) + } + } + + private[backup] def compressionFlow(compression: CompressionConfig) = compression match { + case CompressionConfig(Gzip, Some(level)) => + flattenFlow[ByteString](Compression.gzip(level), ByteString.empty, _ ++ _) + case CompressionConfig(Gzip, None) => + flattenFlow[ByteString](Compression.gzip, ByteString.empty, _ ++ _) + } + + private[backup] def terminateSource(compression: Option[CompressionType]) = { + val baseSource = Source.single(ByteString("null]")) + compression match { + case Some(Gzip) => baseSource.via(Compression.gzip) + case None => baseSource + } + } } diff --git a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Backup.scala b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Backup.scala index 7564db17..306fa681 100644 --- a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Backup.scala +++ b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Backup.scala @@ -8,8 +8,11 @@ import scala.concurrent.duration.FiniteDuration * Determines how the backed up objects/files are segregated depending on a time configuration * @param commitTimeoutBufferWindow * A buffer that is added ontop of the `timeConfiguration` when setting the Kafka Consumer commit timeout. + * @param compression + * Which compression to use for the backed up data */ final case class Backup(kafkaGroupId: String, timeConfiguration: TimeConfiguration, - commitTimeoutBufferWindow: FiniteDuration + commitTimeoutBufferWindow: FiniteDuration, + compression: Option[Compression] ) diff --git a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Compression.scala b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Compression.scala new file mode 100644 index 00000000..63c01167 --- /dev/null +++ b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Compression.scala @@ -0,0 +1,5 @@ +package io.aiven.guardian.kafka.backup.configs + +import io.aiven.guardian.kafka.models.CompressionType + +final case class Compression(`type`: CompressionType, level: Option[Int]) diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala index a23ba46c..a555d87a 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala @@ -1,357 +1,11 @@ package io.aiven.guardian.kafka.backup import akka.actor.ActorSystem -import akka.stream.scaladsl.Keep -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source -import akka.util.ByteString -import com.softwaremill.diffx.generic.auto._ -import com.softwaremill.diffx.scalatest.DiffMustMatcher._ -import com.typesafe.scalalogging.StrictLogging -import io.aiven.guardian.akka.AkkaStreamTestKit import io.aiven.guardian.akka.AnyPropTestKit -import io.aiven.guardian.kafka.Generators.KafkaDataWithTimePeriod -import io.aiven.guardian.kafka.Generators.kafkaDataWithTimePeriodsGen -import io.aiven.guardian.kafka.TestUtils.waitForStartOfTimeUnit -import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice -import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst -import io.aiven.guardian.kafka.codecs.Circe._ -import io.aiven.guardian.kafka.models.ReducedConsumerRecord -import org.apache.kafka.common.record.TimestampType -import org.mdedetrich.akka.stream.support.CirceStreamSupport -import org.scalatest.Inspectors -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.matchers.must.Matchers -import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks -import org.typelevel.jawn.AsyncParser - -import scala.annotation.nowarn -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.language.postfixOps - -import java.time.temporal.ChronoUnit -import java.util.concurrent.ConcurrentLinkedDeque -import java.util.concurrent.ConcurrentLinkedQueue - -final case class Periods(periodsBefore: Long, periodsAfter: Long) +import io.aiven.guardian.kafka.backup.configs.Compression class BackupClientInterfaceSpec extends AnyPropTestKit(ActorSystem("BackupClientInterfaceSpec")) - with AkkaStreamTestKit - with Matchers - with ScalaFutures - with ScalaCheckPropertyChecks - with StrictLogging { - - implicit val ec: ExecutionContext = system.dispatcher - implicit val defaultPatience: PatienceConfig = PatienceConfig(90 seconds, 100 millis) - - property("Ordered Kafka events should produce at least one BackupStreamPosition.Boundary") { - forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => - val mock = - new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) - ) - - val calculatedFuture = mock.materializeBackupStreamPositions() - - Inspectors.forAtLeast(1, calculatedFuture.futureValue)( - _ must equal(mock.End: mock.RecordElement) - ) - } - } - - property( - "Every ReducedConsumerRecord after a BackupStreamPosition.Boundary should be in the next consecutive time period" - ) { - forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => - val mock = - new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) - ) - - val result = mock.materializeBackupStreamPositions().futureValue.toList - - val allBoundariesWithoutMiddles = result - .sliding(2) - .collect { case Seq(mock.End, afterRecordRecordElement: mock.Element) => - afterRecordRecordElement - } - .toList - - if (allBoundariesWithoutMiddles.length > 1) { - @nowarn("msg=not.*?exhaustive") - val withBeforeAndAfter = - allBoundariesWithoutMiddles.sliding(2).map { case Seq(before, after) => (before, after) }.toList - - val initialTime = kafkaDataWithTimePeriod.data.head.timestamp - - Inspectors.forEvery(withBeforeAndAfter) { case (before, after) => - val periodAsMillis = kafkaDataWithTimePeriod.periodSlice.toMillis - ((before.reducedConsumerRecord.timestamp - initialTime) / periodAsMillis) mustNot equal( - (after.reducedConsumerRecord.timestamp - initialTime) / periodAsMillis - ) - } - } - } - } - - property( - "The time difference between two consecutive BackupStreamPosition.Middle's has to be less then the specified time period" - ) { - forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => - val mock = - new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) - ) - - val result = mock.materializeBackupStreamPositions().futureValue.toList - - val allCoupledMiddles = result - .sliding(2) - .collect { case Seq(before: mock.Element, after: mock.Element) => - (before, after) - } - .toList - - Inspectors.forEvery(allCoupledMiddles) { case (before, after) => - ChronoUnit.MICROS.between(before.reducedConsumerRecord.toOffsetDateTime, - after.reducedConsumerRecord.toOffsetDateTime - ) must be < kafkaDataWithTimePeriod.periodSlice.toMicros - } - } - } - - property("the time difference between the first and last timestamp for a given key is less than time period") { - forAll(kafkaDataWithTimePeriodsGen().filter(_.data.size > 2)) { - (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => - val mock = - new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) - ) - - mock.clear() - val calculatedFuture = for { - _ <- mock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) - processedRecords = mock.mergeBackedUpData() - asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => - Source - .single(byteString) - .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray)) - .collect { case Some(value) => - value - } - .toMat(Sink.collection)(Keep.right) - .run() - .map(records => (key, records)) - }) - } yield asRecords - - val result = calculatedFuture.futureValue - - Inspectors.forEvery(result) { case (_, records) => - (records.headOption, records.lastOption) match { - case (Some(first), Some(last)) if first != last => - ChronoUnit.MICROS.between(first.toOffsetDateTime, - last.toOffsetDateTime - ) must be < kafkaDataWithTimePeriod.periodSlice.toMicros - case _ => - } - } - } - } - - property("backup method completes flow correctly for all valid Kafka events") { - forAll(kafkaDataWithTimePeriodsGen().filter(_.data.size > 2)) { - (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => - val mock = - new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) - ) - - mock.clear() - val calculatedFuture = for { - _ <- mock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) - processedRecords = mock.mergeBackedUpData() - asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => - Source - .single(byteString) - .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray)) - .collect { case Some(value) => - value - } - .toMat(Sink.collection)(Keep.right) - .run() - .map(records => (key, records)) - }) - } yield asRecords - - val result = calculatedFuture.futureValue - - val observed = result.flatMap { case (_, values) => values } - - kafkaDataWithTimePeriod.data mustEqual observed - } - } - - property("backup method completes flow correctly for single element") { - val reducedConsumerRecord = ReducedConsumerRecord("", 0, 1, Some("key"), "value", 1, TimestampType.CREATE_TIME) - - val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source.single( - reducedConsumerRecord - ), - PeriodFromFirst(1 day) - ) - mock.clear() - val calculatedFuture = for { - _ <- mock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) - processedRecords = mock.mergeBackedUpData() - asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => - Source - .single(byteString) - .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray)) - .collect { case Some(value) => - value - } - .toMat(Sink.collection)(Keep.right) - .run() - .map(records => (key, records)) - }) - } yield asRecords - - val result = calculatedFuture.futureValue - - val observed = result.flatMap { case (_, values) => values } - - List(reducedConsumerRecord) mustEqual observed - } - - property("backup method completes flow correctly for two elements") { - val reducedConsumerRecords = List( - ReducedConsumerRecord("", 0, 1, Some("key"), "value1", 1, TimestampType.CREATE_TIME), - ReducedConsumerRecord("", 0, 2, Some("key"), "value2", 2, TimestampType.CREATE_TIME) - ) - - val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source( - reducedConsumerRecords - ), - PeriodFromFirst(1 millis) - ) - mock.clear() - val calculatedFuture = for { - _ <- mock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) - processedRecords = mock.mergeBackedUpData() - asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => - Source - .single(byteString) - .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray)) - .collect { case Some(value) => - value - } - .toMat(Sink.collection)(Keep.right) - .run() - .map(records => (key, records)) - }) - } yield asRecords - - val result = calculatedFuture.futureValue - - val observed = result.flatMap { case (_, values) => values } - - reducedConsumerRecords mustEqual observed - } - - property("backup method correctly terminates every key apart from last") { - forAll(kafkaDataWithTimePeriodsGen().filter(_.data.size > 1)) { - (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => - val mock = - new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) - ) - - mock.clear() - val calculatedFuture = for { - _ <- mock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) - processedRecords = mock.mergeBackedUpData(terminate = false) - } yield processedRecords.splitAt(processedRecords.length - 1) - - val (terminated, nonTerminated) = calculatedFuture.futureValue - - if (nonTerminated.nonEmpty) { - Inspectors.forEvery(terminated) { case (_, byteString) => - byteString.utf8String.takeRight(2) mustEqual "}]" - } - } - - Inspectors.forEvery(nonTerminated) { case (_, byteString) => - byteString.utf8String.takeRight(2) mustEqual "}," - } - } - } - - property("suspend/resume for same object using ChronoUnitSlice works correctly") { - // Since this test needs to wait for the start of the next minute we only want it to - // succeed once otherwise it runs for a very long time. - implicit val generatorDrivenConfig: PropertyCheckConfiguration = - PropertyCheckConfiguration(minSuccessful = 1) - - forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => - val commitStorage = new ConcurrentLinkedDeque[Long]() - val backupStorage = new ConcurrentLinkedQueue[(String, ByteString)]() - val data = kafkaDataWithTimePeriod.data - - val mockOne = new MockedBackupClientInterfaceWithMockedKafkaData( - Source(data), - ChronoUnitSlice(ChronoUnit.MINUTES), - commitStorage, - backupStorage, - stopAfterDuration = Some(kafkaDataWithTimePeriod.periodSlice), - handleOffsets = true - ) - - val mockTwo = new MockedBackupClientInterfaceWithMockedKafkaData(Source(data), - ChronoUnitSlice(ChronoUnit.MINUTES), - commitStorage, - backupStorage, - stopAfterDuration = None, - handleOffsets = true - ) - - val calculatedFuture = for { - _ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES) - _ <- mockOne.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(mockTwo.backup.run()) - processedRecords <- - akka.pattern.after(AkkaStreamInitializationConstant)( - Future.successful( - mockTwo.mergeBackedUpData() - ) - ) - asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => - Source - .single(byteString) - .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray)) - .collect { case Some(value) => - value - } - .toMat(Sink.collection)(Keep.right) - .run() - .map(records => (key, records)) - }) - } yield asRecords - - val result = calculatedFuture.futureValue - - val observed = result.flatMap { case (_, values) => values } - - data mustMatchTo observed - } - } + with BackupClientInterfaceTest { + override val compression: Option[Compression] = None } diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceTest.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceTest.scala new file mode 100644 index 00000000..9e3ebd53 --- /dev/null +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceTest.scala @@ -0,0 +1,365 @@ +package io.aiven.guardian.kafka.backup + +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.util.ByteString +import com.softwaremill.diffx.generic.auto._ +import com.softwaremill.diffx.scalatest.DiffMustMatcher._ +import com.typesafe.scalalogging.StrictLogging +import io.aiven.guardian.akka.AkkaStreamTestKit +import io.aiven.guardian.kafka.Generators.KafkaDataWithTimePeriod +import io.aiven.guardian.kafka.Generators.kafkaDataWithTimePeriodsGen +import io.aiven.guardian.kafka.TestUtils.waitForStartOfTimeUnit +import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice +import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst +import io.aiven.guardian.kafka.backup.configs.{Compression => CompressionModel} +import io.aiven.guardian.kafka.codecs.Circe._ +import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import org.apache.kafka.common.record.TimestampType +import org.mdedetrich.akka.stream.support.CirceStreamSupport +import org.scalatest.Inspectors +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.must.Matchers +import org.scalatest.propspec.AnyPropSpecLike +import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks +import org.typelevel.jawn.AsyncParser + +import scala.annotation.nowarn +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.language.postfixOps + +import java.time.temporal.ChronoUnit +import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.ConcurrentLinkedQueue + +final case class Periods(periodsBefore: Long, periodsAfter: Long) + +trait BackupClientInterfaceTest + extends AnyPropSpecLike + with AkkaStreamTestKit + with Matchers + with ScalaFutures + with ScalaCheckPropertyChecks + with StrictLogging { + + implicit val ec: ExecutionContext = system.dispatcher + implicit val defaultPatience: PatienceConfig = PatienceConfig(90 seconds, 100 millis) + + def compression: Option[CompressionModel] + + property("Ordered Kafka events should produce at least one BackupStreamPosition.Boundary") { + forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => + val mock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), + PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) + ) + + val calculatedFuture = mock.materializeBackupStreamPositions() + + Inspectors.forAtLeast(1, calculatedFuture.futureValue)( + _ must equal(mock.End: mock.RecordElement) + ) + } + } + + property( + "Every ReducedConsumerRecord after a BackupStreamPosition.Boundary should be in the next consecutive time period" + ) { + forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => + val mock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), + PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) + ) + + val result = mock.materializeBackupStreamPositions().futureValue.toList + + val allBoundariesWithoutMiddles = result + .sliding(2) + .collect { case Seq(mock.End, afterRecordRecordElement: mock.Element) => + afterRecordRecordElement + } + .toList + + if (allBoundariesWithoutMiddles.length > 1) { + @nowarn("msg=not.*?exhaustive") + val withBeforeAndAfter = + allBoundariesWithoutMiddles.sliding(2).map { case Seq(before, after) => (before, after) }.toList + + val initialTime = kafkaDataWithTimePeriod.data.head.timestamp + + Inspectors.forEvery(withBeforeAndAfter) { case (before, after) => + val periodAsMillis = kafkaDataWithTimePeriod.periodSlice.toMillis + ((before.reducedConsumerRecord.timestamp - initialTime) / periodAsMillis) mustNot equal( + (after.reducedConsumerRecord.timestamp - initialTime) / periodAsMillis + ) + } + } + } + } + + property( + "The time difference between two consecutive BackupStreamPosition.Middle's has to be less then the specified time period" + ) { + forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => + val mock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), + PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) + ) + + val result = mock.materializeBackupStreamPositions().futureValue.toList + + val allCoupledMiddles = result + .sliding(2) + .collect { case Seq(before: mock.Element, after: mock.Element) => + (before, after) + } + .toList + + Inspectors.forEvery(allCoupledMiddles) { case (before, after) => + ChronoUnit.MICROS.between(before.reducedConsumerRecord.toOffsetDateTime, + after.reducedConsumerRecord.toOffsetDateTime + ) must be < kafkaDataWithTimePeriod.periodSlice.toMicros + } + } + } + + property("the time difference between the first and last timestamp for a given key is less than time period") { + forAll(kafkaDataWithTimePeriodsGen().filter(_.data.size > 2)) { + (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => + val mock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), + PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) + ) + + mock.clear() + val calculatedFuture = for { + _ <- mock.backup.run() + _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + processedRecords = mock.mergeBackedUpData() + asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => + Source + .single(byteString) + .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray)) + .collect { case Some(value) => + value + } + .toMat(Sink.collection)(Keep.right) + .run() + .map(records => (key, records)) + }) + } yield asRecords + + val result = calculatedFuture.futureValue + + Inspectors.forEvery(result) { case (_, records) => + (records.headOption, records.lastOption) match { + case (Some(first), Some(last)) if first != last => + ChronoUnit.MICROS.between(first.toOffsetDateTime, + last.toOffsetDateTime + ) must be < kafkaDataWithTimePeriod.periodSlice.toMicros + case _ => + } + } + } + } + + property("backup method completes flow correctly for all valid Kafka events") { + forAll(kafkaDataWithTimePeriodsGen().filter(_.data.size > 2)) { + (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => + val mock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), + PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice), + compression + ) + + mock.clear() + val calculatedFuture = for { + _ <- mock.backup.run() + _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + processedRecords = mock.mergeBackedUpData(compression = compression.map(_.`type`)) + asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => + Source + .single(byteString) + .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray)) + .collect { case Some(value) => + value + } + .toMat(Sink.collection)(Keep.right) + .run() + .map(records => (key, records)) + }) + } yield asRecords + + val result = calculatedFuture.futureValue + + val observed = result.flatMap { case (_, values) => values } + + kafkaDataWithTimePeriod.data mustEqual observed + } + } + + property("backup method completes flow correctly for single element") { + val reducedConsumerRecord = ReducedConsumerRecord("", 0, 1, Some("key"), "value", 1, TimestampType.CREATE_TIME) + + val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source.single( + reducedConsumerRecord + ), + PeriodFromFirst(1 day), + compression + ) + mock.clear() + val calculatedFuture = for { + _ <- mock.backup.run() + _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + processedRecords = mock.mergeBackedUpData(compression = compression.map(_.`type`)) + asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => + Source + .single(byteString) + .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray)) + .collect { case Some(value) => + value + } + .toMat(Sink.collection)(Keep.right) + .run() + .map(records => (key, records)) + }) + } yield asRecords + + val result = calculatedFuture.futureValue + + val observed = result.flatMap { case (_, values) => values } + + List(reducedConsumerRecord) mustEqual observed + } + + property("backup method completes flow correctly for two elements") { + val reducedConsumerRecords = List( + ReducedConsumerRecord("", 0, 1, Some("key"), "value1", 1, TimestampType.CREATE_TIME), + ReducedConsumerRecord("", 0, 2, Some("key"), "value2", 2, TimestampType.CREATE_TIME) + ) + + val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source( + reducedConsumerRecords + ), + PeriodFromFirst(1 millis), + compression + ) + mock.clear() + val calculatedFuture = for { + _ <- mock.backup.run() + _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + processedRecords = mock.mergeBackedUpData(compression = compression.map(_.`type`)) + asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => + Source + .single(byteString) + .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray)) + .collect { case Some(value) => + value + } + .toMat(Sink.collection)(Keep.right) + .run() + .map(records => (key, records)) + }) + } yield asRecords + + val result = calculatedFuture.futureValue + + val observed = result.flatMap { case (_, values) => values } + + reducedConsumerRecords mustEqual observed + } + + property("backup method correctly terminates every key apart from last") { + forAll(kafkaDataWithTimePeriodsGen().filter(_.data.size > 1)) { + (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => + val mock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), + PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice), + compression + ) + + mock.clear() + val calculatedFuture = for { + _ <- mock.backup.run() + _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + processedRecords = mock.mergeBackedUpData(terminate = false, compression = compression.map(_.`type`)) + } yield processedRecords.splitAt(processedRecords.length - 1) + + val (terminated, nonTerminated) = calculatedFuture.futureValue + + if (nonTerminated.nonEmpty) { + Inspectors.forEvery(terminated) { case (_, byteString) => + byteString.utf8String.takeRight(2) mustEqual "}]" + } + } + + Inspectors.forEvery(nonTerminated) { case (_, byteString) => + byteString.utf8String.takeRight(2) mustEqual "}," + } + } + } + + property("suspend/resume for same object using ChronoUnitSlice works correctly") { + // Since this test needs to wait for the start of the next minute we only want it to + // succeed once otherwise it runs for a very long time. + implicit val generatorDrivenConfig: PropertyCheckConfiguration = + PropertyCheckConfiguration(minSuccessful = 1) + + forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => + val commitStorage = new ConcurrentLinkedDeque[Long]() + val backupStorage = new ConcurrentLinkedQueue[(String, ByteString)]() + val data = kafkaDataWithTimePeriod.data + + val mockOne = new MockedBackupClientInterfaceWithMockedKafkaData( + Source(data), + ChronoUnitSlice(ChronoUnit.MINUTES), + None, + commitStorage, + backupStorage, + stopAfterDuration = Some(kafkaDataWithTimePeriod.periodSlice), + handleOffsets = true + ) + + val mockTwo = new MockedBackupClientInterfaceWithMockedKafkaData(Source(data), + ChronoUnitSlice(ChronoUnit.MINUTES), + None, + commitStorage, + backupStorage, + stopAfterDuration = None, + handleOffsets = true + ) + + val calculatedFuture = for { + _ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES) + _ <- mockOne.backup.run() + _ <- akka.pattern.after(AkkaStreamInitializationConstant)(mockTwo.backup.run()) + processedRecords <- + akka.pattern.after(AkkaStreamInitializationConstant)( + Future.successful( + mockTwo.mergeBackedUpData() + ) + ) + asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => + Source + .single(byteString) + .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray)) + .collect { case Some(value) => + value + } + .toMat(Sink.collection)(Keep.right) + .run() + .map(records => (key, records)) + }) + } yield asRecords + + val result = calculatedFuture.futureValue + + val observed = result.flatMap { case (_, values) => values } + + data mustMatchTo observed + } + } +} diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/CompressionSpec.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/CompressionSpec.scala new file mode 100644 index 00000000..6fb455ab --- /dev/null +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/CompressionSpec.scala @@ -0,0 +1,46 @@ +package io.aiven.guardian.kafka.backup + +import akka.actor.ActorSystem +import akka.stream.scaladsl.Compression +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.SourceWithContext +import akka.util.ByteString +import io.aiven.guardian.akka.AnyPropTestKit +import io.aiven.guardian.kafka.backup.configs.{Compression => CompressionModel} +import io.aiven.guardian.kafka.models.Gzip +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.must.Matchers +import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks + +import scala.concurrent.ExecutionContext + +class CompressionSpec + extends AnyPropTestKit(ActorSystem("CompressionSpec")) + with Matchers + with ScalaFutures + with ScalaCheckPropertyChecks { + + implicit val ec: ExecutionContext = system.dispatcher + + property("Gzip compression works with a SourceWithContext/FlowWithContext") { + forAll { data: List[String] => + val asByteString = data.map(ByteString.fromString) + val zippedWithIndex = asByteString.zipWithIndex + val sourceWithContext = SourceWithContext.fromTuples( + Source(zippedWithIndex) + ) + + val calculatedFuture = for { + compressed <- sourceWithContext + .unsafeDataVia(BackupClientInterface.compressionFlow(CompressionModel(Gzip, None))) + .asSource + .map { case (byteString, _) => byteString } + .runFold(ByteString.empty)(_ ++ _) + decompressed <- Source.single(compressed).via(Compression.gunzip()).runFold(ByteString.empty)(_ ++ _) + } yield decompressed + + val decompressed = calculatedFuture.futureValue + data.mkString mustEqual decompressed.utf8String + } + } +} diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/ConfigurationChangeRestartSpec.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/ConfigurationChangeRestartSpec.scala new file mode 100644 index 00000000..d618555e --- /dev/null +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/ConfigurationChangeRestartSpec.scala @@ -0,0 +1,181 @@ +package io.aiven.guardian.kafka.backup + +import akka.actor.ActorSystem +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.util.ByteString +import com.softwaremill.diffx.generic.auto._ +import com.softwaremill.diffx.scalatest.DiffMustMatcher._ +import com.typesafe.scalalogging.StrictLogging +import io.aiven.guardian.akka.AkkaStreamTestKit +import io.aiven.guardian.akka.AnyPropTestKit +import io.aiven.guardian.kafka.Generators.KafkaDataWithTimePeriod +import io.aiven.guardian.kafka.Generators.kafkaDataWithTimePeriodsGen +import io.aiven.guardian.kafka.TestUtils.waitForStartOfTimeUnit +import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice +import io.aiven.guardian.kafka.backup.configs.Compression +import io.aiven.guardian.kafka.codecs.Circe._ +import io.aiven.guardian.kafka.models.BackupObjectMetadata +import io.aiven.guardian.kafka.models.Gzip +import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import org.mdedetrich.akka.stream.support.CirceStreamSupport +import org.scalatest.Inspectors +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.must.Matchers +import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks +import org.typelevel.jawn.AsyncParser + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ +import scala.language.postfixOps + +import java.time.temporal.ChronoUnit +import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.ConcurrentLinkedQueue + +class ConfigurationChangeRestartSpec + extends AnyPropTestKit(ActorSystem("ConfigurationChangeSpec")) + with AkkaStreamTestKit + with Matchers + with ScalaFutures + with ScalaCheckPropertyChecks + with StrictLogging { + + implicit val ec: ExecutionContext = system.dispatcher + implicit val defaultPatience: PatienceConfig = PatienceConfig(90 seconds, 100 millis) + + implicit override val generatorDrivenConfig: PropertyCheckConfiguration = + PropertyCheckConfiguration(minSuccessful = 1) + + property("GZip compression enabled initially and then BackupClient restarted with compression disabled") { + implicit val generatorDrivenConfig: PropertyCheckConfiguration = + PropertyCheckConfiguration(minSuccessful = 1) + + forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => + val commitStorage = new ConcurrentLinkedDeque[Long]() + val backupStorage = new ConcurrentLinkedQueue[(String, ByteString)]() + val data = kafkaDataWithTimePeriod.data + + val mockOne = new MockedBackupClientInterfaceWithMockedKafkaData( + Source(data), + ChronoUnitSlice(ChronoUnit.MINUTES), + Some(Compression(Gzip, None)), + commitStorage, + backupStorage, + stopAfterDuration = Some(kafkaDataWithTimePeriod.periodSlice), + handleOffsets = true + ) + + val mockTwo = new MockedBackupClientInterfaceWithMockedKafkaData(Source(data), + ChronoUnitSlice(ChronoUnit.MINUTES), + None, + commitStorage, + backupStorage, + stopAfterDuration = None, + handleOffsets = true + ) + + val calculatedFuture = for { + _ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES) + _ <- mockOne.backup.run() + keysWithGzip <- akka.pattern.after(AkkaStreamInitializationConstant)( + Future.successful( + backupStorage.asScala.map { case (key, _) => key }.toSet + ) + ) + _ <- mockTwo.backup.run() + keysWithoutGzip <- + akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful { + val allKeys = backupStorage.asScala.map { case (key, _) => key }.toSet + allKeys diff keysWithGzip + }) + processedRecords = mockTwo.mergeBackedUpData(compression = Some(Gzip)) + asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => + Source + .single(byteString) + .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray)) + .collect { case Some(value) => + value + } + .toMat(Sink.collection)(Keep.right) + .run() + .map(records => (key, records)) + }) + } yield (asRecords, keysWithGzip, keysWithoutGzip) + + val (records, keysWithGzip, keysWithoutGzip) = calculatedFuture.futureValue + + val observed = records.flatMap { case (_, values) => values } + + Inspectors.forEvery(keysWithGzip)(key => BackupObjectMetadata.fromKey(key).compression must contain(Gzip)) + Inspectors.forEvery(keysWithoutGzip)(key => BackupObjectMetadata.fromKey(key).compression mustBe empty) + data mustMatchTo observed + } + } + + property("no compression enabled initially and then BackupClient restarted with GZip compression enabled") { + forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => + val commitStorage = new ConcurrentLinkedDeque[Long]() + val backupStorage = new ConcurrentLinkedQueue[(String, ByteString)]() + val data = kafkaDataWithTimePeriod.data + + val mockOne = new MockedBackupClientInterfaceWithMockedKafkaData( + Source(data), + ChronoUnitSlice(ChronoUnit.MINUTES), + None, + commitStorage, + backupStorage, + stopAfterDuration = Some(kafkaDataWithTimePeriod.periodSlice), + handleOffsets = true + ) + + val mockTwo = new MockedBackupClientInterfaceWithMockedKafkaData(Source(data), + ChronoUnitSlice(ChronoUnit.MINUTES), + Some(Compression(Gzip, None)), + commitStorage, + backupStorage, + stopAfterDuration = None, + handleOffsets = true + ) + + val calculatedFuture = for { + _ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES) + _ <- mockOne.backup.run() + keysWithoutGzip <- akka.pattern.after(AkkaStreamInitializationConstant)( + Future.successful( + backupStorage.asScala.map { case (key, _) => key }.toSet + ) + ) + _ <- mockTwo.backup.run() + keysWithGzip <- + akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful { + val allKeys = backupStorage.asScala.map { case (key, _) => key }.toSet + allKeys diff keysWithoutGzip + }) + processedRecords = mockTwo.mergeBackedUpData(compression = Some(Gzip)) + asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => + Source + .single(byteString) + .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray)) + .collect { case Some(value) => + value + } + .toMat(Sink.collection)(Keep.right) + .run() + .map(records => (key, records)) + }) + } yield (asRecords, keysWithGzip, keysWithoutGzip) + + val (records, keysWithGzip, keysWithoutGzip) = calculatedFuture.futureValue + + val observed = records.flatMap { case (_, values) => values } + + Inspectors.forEvery(keysWithGzip)(key => BackupObjectMetadata.fromKey(key).compression must contain(Gzip)) + Inspectors.forEvery(keysWithoutGzip)(key => BackupObjectMetadata.fromKey(key).compression mustBe empty) + data mustMatchTo observed + } + } +} diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/GzipCompressionBackupClientInterfaceSpec.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/GzipCompressionBackupClientInterfaceSpec.scala new file mode 100644 index 00000000..9df2854c --- /dev/null +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/GzipCompressionBackupClientInterfaceSpec.scala @@ -0,0 +1,12 @@ +package io.aiven.guardian.kafka.backup + +import akka.actor.ActorSystem +import io.aiven.guardian.akka.AnyPropTestKit +import io.aiven.guardian.kafka.backup.configs.Compression +import io.aiven.guardian.kafka.models.Gzip + +class GzipCompressionBackupClientInterfaceSpec + extends AnyPropTestKit(ActorSystem("GzipCompressionBackupClientInterfaceSpec")) + with BackupClientInterfaceTest { + override val compression: Option[Compression] = Some(Compression(Gzip, None)) +} diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala index 836454bd..45755fce 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala @@ -3,6 +3,7 @@ package io.aiven.guardian.kafka.backup import akka.Done import akka.NotUsed import akka.actor.ActorSystem +import akka.stream.scaladsl.Compression import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Sink @@ -12,9 +13,14 @@ import io.aiven.guardian.kafka.TestUtils._ import io.aiven.guardian.kafka.Utils import io.aiven.guardian.kafka.backup.configs.Backup import io.aiven.guardian.kafka.backup.configs.TimeConfiguration +import io.aiven.guardian.kafka.backup.configs.{Compression => CompressionModel} +import io.aiven.guardian.kafka.models.BackupObjectMetadata +import io.aiven.guardian.kafka.models.CompressionType +import io.aiven.guardian.kafka.models.Gzip import io.aiven.guardian.kafka.models.ReducedConsumerRecord import scala.collection.immutable +import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ @@ -34,6 +40,7 @@ import java.util.concurrent.ConcurrentLinkedQueue */ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafkaClientInterface, timeConfiguration: TimeConfiguration, + compression: Option[CompressionModel] = None, backedUpData: ConcurrentLinkedQueue[(String, ByteString)] = new ConcurrentLinkedQueue[(String, ByteString)]() )(implicit override val system: ActorSystem) @@ -50,21 +57,33 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka * @return * `backupData` with all of the `ByteString` data merged for each unique key */ - def mergeBackedUpData(terminate: Boolean = true, sort: Boolean = true): List[(String, ByteString)] = { + def mergeBackedUpData(terminate: Boolean = true, + sort: Boolean = true, + compression: Option[CompressionType] = None + ): List[(String, ByteString)] = { val base = backedUpData.asScala .orderedGroupBy { case (key, _) => key } .view - .mapValues { data => - val complete = data.map { case (_, byteString) => byteString }.foldLeft(ByteString())(_ ++ _) - if (terminate) - if (complete.utf8String.endsWith("},")) - complete ++ ByteString("null]") + .map { case (key, data) => + val joined = joinByteString(data.map { case (_, byteString) => byteString }) + val complete = + // Only bother decompressing when needed since some tests can have mixed backups + if (BackupObjectMetadata.fromKey(key).compression.isDefined) + decompress(joined, compression) + else + joined + + val finalData = + if (terminate) + if (complete.utf8String.endsWith("},")) + complete ++ ByteString("null]") + else + complete else complete - else - complete + (key, finalData) } .toList if (sort) @@ -80,7 +99,8 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka override implicit lazy val backupConfig: Backup = Backup( KafkaGroupId, timeConfiguration, - 10 seconds + 10 seconds, + compression ) /** Override this type to define the result of backing up data to a datasource @@ -97,8 +117,9 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka if (index < 0) UploadStateResult.empty else { - val previous = keys.lift(index - 1).map(k => PreviousState((), k)) - val current = Some(()) + val previous = + keys.lift(index - 1).map(k => PreviousState(StateDetails((), BackupObjectMetadata.fromKey(k)), k)) + val current = Some(StateDetails((), BackupObjectMetadata.fromKey(key))) UploadStateResult(current, previous) } @@ -138,6 +159,20 @@ object MockedBackupClientInterface { * Kafka cluster) where as when used against actual test Kafka clusters this is the consumer group that is used */ val KafkaGroupId: String = "test" + + def decompress(byteString: ByteString, compression: Option[CompressionType])(implicit + system: ActorSystem + ): ByteString = + compression match { + case Some(Gzip) => + Await.result(Source.single(byteString).via(Compression.gunzip()).runFold(ByteString.empty)(_ ++ _), + Duration.Inf + ) + case None => byteString + } + + def joinByteString(byteStrings: IterableOnce[ByteString]): ByteString = + byteStrings.iterator.foldLeft(ByteString.empty)(_ ++ _) } /** A `MockedBackupClientInterface` that also uses a mocked `KafkaClientInterface` @@ -145,6 +180,7 @@ object MockedBackupClientInterface { class MockedBackupClientInterfaceWithMockedKafkaData( kafkaData: Source[ReducedConsumerRecord, NotUsed], timeConfiguration: TimeConfiguration, + compression: Option[CompressionModel] = None, commitStorage: ConcurrentLinkedDeque[Long] = new ConcurrentLinkedDeque[Long](), backedUpData: ConcurrentLinkedQueue[(String, ByteString)] = new ConcurrentLinkedQueue[(String, ByteString)](), stopAfterDuration: Option[FiniteDuration] = None, @@ -153,5 +189,6 @@ class MockedBackupClientInterfaceWithMockedKafkaData( extends MockedBackupClientInterface( new MockedKafkaClientInterface(kafkaData, commitStorage, stopAfterDuration, handleOffsets), timeConfiguration, + compression, backedUpData ) diff --git a/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/RestoreClientInterface.scala b/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/RestoreClientInterface.scala index 7f4d6448..ee81814b 100644 --- a/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/RestoreClientInterface.scala +++ b/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/RestoreClientInterface.scala @@ -5,6 +5,7 @@ import akka.NotUsed import akka.actor.ActorSystem import akka.stream.Attributes import akka.stream.SharedKillSwitch +import akka.stream.scaladsl.Compression import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Source import akka.util.ByteString @@ -13,6 +14,8 @@ import io.aiven.guardian.kafka.ExtensionsMethods._ import io.aiven.guardian.kafka.Utils import io.aiven.guardian.kafka.codecs.Circe._ import io.aiven.guardian.kafka.configs.KafkaCluster +import io.aiven.guardian.kafka.models.BackupObjectMetadata +import io.aiven.guardian.kafka.models.Gzip import io.aiven.guardian.kafka.models.ReducedConsumerRecord import io.aiven.guardian.kafka.restore.configs.Restore import markatta.futiles.Traversal.traverseSequentially @@ -84,9 +87,16 @@ trait RestoreClientInterface[T <: KafkaProducerInterface] extends LazyLogging { } private[kafka] def restoreKey(key: String): Future[Done] = { - val base = Source + val source = Source .single(key) .via(downloadFlow) + + val sourceWithCompression = BackupObjectMetadata.fromKey(key).compression match { + case Some(Gzip) => source.via(Compression.gunzip()) + case None => source + } + + val base = sourceWithCompression .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray)) .collect { case Some(reducedConsumerRecord) diff --git a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/GzipCompressionRestoreClientInterfaceSpec.scala b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/GzipCompressionRestoreClientInterfaceSpec.scala new file mode 100644 index 00000000..20ea2fa0 --- /dev/null +++ b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/GzipCompressionRestoreClientInterfaceSpec.scala @@ -0,0 +1,12 @@ +package io.aiven.guardian.kafka.restore + +import akka.actor.ActorSystem +import io.aiven.guardian.akka.AnyPropTestKit +import io.aiven.guardian.kafka.backup.configs.Compression +import io.aiven.guardian.kafka.models.Gzip + +class GzipCompressionRestoreClientInterfaceSpec + extends AnyPropTestKit(ActorSystem("GzipCompressionRestoreClientInterfaceSpec")) + with RestoreClientInterfaceTest { + override val compression: Option[Compression] = Some(Compression(Gzip, None)) +} diff --git a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceSpec.scala b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceSpec.scala index 9c79e9c0..964dfd05 100644 --- a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceSpec.scala +++ b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceSpec.scala @@ -1,145 +1,11 @@ package io.aiven.guardian.kafka.restore import akka.actor.ActorSystem -import akka.stream.scaladsl.Source -import com.softwaremill.diffx.generic.auto._ -import com.softwaremill.diffx.scalatest.DiffMustMatcher._ -import com.typesafe.scalalogging.StrictLogging -import io.aiven.guardian.akka.AkkaStreamTestKit import io.aiven.guardian.akka.AnyPropTestKit -import io.aiven.guardian.kafka.ExtensionsMethods._ -import io.aiven.guardian.kafka.Generators._ -import io.aiven.guardian.kafka.Utils -import io.aiven.guardian.kafka.backup.MockedBackupClientInterfaceWithMockedKafkaData -import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst -import io.aiven.guardian.kafka.configs.{KafkaCluster => KafkaClusterConfig} -import io.aiven.guardian.kafka.restore.configs.{Restore => RestoreConfig} -import org.scalatest.Inspectors -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.matchers.must.Matchers -import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks - -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.jdk.CollectionConverters._ -import scala.jdk.DurationConverters._ -import scala.language.postfixOps - -import java.time.temporal.ChronoUnit +import io.aiven.guardian.kafka.backup.configs.Compression class RestoreClientInterfaceSpec extends AnyPropTestKit(ActorSystem("RestoreClientInterfaceSpec")) - with AkkaStreamTestKit - with Matchers - with ScalaFutures - with ScalaCheckPropertyChecks - with StrictLogging { - - implicit val ec: ExecutionContext = system.dispatcher - implicit val defaultPatience: PatienceConfig = PatienceConfig(90 seconds, 100 millis) - - property("Calculating finalKeys contains correct keys with fromWhen filter") { - forAll( - kafkaDataWithTimePeriodsAndPickedRecordGen(padTimestampsMillis = - Range.inclusive(1, ChronoUnit.HOURS.getDuration.toMillis.toInt), - min = 1000, - max = 1000 - ) - ) { (kafkaDataWithTimePeriodAndPickedRecord: KafkaDataWithTimePeriodAndPickedRecord) => - implicit val mockedKafkaProducerInterface: MockedKafkaProducerInterface = new MockedKafkaProducerInterface - implicit val kafkaClusterConfig: KafkaClusterConfig = - KafkaClusterConfig(kafkaDataWithTimePeriodAndPickedRecord.topics) - val pickedTime = kafkaDataWithTimePeriodAndPickedRecord.picked.toOffsetDateTime - implicit val restoreConfig: RestoreConfig = - RestoreConfig(Some(pickedTime), None) - - val backupMock = - new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriodAndPickedRecord.data), - PeriodFromFirst( - ChronoUnit.SECONDS.getDuration.toScala - ) - ) - - backupMock.clear() - val calculatedFuture = for { - _ <- backupMock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) - processedRecords = backupMock.mergeBackedUpData() - restoreMock = new MockedRestoreClientInterface(processedRecords.toMap) - keys <- restoreMock.finalKeys - } yield keys - - val result = calculatedFuture.futureValue - - Inspectors.forEvery(result.drop(1)) { key => - (Utils.keyToOffsetDateTime(key) >= pickedTime) must be(true) - } - - } - } - - property("Round-trip with backup and restore") { - forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => - implicit val restoreConfig: RestoreConfig = RestoreConfig.empty - implicit val mockedKafkaProducerInterface: MockedKafkaProducerInterface = new MockedKafkaProducerInterface - implicit val kafkaClusterConfig: KafkaClusterConfig = KafkaClusterConfig(kafkaDataWithTimePeriod.topics) - - val backupMock = - new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) - ) - - backupMock.clear() - val calculatedFuture = for { - _ <- backupMock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) - processedRecords = backupMock.mergeBackedUpData() - restoreMock = new MockedRestoreClientInterface(processedRecords.toMap) - _ <- restoreMock.restore - restoredData = restoreMock.kafkaProducerInterface.producedData.asScala.toList - } yield restoredData - - calculatedFuture.futureValue mustMatchTo kafkaDataWithTimePeriod.data - } - } - - property("Round-trip with backup and restore works using fromWhen filter") { - forAll( - kafkaDataWithTimePeriodsAndPickedRecordGen(padTimestampsMillis = - Range.inclusive(1, ChronoUnit.HOURS.getDuration.toMillis.toInt), - min = 1000, - max = 1000 - ) - ) { (kafkaDataWithTimePeriodAndPickedRecord: KafkaDataWithTimePeriodAndPickedRecord) => - implicit val mockedKafkaProducerInterface: MockedKafkaProducerInterface = new MockedKafkaProducerInterface - implicit val kafkaClusterConfig: KafkaClusterConfig = - KafkaClusterConfig(kafkaDataWithTimePeriodAndPickedRecord.topics) - implicit val restoreConfig: RestoreConfig = - RestoreConfig(Some(kafkaDataWithTimePeriodAndPickedRecord.picked.toOffsetDateTime), None) - - val backupMock = - new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriodAndPickedRecord.data), - PeriodFromFirst( - kafkaDataWithTimePeriodAndPickedRecord.periodSlice - ) - ) - - backupMock.clear() - val calculatedFuture = for { - _ <- backupMock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) - processedRecords = backupMock.mergeBackedUpData() - restoreMock = new MockedRestoreClientInterface(processedRecords.toMap) - _ <- restoreMock.restore - restoredData = restoreMock.kafkaProducerInterface.producedData.asScala.toList - } yield restoredData - - calculatedFuture.futureValue mustMatchTo kafkaDataWithTimePeriodAndPickedRecord.data.filter { - reducedConsumerRecord => - reducedConsumerRecord.toOffsetDateTime >= kafkaDataWithTimePeriodAndPickedRecord.picked.toOffsetDateTime - } - } - - } + with RestoreClientInterfaceTest { + override val compression: Option[Compression] = None } diff --git a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceTest.scala b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceTest.scala new file mode 100644 index 00000000..6ef836ca --- /dev/null +++ b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceTest.scala @@ -0,0 +1,150 @@ +package io.aiven.guardian.kafka.restore + +import akka.stream.scaladsl.Source +import com.softwaremill.diffx.generic.auto._ +import com.softwaremill.diffx.scalatest.DiffMustMatcher._ +import com.typesafe.scalalogging.StrictLogging +import io.aiven.guardian.akka.AkkaStreamTestKit +import io.aiven.guardian.kafka.ExtensionsMethods._ +import io.aiven.guardian.kafka.Generators._ +import io.aiven.guardian.kafka.Utils +import io.aiven.guardian.kafka.backup.MockedBackupClientInterfaceWithMockedKafkaData +import io.aiven.guardian.kafka.backup.configs.Compression +import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst +import io.aiven.guardian.kafka.configs.{KafkaCluster => KafkaClusterConfig} +import io.aiven.guardian.kafka.restore.configs.{Restore => RestoreConfig} +import org.scalatest.Inspectors +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.must.Matchers +import org.scalatest.propspec.AnyPropSpecLike +import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ +import scala.jdk.DurationConverters._ +import scala.language.postfixOps + +import java.time.temporal.ChronoUnit + +trait RestoreClientInterfaceTest + extends AnyPropSpecLike + with AkkaStreamTestKit + with Matchers + with ScalaFutures + with ScalaCheckPropertyChecks + with StrictLogging { + + implicit val ec: ExecutionContext = system.dispatcher + implicit val defaultPatience: PatienceConfig = PatienceConfig(90 seconds, 100 millis) + + def compression: Option[Compression] + + property("Calculating finalKeys contains correct keys with fromWhen filter") { + forAll( + kafkaDataWithTimePeriodsAndPickedRecordGen(padTimestampsMillis = + Range.inclusive(1, ChronoUnit.HOURS.getDuration.toMillis.toInt), + min = 1000, + max = 1000 + ) + ) { (kafkaDataWithTimePeriodAndPickedRecord: KafkaDataWithTimePeriodAndPickedRecord) => + implicit val mockedKafkaProducerInterface: MockedKafkaProducerInterface = new MockedKafkaProducerInterface + implicit val kafkaClusterConfig: KafkaClusterConfig = + KafkaClusterConfig(kafkaDataWithTimePeriodAndPickedRecord.topics) + val pickedTime = kafkaDataWithTimePeriodAndPickedRecord.picked.toOffsetDateTime + implicit val restoreConfig: RestoreConfig = + RestoreConfig(Some(pickedTime), None) + + val backupMock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriodAndPickedRecord.data), + PeriodFromFirst( + ChronoUnit.SECONDS.getDuration.toScala + ), + compression + ) + + backupMock.clear() + val calculatedFuture = for { + _ <- backupMock.backup.run() + _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + processedRecords = backupMock.mergeBackedUpData(compression = compression.map(_.`type`)) + restoreMock = new MockedRestoreClientInterface(processedRecords.toMap) + keys <- restoreMock.finalKeys + } yield keys + + val result = calculatedFuture.futureValue + + Inspectors.forEvery(result.drop(1)) { key => + (Utils.keyToOffsetDateTime(key) >= pickedTime) must be(true) + } + + } + } + + property("Round-trip with backup and restore") { + forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => + implicit val restoreConfig: RestoreConfig = RestoreConfig.empty + implicit val mockedKafkaProducerInterface: MockedKafkaProducerInterface = new MockedKafkaProducerInterface + implicit val kafkaClusterConfig: KafkaClusterConfig = KafkaClusterConfig(kafkaDataWithTimePeriod.topics) + + val backupMock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), + PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice), + compression + ) + + backupMock.clear() + val calculatedFuture = for { + _ <- backupMock.backup.run() + _ <- akka.pattern.after(10 seconds)(Future.successful(())) + processedRecords = backupMock.mergeBackedUpData() + restoreMock = new MockedRestoreClientInterface(processedRecords.toMap) + _ <- restoreMock.restore + restoredData = restoreMock.kafkaProducerInterface.producedData.asScala.toList + } yield restoredData + + calculatedFuture.futureValue mustMatchTo kafkaDataWithTimePeriod.data + } + } + + property("Round-trip with backup and restore works using fromWhen filter") { + forAll( + kafkaDataWithTimePeriodsAndPickedRecordGen(padTimestampsMillis = + Range.inclusive(1, ChronoUnit.HOURS.getDuration.toMillis.toInt), + min = 1000, + max = 1000 + ) + ) { (kafkaDataWithTimePeriodAndPickedRecord: KafkaDataWithTimePeriodAndPickedRecord) => + implicit val mockedKafkaProducerInterface: MockedKafkaProducerInterface = new MockedKafkaProducerInterface + implicit val kafkaClusterConfig: KafkaClusterConfig = + KafkaClusterConfig(kafkaDataWithTimePeriodAndPickedRecord.topics) + implicit val restoreConfig: RestoreConfig = + RestoreConfig(Some(kafkaDataWithTimePeriodAndPickedRecord.picked.toOffsetDateTime), None) + + val backupMock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriodAndPickedRecord.data), + PeriodFromFirst( + kafkaDataWithTimePeriodAndPickedRecord.periodSlice + ), + compression + ) + + backupMock.clear() + val calculatedFuture = for { + _ <- backupMock.backup.run() + _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + processedRecords = backupMock.mergeBackedUpData() + restoreMock = new MockedRestoreClientInterface(processedRecords.toMap) + _ <- restoreMock.restore + restoredData = restoreMock.kafkaProducerInterface.producedData.asScala.toList + } yield restoredData + + calculatedFuture.futureValue mustMatchTo kafkaDataWithTimePeriodAndPickedRecord.data.filter { + reducedConsumerRecord => + reducedConsumerRecord.toOffsetDateTime >= kafkaDataWithTimePeriodAndPickedRecord.picked.toOffsetDateTime + } + } + + } +} diff --git a/core/src/main/scala/io/aiven/guardian/kafka/Utils.scala b/core/src/main/scala/io/aiven/guardian/kafka/Utils.scala index 4f7b554e..f1091831 100644 --- a/core/src/main/scala/io/aiven/guardian/kafka/Utils.scala +++ b/core/src/main/scala/io/aiven/guardian/kafka/Utils.scala @@ -1,10 +1,26 @@ package io.aiven.guardian.kafka +import scala.annotation.tailrec + import java.time.OffsetDateTime +import java.time.format.DateTimeParseException object Utils { + + private def parseToOffsetDateTime(string: String): Option[OffsetDateTime] = + try + Some(OffsetDateTime.parse(string)) + catch { + case _: DateTimeParseException => + None + } + + @tailrec def keyToOffsetDateTime(key: String): OffsetDateTime = { val withoutExtension = key.substring(0, key.lastIndexOf('.')) - OffsetDateTime.parse(withoutExtension) + parseToOffsetDateTime(withoutExtension) match { + case Some(offsetDateTime) => offsetDateTime + case None => keyToOffsetDateTime(withoutExtension) + } } } diff --git a/core/src/main/scala/io/aiven/guardian/kafka/models/BackupObjectMetadata.scala b/core/src/main/scala/io/aiven/guardian/kafka/models/BackupObjectMetadata.scala new file mode 100644 index 00000000..769b2eba --- /dev/null +++ b/core/src/main/scala/io/aiven/guardian/kafka/models/BackupObjectMetadata.scala @@ -0,0 +1,11 @@ +package io.aiven.guardian.kafka.models + +final case class BackupObjectMetadata(compression: Option[CompressionType]) + +object BackupObjectMetadata { + def fromKey(key: String): BackupObjectMetadata = + if (key.endsWith(".gz")) + BackupObjectMetadata(Some(Gzip)) + else + BackupObjectMetadata(None) +} diff --git a/core/src/main/scala/io/aiven/guardian/kafka/models/CompressionType.scala b/core/src/main/scala/io/aiven/guardian/kafka/models/CompressionType.scala new file mode 100644 index 00000000..cb84f9db --- /dev/null +++ b/core/src/main/scala/io/aiven/guardian/kafka/models/CompressionType.scala @@ -0,0 +1,9 @@ +package io.aiven.guardian.kafka.models + +sealed trait CompressionType { + val pretty: String +} + +case object Gzip extends CompressionType { + override val pretty: String = "Gzip" +} diff --git a/docs/src/main/paradox/backup/configuration.md b/docs/src/main/paradox/backup/configuration.md index f88a2cbc..3588fe98 100644 --- a/docs/src/main/paradox/backup/configuration.md +++ b/docs/src/main/paradox/backup/configuration.md @@ -29,3 +29,9 @@ Scala API doc @apidoc[kafka.backup.configs.Backup] * `commit-timeout-buffer-window`: Guardian sets the commit timeout of the Kafka consumer based on the `time-configuration` since Guardian does manual committing of cursors. The buffer gets added onto the `time-configuration` to give some headroom for any theoretical delays. + * `compression`: The compression format to use for the data being backed up. Note that changes in compression + configuration will not apply for any currently existing backups that need to be completed, only for future + new backups. + * `type`: Which compression to use. + * `gzip`. Standard [Gzip](https://en.wikipedia.org/wiki/Gzip) compression + * `level`: The level of compression to use diff --git a/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3GzipCompressionRestoreClientSpec.scala b/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3GzipCompressionRestoreClientSpec.scala new file mode 100644 index 00000000..dce89412 --- /dev/null +++ b/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3GzipCompressionRestoreClientSpec.scala @@ -0,0 +1,12 @@ +package io.aiven.guardian.kafka.restore.s3 + +import akka.actor.ActorSystem +import io.aiven.guardian.akka.AnyPropTestKit +import io.aiven.guardian.kafka.backup.configs.Compression +import io.aiven.guardian.kafka.models.Gzip + +class RealS3GzipCompressionRestoreClientSpec + extends AnyPropTestKit(ActorSystem("RealS3GzipCompressionRestoreClientSpec")) + with RealS3RestoreClientTest { + override val compression: Option[Compression] = Some(Compression(Gzip, None)) +} diff --git a/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientSpec.scala b/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientSpec.scala index defab524..8a7c7f46 100644 --- a/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientSpec.scala +++ b/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientSpec.scala @@ -1,162 +1,11 @@ package io.aiven.guardian.kafka.restore.s3 import akka.actor.ActorSystem -import akka.kafka.ConsumerSettings -import akka.kafka.Subscriptions -import akka.kafka.scaladsl.Consumer -import akka.kafka.scaladsl.Consumer.DrainingControl -import akka.kafka.scaladsl.Producer -import akka.stream.alpakka.s3.S3Settings -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.scaladsl.Sink -import com.softwaremill.diffx.scalatest.DiffMustMatcher._ import io.aiven.guardian.akka.AnyPropTestKit -import io.aiven.guardian.kafka.Generators._ -import io.aiven.guardian.kafka.KafkaClusterTest -import io.aiven.guardian.kafka.backup.BackupClientControlWrapper -import io.aiven.guardian.kafka.backup.KafkaClient -import io.aiven.guardian.kafka.backup.configs.Backup -import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst -import io.aiven.guardian.kafka.backup.s3.BackupClient -import io.aiven.guardian.kafka.configs.{KafkaCluster => KafkaClusterConfig} -import io.aiven.guardian.kafka.restore.KafkaProducer -import io.aiven.guardian.kafka.restore.configs.{Restore => RestoreConfig} -import io.aiven.guardian.kafka.s3.Generators.s3ConfigGen -import io.aiven.guardian.kafka.s3.S3Spec -import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.scalatest.matchers.must.Matchers -import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks - -import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.duration._ -import scala.language.postfixOps +import io.aiven.guardian.kafka.backup.configs.Compression class RealS3RestoreClientSpec extends AnyPropTestKit(ActorSystem("RealS3RestoreClientSpec")) - with S3Spec - with Matchers - with KafkaClusterTest - with ScalaCheckPropertyChecks { - - override lazy val s3Settings: S3Settings = S3Settings() - - def waitForSingleDownload(dataBucket: String): Future[Unit] = waitForS3Download( - dataBucket, - { - case Seq(_) => () - case rest => - throw DownloadNotReady(rest) - } - ) - - /** Virtual Dot Host in bucket names are disabled because you need an actual DNS certificate otherwise AWS will fail - * on bucket creation - */ - override lazy val useVirtualDotHost: Boolean = false - override lazy val bucketPrefix: Option[String] = Some("guardian-") - override lazy val enableCleanup: Option[FiniteDuration] = Some(5 seconds) - - property("Round-trip with backup and restore", RealS3Available) { - forAll( - kafkaDataWithMinSizeRenamedTopicsGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), - s3ConfigGen(useVirtualDotHost, bucketPrefix), - kafkaConsumerGroupGen - ) { - (kafkaDataInChunksWithTimePeriodRenamedTopics: KafkaDataInChunksWithTimePeriodRenamedTopics, - s3Config: S3Config, - kafkaConsumerGroup: String - ) => - implicit val restoreConfig: RestoreConfig = - RestoreConfig(None, Some(kafkaDataInChunksWithTimePeriodRenamedTopics.renamedTopics)) - implicit val kafkaProducer: KafkaProducer = new KafkaProducer(configureProducer = baseProducerConfig) - implicit val kafkaClusterConfig: KafkaClusterConfig = - KafkaClusterConfig(kafkaDataInChunksWithTimePeriodRenamedTopics.topics) - - val data = kafkaDataInChunksWithTimePeriodRenamedTopics.data.flatten - - val renamedTopics = kafkaDataInChunksWithTimePeriodRenamedTopics.renamedTopics.values.toSet - - val asProducerRecords = toProducerRecords(data) - val baseSource = toSource(asProducerRecords, 30 seconds) - - implicit val config: S3Config = s3Config - implicit val backupConfig: Backup = - Backup(kafkaConsumerGroup, PeriodFromFirst(1 minute), 10 seconds) - - val backupClientWrapped = - new BackupClientControlWrapper( - new BackupClient(Some(s3Settings))(new KafkaClient(configureConsumer = baseKafkaConfig), - implicitly, - implicitly, - implicitly, - implicitly - ) - ) - - val restoreClient = - new RestoreClient[KafkaProducer](Some(s3Settings), None) - - val producerSettings = createProducer() - - val calculatedFuture = for { - _ <- createTopics(kafkaDataInChunksWithTimePeriodRenamedTopics.topics) - _ <- createBucket(s3Config.dataBucket) - _ = backupClientWrapped.run() - _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( - baseSource - .runWith(Producer.plainSink(producerSettings)) - ) - _ <- sendTopicAfterTimePeriod(1 minute, - producerSettings, - kafkaDataInChunksWithTimePeriodRenamedTopics.topics.head - ) - _ <- waitForSingleDownload(s3Config.dataBucket) - restoreResultConsumerTopicSettings = - ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer) - .withBootstrapServers( - container.bootstrapServers - ) - .withGroupId(kafkaConsumerGroup) - restoreResultConsumerSource = - Consumer - .plainSource(restoreResultConsumerTopicSettings, - Subscriptions.topics(kafkaDataInChunksWithTimePeriodRenamedTopics.renamedTopics.values.toSet) - ) - eventualRestoredTopics = restoreResultConsumerSource.toMat(Sink.collection)(DrainingControl.apply).run() - _ <- createTopics(renamedTopics) - _ <- akka.pattern.after(5 seconds)(restoreClient.restore) - receivedTopics <- akka.pattern.after(1 minute)(eventualRestoredTopics.drainAndShutdown()) - asConsumerRecords = receivedTopics.map(KafkaClient.consumerRecordToReducedConsumerRecord) - } yield asConsumerRecords.toList - - calculatedFuture.onComplete { _ => - cleanTopics(kafkaDataInChunksWithTimePeriodRenamedTopics.topics) - cleanTopics(renamedTopics) - backupClientWrapped.shutdown() - } - val restoredConsumerRecords = calculatedFuture.futureValue - - val restoredGroupedAsKey = restoredConsumerRecords - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - val inputAsKey = data - .groupBy(_.key) - .view - .mapValues { reducedConsumerRecords => - reducedConsumerRecords.map(_.value) - } - .toMap - - restoredGroupedAsKey mustMatchTo inputAsKey - } - - } - + with RealS3RestoreClientTest { + override val compression: Option[Compression] = None } diff --git a/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientTest.scala b/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientTest.scala new file mode 100644 index 00000000..72697b48 --- /dev/null +++ b/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientTest.scala @@ -0,0 +1,162 @@ +package io.aiven.guardian.kafka.restore.s3 + +import akka.kafka.ConsumerSettings +import akka.kafka.Subscriptions +import akka.kafka.scaladsl.Consumer +import akka.kafka.scaladsl.Consumer.DrainingControl +import akka.kafka.scaladsl.Producer +import akka.stream.alpakka.s3.S3Settings +import akka.stream.alpakka.s3.scaladsl.S3 +import akka.stream.scaladsl.Sink +import com.softwaremill.diffx.scalatest.DiffMustMatcher._ +import io.aiven.guardian.kafka.Generators._ +import io.aiven.guardian.kafka.KafkaClusterTest +import io.aiven.guardian.kafka.backup.BackupClientControlWrapper +import io.aiven.guardian.kafka.backup.KafkaClient +import io.aiven.guardian.kafka.backup.configs.Backup +import io.aiven.guardian.kafka.backup.configs.Compression +import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst +import io.aiven.guardian.kafka.backup.s3.BackupClient +import io.aiven.guardian.kafka.configs.{KafkaCluster => KafkaClusterConfig} +import io.aiven.guardian.kafka.restore.KafkaProducer +import io.aiven.guardian.kafka.restore.configs.{Restore => RestoreConfig} +import io.aiven.guardian.kafka.s3.Generators.s3ConfigGen +import io.aiven.guardian.kafka.s3.S3Spec +import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.scalatest.matchers.must.Matchers +import org.scalatest.propspec.AnyPropSpecLike +import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ +import scala.language.postfixOps + +trait RealS3RestoreClientTest + extends AnyPropSpecLike + with S3Spec + with Matchers + with KafkaClusterTest + with ScalaCheckPropertyChecks { + + def compression: Option[Compression] + + override lazy val s3Settings: S3Settings = S3Settings() + + def waitForSingleDownload(dataBucket: String): Future[Unit] = waitForS3Download( + dataBucket, + { + case Seq(_) => () + case rest => + throw DownloadNotReady(rest) + } + ) + + /** Virtual Dot Host in bucket names are disabled because you need an actual DNS certificate otherwise AWS will fail + * on bucket creation + */ + override lazy val useVirtualDotHost: Boolean = false + override lazy val bucketPrefix: Option[String] = Some("guardian-") + override lazy val enableCleanup: Option[FiniteDuration] = Some(5 seconds) + + property("Round-trip with backup and restore", RealS3Available) { + forAll( + kafkaDataWithMinSizeRenamedTopicsGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), + s3ConfigGen(useVirtualDotHost, bucketPrefix), + kafkaConsumerGroupGen + ) { + (kafkaDataInChunksWithTimePeriodRenamedTopics: KafkaDataInChunksWithTimePeriodRenamedTopics, + s3Config: S3Config, + kafkaConsumerGroup: String + ) => + implicit val restoreConfig: RestoreConfig = + RestoreConfig(None, Some(kafkaDataInChunksWithTimePeriodRenamedTopics.renamedTopics)) + implicit val kafkaProducer: KafkaProducer = new KafkaProducer(configureProducer = baseProducerConfig) + implicit val kafkaClusterConfig: KafkaClusterConfig = + KafkaClusterConfig(kafkaDataInChunksWithTimePeriodRenamedTopics.topics) + + val data = kafkaDataInChunksWithTimePeriodRenamedTopics.data.flatten + + val renamedTopics = kafkaDataInChunksWithTimePeriodRenamedTopics.renamedTopics.values.toSet + + val asProducerRecords = toProducerRecords(data) + val baseSource = toSource(asProducerRecords, 30 seconds) + + implicit val config: S3Config = s3Config + implicit val backupConfig: Backup = + Backup(kafkaConsumerGroup, PeriodFromFirst(1 minute), 10 seconds, compression) + + val backupClientWrapped = + new BackupClientControlWrapper( + new BackupClient(Some(s3Settings))(new KafkaClient(configureConsumer = baseKafkaConfig), + implicitly, + implicitly, + implicitly, + implicitly + ) + ) + + val restoreClient = + new RestoreClient[KafkaProducer](Some(s3Settings), None) + + val producerSettings = createProducer() + + val calculatedFuture = for { + _ <- createTopics(kafkaDataInChunksWithTimePeriodRenamedTopics.topics) + _ <- createBucket(s3Config.dataBucket) + _ = backupClientWrapped.run() + _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( + baseSource + .runWith(Producer.plainSink(producerSettings)) + ) + _ <- sendTopicAfterTimePeriod(1 minute, + producerSettings, + kafkaDataInChunksWithTimePeriodRenamedTopics.topics.head + ) + _ <- waitForSingleDownload(s3Config.dataBucket) + restoreResultConsumerTopicSettings = + ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer) + .withBootstrapServers( + container.bootstrapServers + ) + .withGroupId(kafkaConsumerGroup) + restoreResultConsumerSource = + Consumer + .plainSource(restoreResultConsumerTopicSettings, Subscriptions.topics(renamedTopics)) + eventualRestoredTopics = restoreResultConsumerSource.toMat(Sink.collection)(DrainingControl.apply).run() + _ <- createTopics(renamedTopics) + _ <- akka.pattern.after(5 seconds)(restoreClient.restore) + receivedTopics <- akka.pattern.after(1 minute)(eventualRestoredTopics.drainAndShutdown()) + asConsumerRecords = receivedTopics.map(KafkaClient.consumerRecordToReducedConsumerRecord) + } yield asConsumerRecords.toList + + calculatedFuture.onComplete { _ => + cleanTopics(kafkaDataInChunksWithTimePeriodRenamedTopics.topics) + cleanTopics(renamedTopics) + backupClientWrapped.shutdown() + } + val restoredConsumerRecords = calculatedFuture.futureValue + + val restoredGroupedAsKey = restoredConsumerRecords + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val inputAsKey = data + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + restoredGroupedAsKey mustMatchTo inputAsKey + } + + } + +}