diff --git a/CHANGELOG.md b/CHANGELOG.md index 102a66e..9a39626 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added -- `EncodeWithTryDeflate/EncodeWithoutCompression`: Maps `ReadOnlyMemory` bodies to `int * ReadOnlyMemory` with a non-zero value indicating compression was applied [#78](https://github.com/jet/FsCodec/pull/78) +- `EncodeWithTryDeflate/EncodeUncompressed`: Maps `ReadOnlyMemory` bodies to `int * ReadOnlyMemory` (with a non-zero value indicating compression was applied) [#80](https://github.com/jet/FsCodec/pull/80) ### Changed diff --git a/src/FsCodec.Box/FsCodec.Box.fsproj b/src/FsCodec.Box/FsCodec.Box.fsproj index 0971b8d..52abe8b 100644 --- a/src/FsCodec.Box/FsCodec.Box.fsproj +++ b/src/FsCodec.Box/FsCodec.Box.fsproj @@ -9,6 +9,7 @@ + diff --git a/src/FsCodec.Box/TryDeflate.fs b/src/FsCodec.Box/TryDeflate.fs new file mode 100644 index 0000000..07da048 --- /dev/null +++ b/src/FsCodec.Box/TryDeflate.fs @@ -0,0 +1,75 @@ +namespace FsCodec + +open System +open System.Runtime.CompilerServices +open System.Runtime.InteropServices + +module private MaybeDeflatedBody = + + type Encoding = + | Direct = 0 (* body is carried exactly as supplied *) + | Deflate = 1 (* body is Deflate-compressed; decode inflates it *) + type Encoded = (struct (int * ReadOnlyMemory)) + let empty : Encoded = int Encoding.Direct, ReadOnlyMemory.Empty + + (* An Encoded body can potentially hold compressed content, that we'll inflate on demand *) + + let private inflate (data : ReadOnlyMemory) : byte array = (* decompresses a Deflate stream into a fresh array *) + use s = new System.IO.MemoryStream(data.ToArray(), writable = false) (* use, not let: every stream here is disposed when inflate returns *) + use decompressor = new System.IO.Compression.DeflateStream(s, System.IO.Compression.CompressionMode.Decompress, leaveOpen = true) + use output = new System.IO.MemoryStream() + decompressor.CopyTo(output) + output.ToArray() (* copied out before the streams are disposed at scope exit *) + let decode struct (encoding, data) : ReadOnlyMemory = + if encoding = int Encoding.Deflate then inflate data |> ReadOnlyMemory + else data (* any other flag value: the body is returned untouched *) + + (* Compression is conditional on the input
meeting a minimum size (minSize), and the result being more than minGain bytes smaller than the input *) + + let private deflate (eventBody : ReadOnlyMemory) : System.IO.MemoryStream = (* returns the stream holding the compressed bytes; the caller reads its Length and ToArray() *) + let output = new System.IO.MemoryStream() + use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true) (* leaveOpen keeps output usable once the compressor is closed *) + compressor.Write(eventBody.Span) + compressor.Close() (* NOTE Close, not Flush: only closing writes the final Deflate block, so the stored body is a fully terminated stream *) + output + let private encodeUncompressed (raw : ReadOnlyMemory) : Encoded = int Encoding.Direct, raw + let encode minSize minGain (raw : ReadOnlyMemory) : Encoded = + if raw.Length < minSize then encodeUncompressed raw (* below the size gate: compression is not attempted *) + else match deflate raw with + | tmp when raw.Length > int tmp.Length + minGain -> int Encoding.Deflate, tmp.ToArray() |> ReadOnlyMemory + | _ -> encodeUncompressed raw (* insufficient gain: keep the original bytes *) + +type [] CompressionOptions = { minSize : int; minGain : int } with + /// Attempt to compress anything possible + // TL;DR in general it's worth compressing everything to minimize RU consumption both on insert and update + // For DynamoStore, every time we need to calve from the tip, the RU impact of using TransactWriteItems is significant, + // so preventing or delaying that is of critical significance + // Empirically not much JSON below 48 bytes actually compresses - while we don't assume that, it is what is guiding the derivation of the default + static member Default = { minSize = 48; minGain = 4 } + /// Encode the data without attempting to compress, regardless of size + static member Uncompressed = { minSize = Int32.MaxValue; minGain = 0 } + +[] +type DeflateHelpers = + + static member Utf8ToMaybeDeflateEncoded options (x : ReadOnlyMemory) : struct (int * ReadOnlyMemory) = + MaybeDeflatedBody.encode options.minSize options.minGain x + + static member EncodedToUtf8(x) : ReadOnlyMemory = + MaybeDeflatedBody.decode x + + /// Adapts an IEventCodec rendering to ReadOnlyMemory Event Bodies to attempt to compress the UTF-8 data.
+ /// If sufficient compression, as defined by options, is not achieved, the body is saved as-is.
+ /// The int conveys a flag indicating whether compression was applied.
+ [] + static member EncodeWithTryDeflate<'Event, 'Context>(native : IEventCodec<'Event, ReadOnlyMemory, 'Context>, [] ?options) + : IEventCodec<'Event, struct (int * ReadOnlyMemory), 'Context> = + let opts = defaultArg options CompressionOptions.Default (* CompressionOptions.Default applies when the caller omits options *) + FsCodec.Core.EventCodec.Map(native, DeflateHelpers.Utf8ToMaybeDeflateEncoded opts, DeflateHelpers.EncodedToUtf8) (* NOTE(review): presumably Map applies the first function to bodies on encode and the second on decode - confirm against EventCodec.Map *) + + /// Adapts an IEventCodec rendering to ReadOnlyMemory Event Bodies to encode as per EncodeWithTryDeflate, but without attempting compression.
+ [] + static member EncodeUncompressed<'Event, 'Context>(native : IEventCodec<'Event, ReadOnlyMemory, 'Context>) + : IEventCodec<'Event, struct (int * ReadOnlyMemory), 'Context> = + let nullOpts = CompressionOptions.Uncompressed (* Uncompressed sets minSize to Int32.MaxValue, so bodies shorter than that are never handed to the compressor *) + DeflateHelpers.EncodeWithTryDeflate(native, nullOpts) diff --git a/src/FsCodec/Codec.fs b/src/FsCodec/Codec.fs index 2932929..59fff35 100755 --- a/src/FsCodec/Codec.fs +++ b/src/FsCodec/Codec.fs @@ -11,10 +11,10 @@ type Codec = // employed in the convention-based Codec // (IME, while many systems have some code touching the metadata, it's not something one typically wants to encourage) static member private Create<'Event, 'Format, 'Context> - ( /// Maps an 'Event to: an Event Type Name, a pair of 'Format's representing the Data and Meta together with the - /// eventId, correlationId, causationId and timestamp. + ( // Maps an 'Event to: an Event Type Name, a pair of 'Format's representing the Data and Meta together with the + // eventId, correlationId, causationId and timestamp. encode : 'Context option * 'Event -> string * 'Format * 'Format * Guid * string * string * DateTimeOffset option, - /// Attempts to map from an Event's stored data to Some 'Event, or None if not mappable. + // Attempts to map from an Event's stored data to Some 'Event, or None if not mappable. tryDecode : ITimelineEvent<'Format> -> 'Event option) : IEventCodec<'Event, 'Format, 'Context> = @@ -29,15 +29,15 @@ type Codec = /// Generate an IEventCodec suitable using the supplied encode and tryDecode functions to map to/from the stored form. 
/// mapCausation provides metadata generation and correlation/causationId mapping based on the context passed to the encoder static member Create<'Event, 'Format, 'Context> - ( /// Maps a fresh 'Event resulting from a Decision in the Domain representation type down to the TypeShape UnionConverter 'Contract - /// The function is also expected to derive - /// a meta object that will be serialized with the same settings (if it's not None) - /// and an Event Creation timestamp. + ( // Maps a fresh 'Event resulting from a Decision in the Domain representation type down to the TypeShape UnionConverter 'Contract + // The function is also expected to derive + // a meta object that will be serialized with the same settings (if it's not None) + // and an Event Creation timestamp. encode : 'Event -> string * 'Format * DateTimeOffset option, - /// Maps from the TypeShape UnionConverter 'Contract case the Event has been mapped to (with the raw event data as context) - /// to the 'Event representation (typically a Discriminated Union) that is to be presented to the programming model. + // Maps from the TypeShape UnionConverter 'Contract case the Event has been mapped to (with the raw event data as context) + // to the 'Event representation (typically a Discriminated Union) that is to be presented to the programming model. tryDecode : ITimelineEvent<'Format> -> 'Event option, - /// Uses the 'Context passed to the Encode call and the 'Meta emitted by down to a) the final metadata b) the correlationId and c) the correlationId + // Uses the 'Context passed to the Encode call and the 'Meta emitted by down to a) the final metadata b) the correlationId and c) the causationId mapCausation : 'Context option * 'Event -> 'Format * Guid * string * string) : IEventCodec<'Event, 'Format, 'Context> = @@ -49,9 +49,9 @@ type Codec = /// Generate an IEventCodec using the supplied pair of encode and tryDecode functions. 
static member Create<'Event, 'Format> - ( /// Maps a 'Event to an Event Type Name and a UTF-8 array representing the Data. + ( // Maps a 'Event to an Event Type Name and a UTF-8 array representing the Data. encode : 'Event -> string * 'Format, - /// Attempts to map an Event Type Name and a UTF-8 array Data to Some 'Event case, or None if not mappable. + // Attempts to map an Event Type Name and a UTF-8 array Data to Some 'Event case, or None if not mappable. tryDecode : string * 'Format -> 'Event option) : IEventCodec<'Event, 'Format, obj> = diff --git a/tests/FsCodec.Tests/FsCodec.Tests.fsproj b/tests/FsCodec.Tests/FsCodec.Tests.fsproj index 81c21c9..db5a8f9 100644 --- a/tests/FsCodec.Tests/FsCodec.Tests.fsproj +++ b/tests/FsCodec.Tests/FsCodec.Tests.fsproj @@ -7,6 +7,7 @@ + diff --git a/tests/FsCodec.Tests/TryDeflateTests.fs b/tests/FsCodec.Tests/TryDeflateTests.fs new file mode 100644 index 0000000..49e12ec --- /dev/null +++ b/tests/FsCodec.Tests/TryDeflateTests.fs @@ -0,0 +1,68 @@ +module FsCodec.Core.Tests.DeflateTests + +open System +open Swensen.Unquote +open Xunit + +let inline roundtrip (sut : FsCodec.IEventCodec<_, _, _>) value = (* encodes value, rewraps the event type and body via TimelineEvent.Create, then decodes *) + let encoded = sut.Encode(context = None, value = value) + let loaded = FsCodec.Core.TimelineEvent.Create(-1L, encoded.EventType, encoded.Data) + sut.TryDecode loaded + +(* Base Fixture Round-trips a String encoded as ReadOnlyMemory UTF-8 blob *) + +module StringUtf8 = + + let eventType = "n/a" + let enc (s : string) : ReadOnlyMemory = System.Text.Encoding.UTF8.GetBytes s |> ReadOnlyMemory + let dec (b : ReadOnlySpan) : string = System.Text.Encoding.UTF8.GetString b + let stringUtf8Encoder = + let encode e = eventType, enc e + let tryDecode (s, b : ReadOnlyMemory) = if s = eventType then Some (dec b.Span) else invalidOp "Invalid eventType value" (* an unexpected event type throws rather than yielding None *) + FsCodec.Codec.Create(encode, tryDecode) + + let sut = stringUtf8Encoder + + let [] roundtrips () = + let value = "TestValue" + let res' = roundtrip sut value + res' =! 
Some value + +module WithTryDeflate = + + let sut = FsCodec.DeflateHelpers.EncodeWithTryDeflate(StringUtf8.sut) + + let compressibleValue = String('x', 5000) (* 5000 repeats of 'x'; the tests below depend on this compressing *) + + let [] roundtrips () = + let res' = roundtrip sut compressibleValue + res' =! Some compressibleValue + + let [] ``compresses when possible`` () = + let encoded = sut.Encode(context = None, value = compressibleValue) + let struct (encoding, encodedValue) = encoded.Data (* NOTE(review): the assertion that follows appears truncated in this copy of the patch, running straight into the next test's name - confirm against the original commit *) + encodedValue.Length ] ``uses raw value where compression not possible`` () = + let value = "NotCompressible" (* 15 characters: shorter than CompressionOptions.Default's minSize of 48, so the encoder does not attempt compression *) + let directResult = StringUtf8.sut.Encode(None, value).Data + let encoded = sut.Encode(context = None, value = value) + let struct (_encoding, result) = encoded.Data (* the encoding flag is not asserted; only the bytes are compared *) + true =! directResult.Span.SequenceEqual(result.Span) + +module WithoutCompression = + + let sut = FsCodec.DeflateHelpers.EncodeUncompressed(StringUtf8.sut) + + // Borrow a demonstrably compressible value + let value = WithTryDeflate.compressibleValue + + let [] roundtrips () = + let res' = roundtrip sut value + res' =! Some value + + let [] ``does not compress, even if it was possible to`` () = + let directResult = StringUtf8.sut.Encode(None, value).Data + let encoded = sut.Encode(context = None, value = value) + let struct (_encoding, result) = encoded.Data (* the encoding flag is not asserted; only the bytes are compared *) + true =! directResult.Span.SequenceEqual(result.Span)