diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala index 6fb388dd..d5c295d4 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala @@ -23,7 +23,7 @@ class MockedS3BackupClientInterface( )(implicit val s3Headers: S3Headers, system: ActorSystem) extends BackupClient(maybeS3Settings)( new MockedKafkaClientInterface(kafkaData), - Backup(MockedBackupClientInterface.KafkaGroupId, timeConfiguration, 10 seconds), + Backup(MockedBackupClientInterface.KafkaGroupId, timeConfiguration, 10 seconds, None), implicitly, s3Config, implicitly diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala index baa55791..f97b68c0 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala @@ -103,7 +103,7 @@ class RealS3BackupClientSpec implicit val config: S3Config = s3Config implicit val backupConfig: Backup = - Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 minute), 10 seconds) + Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 minute), 10 seconds, None) val producerSettings = createProducer() @@ -185,7 +185,7 @@ class RealS3BackupClientSpec implicit val config: S3Config = s3Config implicit val backupConfig: Backup = - Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 minute), 10 seconds) + Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 minute), 10 seconds, None) val producerSettings = createProducer() @@ -321,7 +321,7 @@ class RealS3BackupClientSpec implicit val config: S3Config = s3Config implicit val backupConfig: Backup = - Backup(MockedBackupClientInterface.KafkaGroupId, ChronoUnitSlice(ChronoUnit.MINUTES), 10 seconds) + Backup(MockedBackupClientInterface.KafkaGroupId, ChronoUnitSlice(ChronoUnit.MINUTES), 10 seconds, None) val producerSettings = createProducer() @@ -429,7 +429,7 @@ class RealS3BackupClientSpec implicit val config: S3Config = s3Config implicit val backupConfig: Backup = - Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 second), 10 seconds) + Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 second), 10 seconds, None) val backupClient = new BackupClient(Some(s3Settings))(new KafkaClient(configureConsumer = baseKafkaConfig), implicitly, @@ -517,7 +517,7 @@ class RealS3BackupClientSpec implicit val config: S3Config = s3Config implicit val backupConfig: Backup = - Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 second), 10 seconds) + Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 second), 10 seconds, None) val backupClient = new BackupClient(Some(s3Settings))(new MockedKafkaClientInterface(Source(data)), @@ -606,7 +606,7 @@ class RealS3BackupClientSpec val producerSettings = createProducer() val backupClientOne = { - implicit val backupConfig: Backup = Backup("test-1", PeriodFromFirst(1 minute), 10 seconds) + implicit val backupConfig: Backup = Backup("test-1", PeriodFromFirst(1 minute), 10 seconds, None) new BackupClient(Some(s3Settings))( new KafkaClient(configureConsumer = baseKafkaConfig), @@ -618,7 +618,7 @@ class RealS3BackupClientSpec } val backupClientTwo = { - implicit val backupConfig: Backup = Backup("test-2", PeriodFromFirst(1 minute), 10 seconds) + implicit val backupConfig: Backup = Backup("test-2", PeriodFromFirst(1 minute), 10 seconds, None) new BackupClient(Some(s3Settings))( new KafkaClient(configureConsumer = baseKafkaConfig), @@ -733,7 +733,7 @@ class RealS3BackupClientSpec val producerSettings = createProducer() val backupClientOne = { - implicit val backupConfig: Backup = Backup("test-1", PeriodFromFirst(1 second), 10 seconds) + implicit val backupConfig: Backup = Backup("test-1", PeriodFromFirst(1 second), 10 seconds, None) new BackupClient(Some(s3Settings))( new KafkaClient(configureConsumer = baseKafkaConfig), @@ -745,7 +745,7 @@ class RealS3BackupClientSpec } val backupClientTwo = { - implicit val backupConfig: Backup = Backup("test-2", PeriodFromFirst(1 second), 10 seconds) + implicit val backupConfig: Backup = Backup("test-2", PeriodFromFirst(1 second), 10 seconds, None) new BackupClient(Some(s3Settings))( new KafkaClient(configureConsumer = baseKafkaConfig), diff --git a/build.sbt b/build.sbt index f7ef43a1..410668d9 100644 --- a/build.sbt +++ b/build.sbt @@ -116,8 +116,8 @@ lazy val core = project librarySettings, name := s"$baseName-core", libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-actor" % akkaVersion % Provided, - "com.typesafe.akka" %% "akka-stream" % akkaVersion % Provided, + "com.typesafe.akka" %% "akka-actor" % akkaVersion, + "com.typesafe.akka" %% "akka-stream" % akkaVersion, "com.typesafe.akka" %% "akka-stream-kafka" % alpakkaKafkaVersion, // Ideally we shouldn't be explicitly providing a kafka-clients version and instead getting the version // transitively from akka-streams-kafka however there isn't a nice way to extract a transitive dependency diff --git a/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/Main.scala b/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/Main.scala index 2925c0c8..23f377b9 100644 --- a/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/Main.scala +++ b/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/Main.scala @@ -8,10 +8,7 @@ import io.aiven.guardian.cli.MainUtils import io.aiven.guardian.cli.arguments.PropertiesOpt._ import io.aiven.guardian.cli.arguments.StorageOpt import io.aiven.guardian.cli.options.Options -import io.aiven.guardian.kafka.backup.configs.Backup -import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice -import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst -import io.aiven.guardian.kafka.backup.configs.TimeConfiguration +import io.aiven.guardian.kafka.backup.configs._ import io.aiven.guardian.kafka.configs.KafkaCluster import io.aiven.guardian.kafka.s3.configs.S3 import org.slf4j.LoggerFactory @@ -56,13 +53,22 @@ class Entry(val initializedApp: AtomicReference[Option[(App[_], Promise[Unit])]] ) .withDefault(10 seconds) + val compressionLevelOpt = + Opts.option[Int]("compression-level", help = "Level of compression to use if enabled").orNone + + val gzipOpt = Opts.subcommand("gzip", help = "Enable gzip compression") { + compressionLevelOpt.map(Gzip) + } + + val compressionOpt = gzipOpt.orNone + val backupOpt = - (groupIdOpt, timeConfigurationOpt, commitTimeoutBufferOpt).tupled.mapValidated { - case (maybeGroupId, maybeTimeConfiguration, commitTimeoutBuffer) => + (groupIdOpt, timeConfigurationOpt, commitTimeoutBufferOpt, compressionOpt).tupled.mapValidated { + case (maybeGroupId, maybeTimeConfiguration, commitTimeoutBuffer, maybeCompression) => import io.aiven.guardian.kafka.backup.Config.backupConfig (maybeGroupId, maybeTimeConfiguration) match { case (Some(groupId), Some(timeConfiguration)) => - Backup(groupId, timeConfiguration, commitTimeoutBuffer).validNel + Backup(groupId, timeConfiguration, commitTimeoutBuffer, maybeCompression).validNel case _ => Options .optionalPureConfigValue(() => backupConfig) diff --git a/cli-backup/src/test/scala/io/aiven/guardian/kafka/backup/CliSpec.scala b/cli-backup/src/test/scala/io/aiven/guardian/kafka/backup/CliSpec.scala index 76c9465a..882e8559 100644 --- a/cli-backup/src/test/scala/io/aiven/guardian/kafka/backup/CliSpec.scala +++ b/cli-backup/src/test/scala/io/aiven/guardian/kafka/backup/CliSpec.scala @@ -3,6 +3,7 @@ package io.aiven.guardian.kafka.backup import akka.actor.ActorSystem import akka.testkit.TestKit import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice +import io.aiven.guardian.kafka.backup.configs.Gzip import io.aiven.guardian.kafka.backup.configs.{Backup => BackupConfig} import io.aiven.guardian.kafka.configs.{KafkaCluster => KafkaClusterConfig} import markatta.futiles.CancellableFuture @@ -45,7 +46,8 @@ class CliSpec extends TestKit(ActorSystem("BackupCliSpec")) with AnyPropSpecLike "--chrono-unit-slice", "hours", "--commit-timeout-buffer-window", - "1 second" + "1 second", + "gzip" ) val cancellable = CancellableFuture { @@ -67,7 +69,8 @@ class CliSpec extends TestKit(ActorSystem("BackupCliSpec")) with AnyPropSpecLike case s3App: S3App => s3App.backupConfig mustEqual BackupConfig(groupId, ChronoUnitSlice(ChronoUnit.HOURS), - FiniteDuration(1, TimeUnit.SECONDS) + FiniteDuration(1, TimeUnit.SECONDS), + Some(Gzip(None)) ) s3App.kafkaClusterConfig mustEqual KafkaClusterConfig(Set(topic)) s3App.kafkaClient.consumerSettings.getProperty("bootstrap.servers") mustEqual bootstrapServer diff --git a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala index c15b89fc..456e2c30 100644 --- a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala +++ b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala @@ -7,10 +7,7 @@ import akka.stream.scaladsl._ import akka.util.ByteString import com.typesafe.scalalogging.LazyLogging import io.aiven.guardian.kafka.Errors -import io.aiven.guardian.kafka.backup.configs.Backup -import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice -import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst -import io.aiven.guardian.kafka.backup.configs.TimeConfiguration +import io.aiven.guardian.kafka.backup.configs._ import io.aiven.guardian.kafka.codecs.Circe._ import io.aiven.guardian.kafka.models.ReducedConsumerRecord import io.circe.syntax._ @@ -299,6 +296,40 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging { private[backup] val terminateSource: Source[ByteString, NotUsed] = Source.single(ByteString("null]")) + private[backup] def compressContextFlow[CtxIn, CtxOut, Mat]( + flowWithContext: FlowWithContext[ByteString, CtxIn, ByteString, CtxOut, Mat] + ) = + backupConfig.compression match { + case Some(compression) => + flowWithContext.unsafeDataVia(compressionLevel(compression)) + case None => flowWithContext + } + + private[backup] def compressContextSink[Ctx, Mat](sink: Sink[(ByteString, Ctx), Mat]) = + backupConfig.compression match { + case Some(compression) => + FlowWithContext[ByteString, Ctx] + .unsafeDataVia(compressionLevel(compression)) + .asFlow + .toMat( + sink + )(Keep.right) + case None => sink + } + + private[backup] def compressSource[Mat](source: Source[ByteString, Mat]) = + backupConfig.compression match { + case Some(compression) => + source.via(compressionLevel(compression)) + case None => + source + } + + private[backup] def compressionLevel(compression: Compression) = compression match { + case Gzip(Some(level)) => Compression.gzip(level) + case Gzip(None) => Compression.gzip + } + /** Prepares the sink before it gets handed to `backupToStorageSink` */ private[backup] def prepareStartOfStream(uploadStateResult: UploadStateResult, @@ -308,47 +339,54 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging { case (Some(previous), None) => backupConfig.timeConfiguration match { case _: PeriodFromFirst => - backupToStorageSink(start.key, None) - .contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => - (byteString, byteStringContext.context) - } + compressContextSink( + backupToStorageSink(start.key, None) + .contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => + (byteString, byteStringContext.context) + } + ) case _: ChronoUnitSlice => logger.warn( s"Detected previous backup using PeriodFromFirst however current configuration is now changed to ChronoUnitSlice. Object/file with an older key: ${start.key} may contain newer events than object/file with newer key: ${previous.previousKey}" ) - backupToStorageSink(start.key, None) - .contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => - (byteString, byteStringContext.context) - } + compressContextSink( + backupToStorageSink(start.key, None) + .contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => + (byteString, byteStringContext.context) + } + ) } case (None, Some(current)) => backupConfig.timeConfiguration match { case _: PeriodFromFirst => throw Errors.UnhandledStreamCase(List(current)) case _: ChronoUnitSlice => - FlowWithContext - .fromTuples( - Flow[(ByteString, ByteStringContext)] - .flatMapPrefix(1) { - case Seq((byteString, start: Start)) => - val withoutStartOfJsonArray = byteString.drop(1) - Flow[(ByteString, ByteStringContext)].prepend( - Source.single((withoutStartOfJsonArray, start)) - ) - case _ => throw Errors.ExpectedStartOfSource - } - ) - .asFlow + compressContextFlow( + FlowWithContext + .fromTuples( + Flow[(ByteString, ByteStringContext)] + .flatMapPrefix(1) { + case Seq((byteString, start: Start)) => + val withoutStartOfJsonArray = byteString.drop(1) + Flow[(ByteString, ByteStringContext)].prepend( + Source.single((withoutStartOfJsonArray, start)) + ) + case _ => throw Errors.ExpectedStartOfSource + } + ) + ).asFlow .toMat(backupToStorageSink(start.key, Some(current)).contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => (byteString, byteStringContext.context) })(Keep.right) } case (None, None) => - backupToStorageSink(start.key, None) - .contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => - (byteString, byteStringContext.context) - } + compressContextSink( + backupToStorageSink(start.key, None) + .contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) => + (byteString, byteStringContext.context) + } + ) case (Some(previous), Some(current)) => throw Errors.UnhandledStreamCase(List(previous.state, current)) } @@ -374,10 +412,16 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging { case (Seq(only: Element, End), _) => // This case only occurs when you have a single element in a timeslice. // We have to terminate immediately to create a JSON array with a single element - val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration) + val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime, + backupConfig.timeConfiguration, + backupConfig.compression + ) transformFirstElement(only, key, terminate = true) case (Seq(first: Element, second: Element), restOfReducedConsumerRecords) => - val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration) + val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime, + backupConfig.timeConfiguration, + backupConfig.compression + ) val firstSource = transformFirstElement(first, key, terminate = false) val rest = Source.combine( @@ -402,7 +446,10 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging { ) case (Seq(only: Element), _) => // This case can also occur when user terminates the stream - val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration) + val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime, + backupConfig.timeConfiguration, + backupConfig.compression + ) transformFirstElement(only, key, terminate = false) case (rest, _) => throw Errors.UnhandledStreamCase(rest) @@ -422,7 +469,7 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging { _ = logger.debug(s"Received $uploadStateResult from getCurrentUploadState with key:${start.key}") _ <- (uploadStateResult.previous, uploadStateResult.current) match { case (Some(previous), None) => - terminateSource + compressSource(terminateSource) .runWith(backupToStorageTerminateSink(previous)) .map(Some.apply)(ExecutionContext.parasitic) case _ => Future.successful(None) @@ -464,13 +511,21 @@ object BackupClientInterface { * @return * A `String` that can be used either as some object key or a filename */ - def calculateKey(offsetDateTime: OffsetDateTime, timeConfiguration: TimeConfiguration): String = { + def calculateKey(offsetDateTime: OffsetDateTime, + timeConfiguration: TimeConfiguration, + maybeCompression: Option[Compression] + ): String = { val finalTime = timeConfiguration match { case ChronoUnitSlice(chronoUnit) => offsetDateTime.truncatedTo(chronoUnit) case _ => offsetDateTime } - s"${BackupClientInterface.formatOffsetDateTime(finalTime)}.json" + val extension = maybeCompression match { + case Some(_: Gzip) => "json.gz" + case None => "json" + } + + s"${BackupClientInterface.formatOffsetDateTime(finalTime)}.$extension" } /** Calculates whether we have rolled over a time period given number of divided periods. diff --git a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Backup.scala b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Backup.scala index 7564db17..306fa681 100644 --- a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Backup.scala +++ b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Backup.scala @@ -8,8 +8,11 @@ import scala.concurrent.duration.FiniteDuration * Determines how the backed up objects/files are segregated depending on a time configuration * @param commitTimeoutBufferWindow * A buffer that is added ontop of the `timeConfiguration` when setting the Kafka Consumer commit timeout. + * @param compression + * Which compression to use for the backed up data */ final case class Backup(kafkaGroupId: String, timeConfiguration: TimeConfiguration, - commitTimeoutBufferWindow: FiniteDuration + commitTimeoutBufferWindow: FiniteDuration, + compression: Option[Compression] ) diff --git a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Compression.scala b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Compression.scala new file mode 100644 index 00000000..8d2b2a2a --- /dev/null +++ b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Compression.scala @@ -0,0 +1,5 @@ +package io.aiven.guardian.kafka.backup.configs + +sealed trait Compression + +final case class Gzip(level: Option[Int]) extends Compression diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala index 10de920e..5159684e 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala @@ -77,7 +77,8 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka override implicit lazy val backupConfig: Backup = Backup( KafkaGroupId, timeConfiguration, - 10 seconds + 10 seconds, + None ) /** Override this type to define the result of backing up data to a datasource diff --git a/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientSpec.scala b/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientSpec.scala index c0372798..0f871d19 100644 --- a/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientSpec.scala +++ b/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientSpec.scala @@ -84,7 +84,7 @@ class RealS3RestoreClientSpec implicit val config: S3Config = s3Config implicit val backupConfig: Backup = - Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 minute), 10 seconds) + Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 minute), 10 seconds, None) val backupClient = new BackupClient(Some(s3Settings))(new KafkaClient(configureConsumer = baseKafkaConfig),