Skip to content

Commit

Permalink
Add gzip compression
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Apr 21, 2022
1 parent f96ada1 commit 29809cd
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class MockedS3BackupClientInterface(
)(implicit val s3Headers: S3Headers, system: ActorSystem)
extends BackupClient(maybeS3Settings)(
new MockedKafkaClientInterface(kafkaData),
Backup(MockedBackupClientInterface.KafkaGroupId, timeConfiguration, 10 seconds),
Backup(MockedBackupClientInterface.KafkaGroupId, timeConfiguration, 10 seconds, None),
implicitly,
s3Config,
implicitly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class RealS3BackupClientSpec

implicit val config: S3Config = s3Config
implicit val backupConfig: Backup =
Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 minute), 10 seconds)
Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 minute), 10 seconds, None)

val producerSettings = createProducer()

Expand Down Expand Up @@ -183,7 +183,7 @@ class RealS3BackupClientSpec

implicit val config: S3Config = s3Config
implicit val backupConfig: Backup =
Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 minute), 10 seconds)
Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 minute), 10 seconds, None)

val producerSettings = createProducer()

Expand Down Expand Up @@ -319,7 +319,7 @@ class RealS3BackupClientSpec

implicit val config: S3Config = s3Config
implicit val backupConfig: Backup =
Backup(MockedBackupClientInterface.KafkaGroupId, ChronoUnitSlice(ChronoUnit.MINUTES), 10 seconds)
Backup(MockedBackupClientInterface.KafkaGroupId, ChronoUnitSlice(ChronoUnit.MINUTES), 10 seconds, None)

val producerSettings = createProducer()

Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ lazy val core = project
librarySettings,
name := s"$baseName-core",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion % Provided,
"com.typesafe.akka" %% "akka-stream" % akkaVersion % Provided,
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-stream-kafka" % alpakkaKafkaVersion,
// Ideally we shouldn't be explicitly providing a kafka-clients version and instead getting the version
// transitively from akka-streams-kafka however there isn't a nice way to extract a transitive dependency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ import io.aiven.guardian.cli.MainUtils
import io.aiven.guardian.cli.arguments.PropertiesOpt._
import io.aiven.guardian.cli.arguments.StorageOpt
import io.aiven.guardian.cli.options.Options
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice
import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst
import io.aiven.guardian.kafka.backup.configs.TimeConfiguration
import io.aiven.guardian.kafka.backup.configs._
import io.aiven.guardian.kafka.configs.KafkaCluster
import io.aiven.guardian.kafka.s3.configs.S3
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -54,13 +51,22 @@ class Entry(val initializedApp: AtomicReference[Option[(App[_], Promise[Unit])]]
)
.withDefault(10 seconds)

val compressionLevelOpt =
Opts.option[Int]("compression-level", help = "Level of compression to use if enabled").orNone

val gzipOpt = Opts.subcommand("gzip", help = "Enable gzip compression") {
compressionLevelOpt.map(Gzip)
}

val compressionOpt = gzipOpt.orNone

