Skip to content

Commit

Permalink
Add gzip + none compression algos and let SDK pick compression (#155)
Browse files Browse the repository at this point in the history
## Description of Changes

Companion to clockworklabs/SpacetimeDB#1802.

## Requires SpacetimeDB PRs

clockworklabs/SpacetimeDB#1802
  • Loading branch information
Centril authored Oct 15, 2024
1 parent c74b1fd commit f1b8fa8
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 21 deletions.
9 changes: 9 additions & 0 deletions src/Compression.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace SpacetimeDB
{
public enum Compression
{
None,
Brotli,
Gzip,
}
}
3 changes: 2 additions & 1 deletion src/SpacetimeDB/ClientApi/CompressableQueryUpdate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace SpacetimeDB.ClientApi
[SpacetimeDB.Type]
public partial record CompressableQueryUpdate : SpacetimeDB.TaggedEnum<(
SpacetimeDB.ClientApi.QueryUpdate Uncompressed,
byte[] Brotli
byte[] Brotli,
byte[] Gzip
)>;
}
48 changes: 30 additions & 18 deletions src/SpacetimeDBClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public sealed class DbConnectionBuilder<DbConnection, Reducer>
string? uri;
string? nameOrAddress;
string? token;
Compression? compression;

public DbConnection Build()
{
Expand All @@ -33,7 +34,7 @@ public DbConnection Build()
{
throw new InvalidOperationException("Building DbConnection with a null nameOrAddress. Call WithModuleName() first.");
}
conn.Connect(token, uri, nameOrAddress);
conn.Connect(token, uri, nameOrAddress, compression ?? Compression.Brotli);
#if UNITY_5_3_OR_NEWER
SpacetimeDBNetworkManager.ActiveConnections.Add(conn);
#endif
Expand All @@ -58,6 +59,12 @@ public DbConnectionBuilder<DbConnection, Reducer> WithCredentials(in (Identity i
return this;
}

public DbConnectionBuilder<DbConnection, Reducer> WithCompression(Compression compression)
{
this.compression = compression;
return this;
}

public DbConnectionBuilder<DbConnection, Reducer> OnConnect(Action<DbConnection, Identity, string> cb)
{
conn.onConnect += (identity, token) => cb.Invoke(conn, identity, token);
Expand Down Expand Up @@ -209,6 +216,17 @@ enum CompressionAlgos : byte
{
None = 0,
Brotli = 1,
Gzip = 2,
}

private static BinaryReader BrotliReader(Stream stream)
{
return new BinaryReader(new BrotliStream(stream, CompressionMode.Decompress));
}

private static BinaryReader GzipReader(Stream stream)
{
return new BinaryReader(new GZipStream(stream, CompressionMode.Decompress));
}

private static ServerMessage DecompressDecodeMessage(byte[] bytes)
Expand All @@ -221,16 +239,11 @@ private static ServerMessage DecompressDecodeMessage(byte[] bytes)
switch (compression)
{
case CompressionAlgos.None:
{
using var binaryReader = new BinaryReader(stream);
return new ServerMessage.BSATN().Read(binaryReader);
}
return new ServerMessage.BSATN().Read(new BinaryReader(stream));
case CompressionAlgos.Brotli:
{
using var decompressedStream = new BrotliStream(stream, CompressionMode.Decompress);
using var binaryReader = new BinaryReader(decompressedStream);
return new ServerMessage.BSATN().Read(binaryReader);
}
return new ServerMessage.BSATN().Read(BrotliReader(stream));
case CompressionAlgos.Gzip:
return new ServerMessage.BSATN().Read(GzipReader(stream));
default:
throw new InvalidOperationException("Unknown compression type");
}
Expand All @@ -244,12 +257,11 @@ private static QueryUpdate DecompressDecodeQueryUpdate(CompressableQueryUpdate u
return qu;

case CompressableQueryUpdate.Brotli(var bytes):
{
using var stream = new MemoryStream(bytes);
using var decompressedStream = new BrotliStream(stream, CompressionMode.Decompress);
using var binaryReader = new BinaryReader(decompressedStream);
return new QueryUpdate.BSATN().Read(binaryReader);
}
return new QueryUpdate.BSATN().Read(BrotliReader(new MemoryStream(bytes)));

case CompressableQueryUpdate.Gzip(var bytes):
return new QueryUpdate.BSATN().Read(GzipReader(new MemoryStream(bytes)));

default:
throw new InvalidOperationException();
}
Expand Down Expand Up @@ -579,7 +591,7 @@ public void Disconnect()
/// </summary>
/// <param name="uri"> URI of the SpacetimeDB server (ex: https://testnet.spacetimedb.com)
/// <param name="addressOrName">The name or address of the database to connect to</param>
internal void Connect(string? token, string uri, string addressOrName)
internal void Connect(string? token, string uri, string addressOrName, Compression compression)
{
isClosing = false;

Expand All @@ -597,7 +609,7 @@ internal void Connect(string? token, string uri, string addressOrName)
{
try
{
await webSocket.Connect(token, uri, addressOrName, Address);
await webSocket.Connect(token, uri, addressOrName, Address, compression);
}
catch (Exception e)
{
Expand Down
4 changes: 2 additions & 2 deletions src/WebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public WebSocket(ConnectOptions options)

public bool IsConnected { get { return Ws != null && Ws.State == WebSocketState.Open; } }

public async Task Connect(string? auth, string host, string nameOrAddress, Address clientAddress)
public async Task Connect(string? auth, string host, string nameOrAddress, Address clientAddress, Compression compression)
{
var url = new Uri($"{host}/database/subscribe/{nameOrAddress}?client_address={clientAddress}");
var url = new Uri($"{host}/database/subscribe/{nameOrAddress}?client_address={clientAddress}&compression={nameof(compression)}");
Ws.Options.AddSubProtocol(_options.Protocol);

var source = new CancellationTokenSource(10000);
Expand Down

0 comments on commit f1b8fa8

Please sign in to comment.