Skip to content

Commit

Permalink
Add WithTryDeflate Encoding (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Jul 4, 2022
1 parent c5dd994 commit cb33d6d
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 13 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added

- `EncodeWithTryDeflate/EncodeWithoutCompression`: Maps `ReadOnlyMemory<byte>` bodies to `int * ReadOnlyMemory<byte>` with a non-zero value indicating compression was applied [#78](https://github.com/jet/FsCodec/pull/78)
- `EncodeWithTryDeflate/EncodeUncompressed`: Maps `ReadOnlyMemory<byte>` bodies to `int * ReadOnlyMemory<byte>` (with a non-zero value indicating compression was applied) [#80](https://github.com/jet/FsCodec/pull/80)

### Changed

Expand Down
1 change: 1 addition & 0 deletions src/FsCodec.Box/FsCodec.Box.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<Compile Include="CoreCodec.fs" />
<Compile Include="Codec.fs" />
<Compile Include="Interop.fs" />
<Compile Include="TryDeflate.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
75 changes: 75 additions & 0 deletions src/FsCodec.Box/TryDeflate.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
namespace FsCodec

open System
open System.Runtime.CompilerServices
open System.Runtime.InteropServices

module private MaybeDeflatedBody =

type Encoding =
| Direct = 0
| Deflate = 1
type Encoded = (struct (int * ReadOnlyMemory<byte>))
let empty : Encoded = int Encoding.Direct, ReadOnlyMemory.Empty

(* EncodedBody can potentially hold compressed content, that we'll inflate on demand *)

let private inflate (data : ReadOnlyMemory<byte>) : byte array =
let s = new System.IO.MemoryStream(data.ToArray(), writable = false)
let decompressor = new System.IO.Compression.DeflateStream(s, System.IO.Compression.CompressionMode.Decompress, leaveOpen = true)
let output = new System.IO.MemoryStream()
decompressor.CopyTo(output)
output.ToArray()
let decode struct (encoding, data) : ReadOnlyMemory<byte> =
if encoding = int Encoding.Deflate then inflate data |> ReadOnlyMemory
else data

(* Compression is conditional on the input meeting a minimum size, and the result meeting a required gain *)

let private deflate (eventBody : ReadOnlyMemory<byte>) : System.IO.MemoryStream =
let output = new System.IO.MemoryStream()
let compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
compressor.Write(eventBody.Span)
compressor.Flush()
output
let private encodeUncompressed (raw : ReadOnlyMemory<byte>) : Encoded = 0, raw
let encode minSize minGain (raw : ReadOnlyMemory<byte>) : Encoded =
if raw.Length < minSize then encodeUncompressed raw
else match deflate raw with
| tmp when raw.Length > int tmp.Length + minGain -> int Encoding.Deflate, tmp.ToArray() |> ReadOnlyMemory
| _ -> encodeUncompressed raw

type [<Struct>] CompressionOptions = { minSize : int; minGain : int } with
/// Attempt to compress anything possible
// TL;DR in general it's worth compressing everything to minimize RU consumption both on insert and update
// For DynamoStore, every time we need to calve from the tip, the RU impact of using TransactWriteItems is significant,
// so preventing or delaying that is of critical significance
// Empirically not much JSON below 48 bytes actually compresses - while we don't assume that, it is what is guiding the derivation of the default
static member Default = { minSize = 48; minGain = 4 }
/// Encode the data without attempting to compress, regardless of size
static member Uncompressed = { minSize = Int32.MaxValue; minGain = 0 }

[<Extension>]
type DeflateHelpers =

static member Utf8ToMaybeDeflateEncoded options (x : ReadOnlyMemory<byte>) : struct (int * ReadOnlyMemory<byte>) =
MaybeDeflatedBody.encode options.minSize options.minGain x

static member EncodedToUtf8(x) : ReadOnlyMemory<byte> =
MaybeDeflatedBody.decode x

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>ReadOnlyMemory<byte></c> Event Bodies to attempt to compress the UTF-8 data.<br/>
/// If sufficient compression, as defined by <c>options</c> is not achieved, the body is saved as-is.<br/>
/// The <c>int</c> conveys a flag indicating whether compression was applied.</summary>
[<Extension>]
static member EncodeWithTryDeflate<'Event, 'Context>(native : IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>, [<Optional; DefaultParameterValue null>] ?options)
: IEventCodec<'Event, struct (int * ReadOnlyMemory<byte>), 'Context> =
let opts = defaultArg options CompressionOptions.Default
FsCodec.Core.EventCodec.Map(native, DeflateHelpers.Utf8ToMaybeDeflateEncoded opts, DeflateHelpers.EncodedToUtf8)

/// Adapts an <c>IEventCodec</c> rendering to <c>ReadOnlyMemory<byte></c> Event Bodies to encode as per <c>EncodeWithTryDeflate</c>, but without attempting compression.<br/>
[<Extension>]
static member EncodeUncompressed<'Event, 'Context>(native : IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>)
: IEventCodec<'Event, struct (int * ReadOnlyMemory<byte>), 'Context> =
let nullOpts = CompressionOptions.Uncompressed
DeflateHelpers.EncodeWithTryDeflate(native, nullOpts)
24 changes: 12 additions & 12 deletions src/FsCodec/Codec.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ type Codec =
// employed in the convention-based Codec
// (IME, while many systems have some code touching the metadata, it's not something one typically wants to encourage)
static member private Create<'Event, 'Format, 'Context>
( /// <summary>Maps an 'Event to: an Event Type Name, a pair of <c>'Format</c>'s representing the <c>Data</c> and <c>Meta</c> together with the
/// <c>eventId</c>, <c>correlationId</c>, <c>causationId</c> and <c>timestamp</c>.</summary>
( // <summary>Maps an 'Event to: an Event Type Name, a pair of <c>'Format</c>'s representing the <c>Data</c> and <c>Meta</c> together with the
// <c>eventId</c>, <c>correlationId</c>, <c>causationId</c> and <c>timestamp</c>.</summary>
encode : 'Context option * 'Event -> string * 'Format * 'Format * Guid * string * string * DateTimeOffset option,
/// <summary>Attempts to map from an Event's stored data to <c>Some 'Event</c>, or <c>None</c> if not mappable.</summary>
// <summary>Attempts to map from an Event's stored data to <c>Some 'Event</c>, or <c>None</c> if not mappable.</summary>
tryDecode : ITimelineEvent<'Format> -> 'Event option)
: IEventCodec<'Event, 'Format, 'Context> =

Expand All @@ -29,15 +29,15 @@ type Codec =
/// <summary>Generate an <c>IEventCodec</c> suitable using the supplied <c>encode</c> and <c>tryDecode</c> functions to map to/from the stored form.
/// <c>mapCausation</c> provides metadata generation and correlation/causationId mapping based on the <c>context</c> passed to the encoder</summary>
static member Create<'Event, 'Format, 'Context>
( /// Maps a fresh <c>'Event</c> resulting from a Decision in the Domain representation type down to the TypeShape <c>UnionConverter</c> <c>'Contract</c>
/// The function is also expected to derive
/// a <c>meta</c> object that will be serialized with the same settings (if it's not <c>None</c>)
/// and an Event Creation <c>timestamp</c>.
( // Maps a fresh <c>'Event</c> resulting from a Decision in the Domain representation type down to the TypeShape <c>UnionConverter</c> <c>'Contract</c>
// The function is also expected to derive
// a <c>meta</c> object that will be serialized with the same settings (if it's not <c>None</c>)
// and an Event Creation <c>timestamp</c>.
encode : 'Event -> string * 'Format * DateTimeOffset option,
/// Maps from the TypeShape <c>UnionConverter</c> <c>'Contract</c> case the Event has been mapped to (with the raw event data as context)
/// to the <c>'Event</c> representation (typically a Discriminated Union) that is to be presented to the programming model.
// Maps from the TypeShape <c>UnionConverter</c> <c>'Contract</c> case the Event has been mapped to (with the raw event data as context)
// to the <c>'Event</c> representation (typically a Discriminated Union) that is to be presented to the programming model.
tryDecode : ITimelineEvent<'Format> -> 'Event option,
/// Uses the 'Context passed to the Encode call and the 'Meta emitted by <c>down</c> to a) the final metadata b) the <c>correlationId</c> and c) the correlationId
// Uses the 'Context passed to the Encode call and the 'Meta emitted by <c>down</c> to a) the final metadata b) the <c>correlationId</c> and c) the correlationId
mapCausation : 'Context option * 'Event -> 'Format * Guid * string * string)
: IEventCodec<'Event, 'Format, 'Context> =

Expand All @@ -49,9 +49,9 @@ type Codec =

/// Generate an <code>IEventCodec</code> using the supplied pair of <c>encode</c> and <c>tryDecode</code> functions.
static member Create<'Event, 'Format>
( /// Maps a <c>'Event</c> to an Event Type Name and a UTF-8 array representing the <c>Data</c>.
( // Maps a <c>'Event</c> to an Event Type Name and a UTF-8 array representing the <c>Data</c>.
encode : 'Event -> string * 'Format,
/// Attempts to map an Event Type Name and a UTF-8 array <c>Data</c> to <c>Some 'Event</c> case, or <c>None</c> if not mappable.
// Attempts to map an Event Type Name and a UTF-8 array <c>Data</c> to <c>Some 'Event</c> case, or <c>None</c> if not mappable.
tryDecode : string * 'Format -> 'Event option)
: IEventCodec<'Event, 'Format, obj> =

Expand Down
1 change: 1 addition & 0 deletions tests/FsCodec.Tests/FsCodec.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

<ItemGroup>
<Compile Include="StreamNameTests.fs" />
<Compile Include="TryDeflateTests.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
68 changes: 68 additions & 0 deletions tests/FsCodec.Tests/TryDeflateTests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
module FsCodec.Core.Tests.DeflateTests

open System
open Swensen.Unquote
open Xunit

let inline roundtrip (sut : FsCodec.IEventCodec<_, _, _>) value =
let encoded = sut.Encode(context = None, value = value)
let loaded = FsCodec.Core.TimelineEvent.Create(-1L, encoded.EventType, encoded.Data)
sut.TryDecode loaded

(* Base Fixture Round-trips a String encoded as ReadOnlyMemory<byte> UTF-8 blob *)

module StringUtf8 =

let eventType = "n/a"
let enc (s : string) : ReadOnlyMemory<byte> = System.Text.Encoding.UTF8.GetBytes s |> ReadOnlyMemory
let dec (b : ReadOnlySpan<byte>) : string = System.Text.Encoding.UTF8.GetString b
let stringUtf8Encoder =
let encode e = eventType, enc e
let tryDecode (s, b : ReadOnlyMemory<byte>) = if s = eventType then Some (dec b.Span) else invalidOp "Invalid eventType value"
FsCodec.Codec.Create(encode, tryDecode)

let sut = stringUtf8Encoder

let [<Fact>] roundtrips () =
let value = "TestValue"
let res' = roundtrip sut value
res' =! Some value

module WithTryDeflate =

let sut = FsCodec.DeflateHelpers.EncodeWithTryDeflate(StringUtf8.sut)

let compressibleValue = String('x', 5000)

let [<Fact>] roundtrips () =
let res' = roundtrip sut compressibleValue
res' =! Some compressibleValue

let [<Fact>] ``compresses when possible`` () =
let encoded = sut.Encode(context = None, value = compressibleValue)
let struct (encoding, encodedValue) = encoded.Data
encodedValue.Length <! compressibleValue.Length

let [<Fact>] ``uses raw value where compression not possible`` () =
let value = "NotCompressible"
let directResult = StringUtf8.sut.Encode(None, value).Data
let encoded = sut.Encode(context = None, value = value)
let struct (_encoding, result) = encoded.Data
true =! directResult.Span.SequenceEqual(result.Span)

module WithoutCompression =

let sut = FsCodec.DeflateHelpers.EncodeUncompressed(StringUtf8.sut)

// Borrow a demonstrably compressible value
let value = WithTryDeflate.compressibleValue

let [<Fact>] roundtrips () =
let res' = roundtrip sut value
res' =! Some value

let [<Fact>] ``does not compress, even if it was possible to`` () =
let directResult = StringUtf8.sut.Encode(None, value).Data
let encoded = sut.Encode(context = None, value = value)
let struct (_encoding, result) = encoded.Data
true =! directResult.Span.SequenceEqual(result.Span)

0 comments on commit cb33d6d

Please sign in to comment.