val backupOpt =
(groupIdOpt, timeConfigurationOpt, commitTimeoutBufferOpt).tupled.mapValidated {
case (maybeGroupId, maybeTimeConfiguration, commitTimeoutBuffer) =>
(groupIdOpt, timeConfigurationOpt, commitTimeoutBufferOpt, compressionOpt).tupled.mapValidated {
case (maybeGroupId, maybeTimeConfiguration, commitTimeoutBuffer, maybeCompression) =>
import io.aiven.guardian.kafka.backup.Config.backupConfig
(maybeGroupId, maybeTimeConfiguration) match {
case (Some(groupId), Some(timeConfiguration)) =>
Backup(groupId, timeConfiguration, commitTimeoutBuffer).validNel
Backup(groupId, timeConfiguration, commitTimeoutBuffer, maybeCompression).validNel
case _ =>
Options
.optionalPureConfigValue(() => backupConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.aiven.guardian.kafka.backup
import akka.actor.ActorSystem
import akka.testkit.TestKit
import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice
import io.aiven.guardian.kafka.backup.configs.Gzip
import io.aiven.guardian.kafka.backup.configs.{Backup => BackupConfig}
import io.aiven.guardian.kafka.configs.{KafkaCluster => KafkaClusterConfig}
import markatta.futiles.CancellableFuture
Expand Down Expand Up @@ -45,7 +46,8 @@ class CliSpec extends TestKit(ActorSystem("BackupCliSpec")) with AnyPropSpecLike
"--chrono-unit-slice",
"hours",
"--commit-timeout-buffer-window",
"1 second"
"1 second",
"gzip"
)

val cancellable = CancellableFuture {
Expand All @@ -67,7 +69,8 @@ class CliSpec extends TestKit(ActorSystem("BackupCliSpec")) with AnyPropSpecLike
case s3App: S3App =>
s3App.backupConfig mustEqual BackupConfig(groupId,
ChronoUnitSlice(ChronoUnit.HOURS),
FiniteDuration(1, TimeUnit.SECONDS)
FiniteDuration(1, TimeUnit.SECONDS),
Some(Gzip(None))
)
s3App.kafkaClusterConfig mustEqual KafkaClusterConfig(Set(topic))
s3App.kafkaClient.consumerSettings.getProperty("bootstrap.servers") mustEqual bootstrapServer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ import akka.stream.scaladsl._
import akka.util.ByteString
import com.typesafe.scalalogging.LazyLogging
import io.aiven.guardian.kafka.Errors
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice
import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst
import io.aiven.guardian.kafka.backup.configs.TimeConfiguration
import io.aiven.guardian.kafka.backup.configs._
import io.aiven.guardian.kafka.codecs.Circe._
import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.circe.syntax._
Expand Down Expand Up @@ -297,6 +294,40 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging {
private[backup] val terminateSource: Source[ByteString, NotUsed] =
Source.single(ByteString("null]"))

private[backup] def compressContextFlow[CtxIn, CtxOut, Mat](
flowWithContext: FlowWithContext[ByteString, CtxIn, ByteString, CtxOut, Mat]
) =
backupConfig.compression match {
case Some(compression) =>
flowWithContext.unsafeDataVia(compressionLevel(compression))
case None => flowWithContext
}

private[backup] def compressContextSink[Ctx, Mat](sink: Sink[(ByteString, Ctx), Mat]) =
backupConfig.compression match {
case Some(compression) =>
FlowWithContext[ByteString, Ctx]
.unsafeDataVia(compressionLevel(compression))
.asFlow
.toMat(
sink
)(Keep.right)
case None => sink
}

private[backup] def compressSource[Mat](source: Source[ByteString, Mat]) =
backupConfig.compression match {
case Some(compression) =>
source.via(compressionLevel(compression))
case None =>
source
}

private[backup] def compressionLevel(compression: Compression) = compression match {
case Gzip(Some(level)) => Compression.gzip(level)
case Gzip(None) => Compression.gzip
}

/** Prepares the sink before it gets handed to `backupToStorageSink`
*/
private[backup] def prepareStartOfStream(uploadStateResult: UploadStateResult,
Expand All @@ -306,47 +337,54 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging {
case (Some(previous), None) =>
backupConfig.timeConfiguration match {
case _: PeriodFromFirst =>
backupToStorageSink(start.key, None)
.contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) =>
(byteString, byteStringContext.context)
}
compressContextSink(
backupToStorageSink(start.key, None)
.contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) =>
(byteString, byteStringContext.context)
}
)
case _: ChronoUnitSlice =>
logger.warn(
s"Detected previous backup using PeriodFromFirst however current configuration is now changed to ChronoUnitSlice. Object/file with an older key: ${start.key} may contain newer events than object/file with newer key: ${previous.previousKey}"
)
backupToStorageSink(start.key, None)
.contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) =>
(byteString, byteStringContext.context)
}
compressContextSink(
backupToStorageSink(start.key, None)
.contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) =>
(byteString, byteStringContext.context)
}
)
}
case (None, Some(current)) =>
backupConfig.timeConfiguration match {
case _: PeriodFromFirst =>
throw Errors.UnhandledStreamCase(List(current))
case _: ChronoUnitSlice =>
FlowWithContext
.fromTuples(
Flow[(ByteString, ByteStringContext)]
.flatMapPrefix(1) {
case Seq((byteString, start: Start)) =>
val withoutStartOfJsonArray = byteString.drop(1)
Flow[(ByteString, ByteStringContext)].prepend(
Source.single((withoutStartOfJsonArray, start))
)
case _ => throw Errors.ExpectedStartOfSource
}
)
.asFlow
compressContextFlow(
FlowWithContext
.fromTuples(
Flow[(ByteString, ByteStringContext)]
.flatMapPrefix(1) {
case Seq((byteString, start: Start)) =>
val withoutStartOfJsonArray = byteString.drop(1)
Flow[(ByteString, ByteStringContext)].prepend(
Source.single((withoutStartOfJsonArray, start))
)
case _ => throw Errors.ExpectedStartOfSource
}
)
).asFlow
.toMat(backupToStorageSink(start.key, Some(current)).contramap[(ByteString, ByteStringContext)] {
case (byteString, byteStringContext) =>
(byteString, byteStringContext.context)
})(Keep.right)
}
case (None, None) =>
backupToStorageSink(start.key, None)
.contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) =>
(byteString, byteStringContext.context)
}
compressContextSink(
backupToStorageSink(start.key, None)
.contramap[(ByteString, ByteStringContext)] { case (byteString, byteStringContext) =>
(byteString, byteStringContext.context)
}
)
case (Some(previous), Some(current)) =>
throw Errors.UnhandledStreamCase(List(previous.state, current))
}
Expand Down Expand Up @@ -374,10 +412,16 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging {
case (Seq(only: Element, End), _) =>
// This case only occurs when you have a single element in a timeslice.
// We have to terminate immediately to create a JSON array with a single element
val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration)
val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime,
backupConfig.timeConfiguration,
backupConfig.compression
)
transformFirstElement(only, key, terminate = true)
case (Seq(first: Element, second: Element), restOfReducedConsumerRecords) =>
val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration)
val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime,
backupConfig.timeConfiguration,
backupConfig.compression
)
val firstSource = transformFirstElement(first, key, terminate = false)

