Add gzip compression #196
Conversation
Force-pushed from abe487d to 7b09113.
Pull Request Test Coverage Report for Build 3142309373 (Coveralls).
```diff
@@ -113,8 +113,8 @@ lazy val core = project
     librarySettings,
     name := s"$baseName-core",
     libraryDependencies ++= Seq(
-      "com.typesafe.akka" %% "akka-actor"  % akkaVersion % Provided,
-      "com.typesafe.akka" %% "akka-stream" % akkaVersion % Provided,
+      "com.typesafe.akka" %% "akka-actor"  % akkaVersion,
```
`Provided` is removed here because otherwise we can't enforce the correct version of akka/akka-streams, which we need since it's only Akka 2.6.19 and above that has `unsafeDataVia`, which is required for this feature (see akka/akka#31123).
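As a rough illustration of what that means for the build (a minimal sbt sketch; the version value here is just the minimum mentioned above, not necessarily what the project pins):

```scala
// Ship akka-actor/akka-stream as regular compile-scope dependencies so the library
// itself can enforce at least Akka 2.6.19 (needed for unsafeDataVia), instead of
// leaving the Akka version entirely up to the end user via Provided.
lazy val akkaVersion = "2.6.19"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor"  % akkaVersion,
  "com.typesafe.akka" %% "akka-stream" % akkaVersion
)
```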
Force-pushed from 29809cd to 433c7e0.
Force-pushed from a235794 to 2d6b852.
Force-pushed from 1f17b9d to d7cc9db.
Force-pushed from d7cc9db to 5770f35.
Force-pushed from ad6cced to 919bc05.
So I have just updated the PR with a lot more work done: there is now a full implementation of GZip compression as stated in the PR description, as well as some added tests to verify that it works. Unfortunately I encountered an issue in upstream Akka, the …
Force-pushed from 090378e to 571c29a.
Force-pushed from b348667 to 170d764.
So the gzip compression PR is finally complete; the original PR post has been updated with the details.
```scala
  * @return
  *   A flow that will always produce exactly one output element for a given input element
  */
private[backup] def flattenFlow[T](flow: Flow[T, T, NotUsed], zero: T, foldFunc: (T, T) => T): Flow[T, T, NotUsed] =
```
The context behind this function is that `unsafeDataVia` only works if you provide a `Flow` that strictly never adds or removes elements from the stream (i.e. it can only modify each incoming element); this is the reason behind the "unsafe" in the method name. GZip compression doesn't satisfy this, i.e. it can arbitrarily add or remove elements at will.

So one of the ways to solve this problem is to forcefully materialize the GZip compression stream (i.e. execute the entire stream at this point in time using `runFold`), which ensures that for each incoming element the entire element is gzip-streamed, rather than streaming the `ByteString` contents into the gzip compression `Flow`.
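For intuition, here is a minimal sketch of one way such a helper can be built. It follows the approach described above but is not necessarily the PR's exact implementation; the `mapAsync` usage and the implicit `Materializer` parameter are assumptions made for the sketch:

```scala
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Compression, Flow, Source}
import akka.util.ByteString

// Collapse a Flow that may emit zero or many elements per input (such as
// Compression.gzip) into a strictly one-in/one-out Flow: each incoming element is
// run through the inner flow on its own and the outputs are folded back together.
def flattenFlowSketch[T](flow: Flow[T, T, NotUsed], zero: T, foldFunc: (T, T) => T)(implicit
    mat: Materializer
): Flow[T, T, NotUsed] =
  Flow[T].mapAsync(parallelism = 1) { element =>
    // Materialize the inner stream for this single element and fold its outputs,
    // e.g. producing one complete gzip stream (header + data + footer) per element.
    Source.single(element).via(flow).runFold(zero)(foldFunc)
  }

// Usage: compress each ByteString into its own self-contained gzip payload.
def gzipPerElement(implicit mat: Materializer): Flow[ByteString, ByteString, NotUsed] =
  flattenFlowSketch(Compression.gzip, ByteString.empty, (acc: ByteString, next: ByteString) => acc ++ next)
```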
This part I didn't get: `Compression.gzip` is `ByteString` -> `ByteString`, how come we need to add or remove elements?
So the first thing is, even though we are going from `ByteString` -> `ByteString`, we are dealing with a `Flow`/`Source`, which means you have multiple elements of that `ByteString` (in other words it's easier to think of it as a stream of many `ByteString`s -> a stream of many `ByteString`s). This is what the pure streaming nature of akka-streams implies: you are never dealing with a single strict element but with a flow of many elements over time.
Now since we are dealing with a `FlowWithContext` we have a pairing of offset to `ByteString`, i.e.

| Cursor | Data |
|---|---|
| C1 | ByteString 1 |
| C2 | ByteString 2 |
The problem with GZip compression is that it does add or remove elements by design, i.e. it needs to add headers/footers at the start/end of the data stream. That means that if we naively just use GZip compression, the following (simplified) happens: firstly, with `.unsafeDataVia` you have an intermediate stage that produces this:

| Cursor | Data |
|---|---|
| N/A | Gzipped ByteString Header |
| C1 | Gzipped ByteString 1 |
| C2 | Gzipped ByteString 2 |
| N/A | Gzipped ByteString Footer |
After that, `.unsafeDataVia` will zip up the C1/C2 elements, ignoring the ones that don't have a commit, causing this:

| Cursor | Data |
|---|---|
| C1 | Gzipped ByteString 1 |
| C2 | Gzipped ByteString 2 |
Now this is a broken/incorrect GZip stream since it's missing the headers/footers, and decompressing it throws a `Truncated GZIP stream` error, which is exactly what is tested within akka-streams here: https://github.com/akka/akka/blob/7abc41cf4e7e8827393b181cd06c5f8ea684e696/akka-stream-tests/src/test/scala/akka/stream/io/compression/GzipWithCustomCompressionLevelSpec.scala#L27-L35.

You can also have a look at the implementation of the GZip compressor at https://github.com/akka/akka/blob/7abc41cf4e7e8827393b181cd06c5f8ea684e696/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala#L24-L26; as you can see, it adds a header and/or a footer. This is the exact reason why `unsafeDataVia` has "unsafe" in its name: it can cause a corrupted stream if you naively put in a `Flow` that can add and/or delete elements. akka/akka#31522 has more details.
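As a quick illustration of that fan-out, here is a minimal self-contained sketch against akka-streams' public `Compression.gzip` (the object and actor-system names are made up for the example):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Compression, Sink, Source}
import akka.util.ByteString

// Demonstrates that Compression.gzip is not one-in/one-out: a single input element
// typically fans out into several output elements (compressed data plus a final
// chunk carrying the gzip footer), which is what breaks unsafeDataVia's assumption.
object GzipElementCountSketch extends App {
  implicit val system: ActorSystem = ActorSystem("gzip-sketch")
  import system.dispatcher

  Source
    .single(ByteString("a single input element"))
    .via(Compression.gzip)
    .runWith(Sink.seq)
    .map(chunks => println(s"1 element in, ${chunks.size} elements out"))
    .onComplete(_ => system.terminate())
}
```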
Hope that explains everything.
👍 I see, got it now, thanks a lot for the explanation.
```scala
      None
  }

  @tailrec
  def keyToOffsetDateTime(key: String): OffsetDateTime = {
```
This had to be changed to deal with the fact that we now have the extension `.json.gz` whereas before we only had `.json`. The previous implementation would only drop the last extension (the ending `.gz`) from the key, which means we would still have `<ISO_DATE_TIME>.json` (which isn't a valid timestamp).

The nicest way I could find to solve this issue is to just recursively drop the trailing extension after the last period (`.`) until you happen to successfully parse the key as a date.
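In sketch form the idea looks roughly like this (an illustrative reconstruction of the body, not necessarily the PR's exact code):

```scala
import java.time.OffsetDateTime
import scala.annotation.tailrec
import scala.util.{Success, Try}

// Keep dropping the trailing ".<extension>" segment until the remainder parses as an
// ISO date-time, so both <ISO_DATE_TIME>.json and <ISO_DATE_TIME>.json.gz resolve to
// the same timestamp.
@tailrec
def keyToOffsetDateTime(key: String): OffsetDateTime =
  Try(OffsetDateTime.parse(key)) match {
    case Success(dateTime) => dateTime
    case _ if key.contains(".") =>
      // Drop the last extension (e.g. ".gz", then ".json") and try again.
      keyToOffsetDateTime(key.substring(0, key.lastIndexOf('.')))
    case _ =>
      throw new IllegalArgumentException(s"$key does not contain a parseable OffsetDateTime")
  }
```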
```scala
backupMock.clear()
val calculatedFuture = for {
  _ <- backupMock.backup.run()
  _ <- akka.pattern.after(10 seconds)(Future.successful(()))
```
This used to be `AkkaStreamInitializationConstant` (which is 1 second), however that duration was too short and caused flaky tests, specifically for the Gzip version of this spec. I suspect that gzip compression of the large amount of generated data adds a fair bit of time to the backup, likely because the GitHub Actions CI machines are not that powerful (and GZip compression is CPU intensive) and because this is a mock rather than a real test, which means the entire `Source` contents are passed in at once rather than slowly over time as with a Kafka cluster.
```scala
val calculatedFuture = for {
  _ <- backupMock.backup.run()
  _ <- akka.pattern.after(10 seconds)(Future.successful(()))
  processedRecords = backupMock.mergeBackedUpData()
```
We are not passing in the `compression` argument here because it's the job of the actual `RestoreClient` to decompress the backed-up data later on.
```scala
val calculatedFuture = for {
  _ <- backupMock.backup.run()
  _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(()))
  processedRecords = backupMock.mergeBackedUpData()
```
We are not passing in the `compression` argument here because it's the job of the actual `RestoreClient` to decompress the backed-up data later on.
Force-pushed from 170d764 to 30a68a5.
Note that the tests have passed previously; the tests currently running are due to a rebase of #265 (see https://github.com/aiven/guardian-for-apache-kafka/actions/runs/3142309373). EDIT: Tests have passed.
Definitely pushes my limits of Akka Streams; a second pair of eyes would be helpful, but I think it looks good overall. My personal observation: compression seems to be creeping into every place, which may make it a bit challenging to add different types (f.e. …
@reta Thanks! The abstractions I added in my PR (one of the reasons why it happens to be a bit bigger than normal) should make adding compressions later on easier, however there definitely needs to be some code organization done, i.e. abstracting out the compression into its own classes/traits rather than putting it into …
About this change - What it does
This PR adds GZip compression along with a suite of tests to make sure that the feature works.
Resolves: #195
Why this way
The first thing to note about the GZip compression is that because Guardian is stateless, the only way that it knows that something is compressed (or not) is by the key/object/file extension (i.e. it ends with `.gz`). This influences how the implementation is done, i.e.:

- `ConfigurationChangeRestartSpec` is what tests these changes, i.e. it tests that when you restart the `BackupClient` with different settings (in the context of this PR that means compression) the current upload is unchanged and the next upload has the applied settings.
- Since compressed backups end with `.gz`, the `RestoreClient` can easily figure out which files need to be uncompressed and which ones not.
- `BackupClientInterfaceSpec` has been moved to `BackupClientInterfaceTest`, which is a `trait`. That `BackupClientInterfaceTest` trait has a field `compression`, so it's intended that you have test classes that extend that trait (in this case `BackupClientInterfaceSpec`, which is the current/original test, and `GzipCompressionBackupClientInterfaceSpec`, which is the current/original test but with gzip compression enabled). This allows you to easily specify whether you want to run the entire test suite with compression or not, and it can easily be extended for future features.
  - The same applies to `RestoreClientInterfaceTest`, `RealS3BackupClientTest` and `RealS3RestoreClientTest`.
  - To see what actually changed, you can put `BackupClientInterfaceTest` and the current `BackupClientInterfaceSpec` into a diff tool (same applies to the other tests).
- `mergeBackedUpData` from `MockedBackupClientInterface` has been updated to deal with the fact that mocks now have to decompress the `ByteString` data required for certain tests. A `compression` parameter has been added to the method that lets us indicate that we want to decompress the data after merging it.
- `BackupObjectMetadata` has been added, which contains metadata about a backup. Currently it only contains whether the backup is compressed or not, but it's intended that this datastructure will be extended in the future for other features. There is also a `BackupObjectMetadata.fromKey` method that creates the `BackupObjectMetadata` from a key (see the sketch after this list).
- `StateDetails` has been added, which replaces the previous `State` in order to combine it with a `BackupObjectMetadata`.
- `Compression` has been added, which is a datastructure that contains a `CompressionType` along with any details for the compression (such as a compression level).
- `CompressionType` has been added, which is a type of compression (in this case only Gzip).
- On the CLI side there is a `gzip` option to toggle it on; if you have `gzip` enabled you can also add the optional `--compression-level` to specify the level. See the `CliSpec` changes for more info.
- `BackupConfig`.
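As a rough illustration only (the names mirror the description above, but the exact fields and signatures are assumptions rather than the PR's actual code), the new pieces could fit together like this:

```scala
sealed trait CompressionType
case object Gzip extends CompressionType

// The type of compression plus any details for it (such as a compression level).
final case class Compression(compressionType: CompressionType, level: Option[Int] = None)

// Metadata about a single backup; currently only whether (and how) it is compressed.
final case class BackupObjectMetadata(compression: Option[CompressionType])

object BackupObjectMetadata {
  // Guardian is stateless, so compression is detected purely from the key's extension.
  def fromKey(key: String): BackupObjectMetadata =
    if (key.endsWith(".gz")) BackupObjectMetadata(Some(Gzip))
    else BackupObjectMetadata(None)
}
```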