val rest = Source.combine(
Expand All @@ -402,7 +446,10 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging {
)
case (Seq(only: Element), _) =>
// This case can also occur when user terminates the stream
val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration)
val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime,
backupConfig.timeConfiguration,
backupConfig.compression
)
transformFirstElement(only, key, terminate = false)
case (rest, _) =>
throw Errors.UnhandledStreamCase(rest)
Expand All @@ -421,7 +468,7 @@ trait BackupClientInterface[T <: KafkaClientInterface] extends LazyLogging {
_ = logger.debug(s"Received $uploadStateResult from getCurrentUploadState with key:${start.key}")
_ <- (uploadStateResult.previous, uploadStateResult.current) match {
case (Some(previous), None) =>
terminateSource.runWith(backupToStorageTerminateSink(previous)).map(Some.apply)
compressSource(terminateSource).runWith(backupToStorageTerminateSink(previous)).map(Some.apply)
case _ => Future.successful(None)
}
} yield prepareStartOfStream(uploadStateResult, start)
Expand All @@ -448,13 +495,21 @@ object BackupClientInterface {
* @return
* A `String` that can be used either as some object key or a filename
*/
def calculateKey(offsetDateTime: OffsetDateTime, timeConfiguration: TimeConfiguration): String = {
def calculateKey(offsetDateTime: OffsetDateTime,
timeConfiguration: TimeConfiguration,
maybeCompression: Option[Compression]
): String = {
val finalTime = timeConfiguration match {
case ChronoUnitSlice(chronoUnit) => offsetDateTime.truncatedTo(chronoUnit)
case _ => offsetDateTime
}

s"${BackupClientInterface.formatOffsetDateTime(finalTime)}.json"
val extension = maybeCompression match {
case Some(_: Gzip) => "json.gz"
case None => "json"
}

s"${BackupClientInterface.formatOffsetDateTime(finalTime)}.$extension"
}

/** Calculates whether we have rolled over a time period given number of divided periods.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import scala.concurrent.duration.FiniteDuration
* Determines how the backed up objects/files are segregated depending on a time configuration
* @param commitTimeoutBufferWindow
* A buffer that is added ontop of the `timeConfiguration` when setting the Kafka Consumer commit timeout.
* @param compression
* Which compression to use for the backed up data
*/
final case class Backup(kafkaGroupId: String,
timeConfiguration: TimeConfiguration,
commitTimeoutBufferWindow: FiniteDuration
commitTimeoutBufferWindow: FiniteDuration,
compression: Option[Compression]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.aiven.guardian.kafka.backup.configs

sealed trait Compression

final case class Gzip(level: Option[Int]) extends Compression
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka
override implicit lazy val backupConfig: Backup = Backup(
KafkaGroupId,
timeConfiguration,
10 seconds
10 seconds,
None
)

/** Override this type to define the result of backing up data to a datasource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class RealS3RestoreClientSpec

implicit val config: S3Config = s3Config
implicit val backupConfig: Backup =
Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 minute), 10 seconds)
Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 minute), 10 seconds, None)

val backupClient =
new BackupClient(Some(s3Settings))(new KafkaClient(configureConsumer = baseKafkaConfig),
Expand Down

0 comments on commit 29809cd

Please sign in to comment.