Skip to content

Commit

Permalink
Improving backplane messaging. Reduced message size when sending mult…
Browse files Browse the repository at this point in the history
…iple messages in bulks and improved de/serialization of the messages by 6-10x times
  • Loading branch information
MichaCo committed Jul 2, 2017
1 parent 7db3950 commit f4846d9
Show file tree
Hide file tree
Showing 24 changed files with 1,367 additions and 233 deletions.
412 changes: 314 additions & 98 deletions src/CacheManager.Core/Internal/BackplaneMessage.cs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/CacheManager.Core/Utility/ObjectPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public ObjectPool(IObjectPoolPolicy<T> policy, int? maxItems = null)
throw new ArgumentNullException(nameof(policy));
}

if (maxItems == null || maxItems == 0)
if (maxItems == null || maxItems <= 0)
{
maxItems = Environment.ProcessorCount * 2;
}
Expand Down
43 changes: 21 additions & 22 deletions src/CacheManager.StackExchange.Redis/RedisCacheBackplane.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using CacheManager.Core;
Expand All @@ -27,13 +28,14 @@ namespace CacheManager.Redis
public sealed class RedisCacheBackplane : CacheBackplane
{
private readonly string _channelName;
private readonly string _identifier;
private readonly byte[] _identifier;
private readonly ILogger _logger;
private readonly RedisConnectionManager _connection;
private HashSet<string> _messages = new HashSet<string>();
private HashSet<BackplaneMessage> _messages = new HashSet<BackplaneMessage>();
private object _messageLock = new object();
private int _skippedMessages = 0;
private bool _sending = false;
private CancellationTokenSource _source = new CancellationTokenSource();

/// <summary>
/// Initializes a new instance of the <see cref="RedisCacheBackplane"/> class.
Expand All @@ -48,7 +50,7 @@ public RedisCacheBackplane(ICacheManagerConfiguration configuration, ILoggerFact

_logger = loggerFactory.CreateLogger(this);
_channelName = configuration.BackplaneChannelName ?? "CacheManagerBackplane";
_identifier = Guid.NewGuid().ToString();
_identifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());

var cfg = RedisConfigurations.GetConfiguration(ConfigurationKey);
_connection = new RedisConnectionManager(
Expand Down Expand Up @@ -128,6 +130,7 @@ protected override void Dispose(bool managed)
{
try
{
_source.Cancel();
_connection.Subscriber.Unsubscribe(_channelName);
}
catch
Expand All @@ -138,15 +141,13 @@ protected override void Dispose(bool managed)
base.Dispose(managed);
}

private void Publish(string message)
private void Publish(byte[] message)
{
_connection.Subscriber.Publish(_channelName, message, CommandFlags.HighPriority);
}

private void PublishMessage(BackplaneMessage message)
{
var msg = message.Serialize();

lock (_messageLock)
{
if (message.Action == BackplaneAction.Clear)
Expand All @@ -155,12 +156,12 @@ private void PublishMessage(BackplaneMessage message)
_messages.Clear();
}

if (!_messages.Add(msg))
if (!_messages.Add(message))
{
Interlocked.Increment(ref _skippedMessages);
if (_logger.IsEnabled(LogLevel.Trace))
{
_logger.LogTrace("Skipped duplicate message: {0}.", msg);
_logger.LogTrace("Skipped duplicate message: {0}.", message);
}
}

Expand Down Expand Up @@ -188,12 +189,12 @@ private void SendMessages()
#if !NET40
await Task.Delay(10).ConfigureAwait(false);
#endif
var msgs = string.Empty;
byte[] msgs = null;
lock (_messageLock)
{
if (_messages != null && _messages.Count > 0)
{
msgs = string.Join(",", _messages);
msgs = BackplaneMessage.Serialize(_messages.ToArray());

if (_logger.IsEnabled(LogLevel.Debug))
{
Expand All @@ -207,7 +208,7 @@ private void SendMessages()

try
{
if (msgs.Length > 0)
if (msgs != null)
{
Publish(msgs);
Interlocked.Increment(ref SentChunks);
Expand All @@ -223,14 +224,14 @@ private void SendMessages()
#if NET40
},
this,
CancellationToken.None,
_source.Token,
TaskCreationOptions.None,
TaskScheduler.Default)
.ConfigureAwait(false);
#else
},
this,
CancellationToken.None,
_source.Token,
TaskCreationOptions.DenyChildAttach,
TaskScheduler.Default)
.ConfigureAwait(false);
Expand All @@ -243,27 +244,25 @@ private void Subscribe()
_channelName,
(channel, msg) =>
{
var fullMessage = ((string)msg).Split(',')
.Where(s => !s.StartsWith(_identifier, StringComparison.Ordinal))
.ToArray();
var messages = BackplaneMessage.Deserialize(msg, _identifier);

if (fullMessage.Length == 0)
if (!messages.Any())
{
// no messages for this instance
return;
}

Interlocked.Add(ref MessagesReceived, fullMessage.Length);
// now deserialize all of them (lazy enumerable)
var fullMessages = messages.ToArray();
Interlocked.Add(ref MessagesReceived, fullMessages.Length);

if (_logger.IsEnabled(LogLevel.Information))
{
_logger.LogInfo("Backplane got notified with {0} new messages.", fullMessage.Length);
_logger.LogInfo("Backplane got notified with {0} new messages.", fullMessages.Length);
}

foreach (var messageStr in fullMessage)
foreach (var message in fullMessages)
{
var message = BackplaneMessage.Deserialize(messageStr);

switch (message.Action)
{
case BackplaneAction.Clear:
Expand Down
131 changes: 131 additions & 0 deletions test/CacheManager.Benchmarks/BackplaneMessageBenchmark.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using BenchmarkDotNet.Attributes;
using CacheManager.Core.Internal;

namespace CacheManager.Benchmarks
{
public class BackplaneMessageBenchmarkMultiple
{
private static byte[] _ownderBytes = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
private static BackplaneMessage[] _multiple;
private byte[] _multipleSerialized = BackplaneMessage.Serialize(_multiple);

static BackplaneMessageBenchmarkMultiple()
{
var messages = new List<BackplaneMessage>();
for (var i = 0; i < 10; i++)
{
messages.Add(BackplaneMessage.ForChanged(_ownderBytes, "somerandomkey" + i, CacheItemChangedEventAction.Update));
messages.Add(BackplaneMessage.ForChanged(_ownderBytes, "somerandomkey" + i, "withregion", CacheItemChangedEventAction.Add));
}
for (var i = 0; i < 10; i++)
{
messages.Add(BackplaneMessage.ForClear(_ownderBytes));
}
for (var i = 0; i < 10; i++)
{
messages.Add(BackplaneMessage.ForClearRegion(_ownderBytes, "somerandomregion" + i));
}
for (var i = 0; i < 10; i++)
{
messages.Add(BackplaneMessage.ForRemoved(_ownderBytes, "somerandomkey" + i, "withregion"));
}
_multiple = messages.ToArray();
}

[Benchmark]
public void SerializeMany()
{
var bytes = BackplaneMessage.Serialize(_multiple);
}

[Benchmark()]
public void DeserializeMany()
{
var messages = BackplaneMessage.Deserialize(_multipleSerialized).ToArray();
}
}

public class BackplaneMessageBenchmark
{
private static byte[] _ownderBytes = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());

private static BackplaneMessage _dataSingleChange = BackplaneMessage.ForChanged(_ownderBytes, "somerandomkey", CacheItemChangedEventAction.Update);
private byte[] _rawSingleChange = BackplaneMessage.Serialize(_dataSingleChange);

private static BackplaneMessage _dataSingleChangeRegion = BackplaneMessage.ForChanged(_ownderBytes, "somerandomkey", "withregion", CacheItemChangedEventAction.Add);
private byte[] _rawSingleChangeRegion = BackplaneMessage.Serialize(_dataSingleChangeRegion);

private static BackplaneMessage _dataSingleClear = BackplaneMessage.ForClear(_ownderBytes);
private byte[] _rawSingleClear = BackplaneMessage.Serialize(_dataSingleClear);

private static BackplaneMessage _dataSingleClearRegion = BackplaneMessage.ForClearRegion(_ownderBytes, "somerandomregion");
private byte[] _rawSingleClearRegion = BackplaneMessage.Serialize(_dataSingleClearRegion);

private static BackplaneMessage _dataSingleRemove = BackplaneMessage.ForRemoved(_ownderBytes, "somerandomkey", "withregion");
private byte[] _rawSingleRemove = BackplaneMessage.Serialize(_dataSingleRemove);

[Benchmark(Baseline = true)]
public void SerializeChange()
{
var fullMessage = BackplaneMessage.Serialize(_dataSingleChange);
}

[Benchmark()]
public void DeserializeChange()
{
var msg = BackplaneMessage.Deserialize(_rawSingleChange).ToArray();
}

[Benchmark]
public void SerializeChangeRegion()
{
var fullMessage = BackplaneMessage.Serialize(_dataSingleChangeRegion);
}

[Benchmark()]
public void DeserializeChangeRegion()
{
var msg = BackplaneMessage.Deserialize(_rawSingleChangeRegion).ToArray();
}

[Benchmark]
public void SerializeClear()
{
var fullMessage = BackplaneMessage.Serialize(_dataSingleClear);
}

[Benchmark()]
public void DeserializeClear()
{
var msg = BackplaneMessage.Deserialize(_rawSingleClear).ToArray();
}

[Benchmark]
public void SerializeClearRegion()
{
var fullMessage = BackplaneMessage.Serialize(_dataSingleClearRegion);
}

[Benchmark()]
public void DeserializeClearRegion()
{
var msg = BackplaneMessage.Deserialize(_rawSingleClearRegion).ToArray();
}

[Benchmark]
public void SerializeRemove()
{
var fullMessage = BackplaneMessage.Serialize(_dataSingleRemove);
}

[Benchmark()]
public void DeserializeRemove()
{
var msg = BackplaneMessage.Deserialize(_rawSingleRemove).ToArray();
}
}
}
1 change: 0 additions & 1 deletion test/CacheManager.Benchmarks/BaseCacheManagerBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

namespace CacheManager.Benchmarks
{
[Config(typeof(CacheManagerBenchConfig))]
public abstract class BaseCacheBenchmark
{
private static ICacheManagerConfiguration BaseConfig
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
``` ini

BenchmarkDotNet=v0.10.8, OS=Windows 10 Redstone 2 (10.0.15063)
Processor=Intel Core i7-6700 CPU 3.40GHz (Skylake), ProcessorCount=8
Frequency=3328125 Hz, Resolution=300.4695 ns, Timer=TSC
[Host] : Clr 4.0.30319.42000, 64bit RyuJIT-v4.7.2098.0
Job-VUBPRE : Clr 4.0.30319.42000, 64bit RyuJIT-v4.7.2098.0

Runtime=Clr LaunchCount=1 TargetCount=10
WarmupCount=4

```
| Method | Mean | Error | StdDev | Median | Op/s | Scaled | ScaledSD | Rank | Gen 0 | Allocated |
|------------------------ |----------:|----------:|---------:|----------:|-------------:|-------:|---------:|-----:|-------:|----------:|
| SerializeChange | 194.57 ns | 11.309 ns | 7.480 ns | 192.86 ns | 5,139,629.5 | 1.00 | 0.00 | 4 | 0.1523 | 640 B |
| DeserializeChange | 241.69 ns | 10.354 ns | 6.848 ns | 238.51 ns | 4,137,485.5 | 1.24 | 0.06 | 5 | 0.0875 | 368 B |
| SerializeChangeRegion | 249.23 ns | 7.700 ns | 4.582 ns | 249.67 ns | 4,012,334.1 | 1.28 | 0.05 | 6 | 0.1826 | 768 B |
| DeserializeChangeRegion | 305.05 ns | 8.953 ns | 5.922 ns | 306.25 ns | 3,278,156.0 | 1.57 | 0.06 | 7 | 0.0987 | 416 B |
| SerializeClear | 96.21 ns | 4.459 ns | 2.949 ns | 95.74 ns | 10,393,877.9 | 0.50 | 0.02 | 1 | 0.1162 | 488 B |
| DeserializeClear | 158.92 ns | 7.949 ns | 5.258 ns | 159.25 ns | 6,292,374.9 | 0.82 | 0.04 | 2 | 0.0741 | 312 B |
| SerializeClearRegion | 169.66 ns | 7.262 ns | 3.798 ns | 169.60 ns | 5,894,312.1 | 0.87 | 0.04 | 3 | 0.1581 | 664 B |
| DeserializeClearRegion | 240.22 ns | 8.661 ns | 5.154 ns | 240.20 ns | 4,162,849.7 | 1.24 | 0.05 | 5 | 0.0892 | 376 B |
| SerializeRemove | 255.63 ns | 15.071 ns | 9.968 ns | 258.50 ns | 3,911,924.7 | 1.32 | 0.07 | 6 | 0.1826 | 768 B |
| DeserializeRemove | 300.98 ns | 9.420 ns | 5.606 ns | 299.66 ns | 3,322,483.1 | 1.55 | 0.06 | 7 | 0.0987 | 416 B |
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Method;Job;AnalyzeLaunchVariance;EvaluateOverhead;MaxAbsoluteError;MaxRelativeError;MinInvokeCount;MinIterationTime;RemoveOutliers;Affinity;Jit;Platform;Runtime;AllowVeryLargeObjects;Concurrent;CpuGroups;Force;RetainVm;Server;Clock;EngineFactory;Toolchain;InvocationCount;IterationTime;LaunchCount;RunStrategy;TargetCount;UnrollFactor;WarmupCount;Mean;Error;StdDev;Median;Op/s;Scaled;ScaledSD;Rank;Gen 0;Allocated
SerializeChange;Default;False;Default;Default;Default;Default;Default;Default;255;RyuJit;X64;Clr;False;True;False;True;False;False;Default;Default;Default;1;Default;1;Default;10;16;4;194.57 ns;11.309 ns;7.480 ns;192.86 ns;"5,139,629.5";1.00;0.00;4;0.1523;640 B
DeserializeChange;Default;False;Default;Default;Default;Default;Default;Default;255;RyuJit;X64;Clr;False;True;False;True;False;False;Default;Default;Default;1;Default;1;Default;10;16;4;241.69 ns;10.354 ns;6.848 ns;238.51 ns;"4,137,485.5";1.24;0.06;5;0.0875;368 B
SerializeChangeRegion;Default;False;Default;Default;Default;Default;Default;Default;255;RyuJit;X64;Clr;False;True;False;True;False;False;Default;Default;Default;1;Default;1;Default;10;16;4;249.23 ns;7.700 ns;4.582 ns;249.67 ns;"4,012,334.1";1.28;0.05;6;0.1826;768 B
DeserializeChangeRegion;Default;False;Default;Default;Default;Default;Default;Default;255;RyuJit;X64;Clr;False;True;False;True;False;False;Default;Default;Default;1;Default;1;Default;10;16;4;305.05 ns;8.953 ns;5.922 ns;306.25 ns;"3,278,156.0";1.57;0.06;7;0.0987;416 B
SerializeClear;Default;False;Default;Default;Default;Default;Default;Default;255;RyuJit;X64;Clr;False;True;False;True;False;False;Default;Default;Default;1;Default;1;Default;10;16;4;96.21 ns;4.459 ns;2.949 ns;95.74 ns;"10,393,877.9";0.50;0.02;1;0.1162;488 B
DeserializeClear;Default;False;Default;Default;Default;Default;Default;Default;255;RyuJit;X64;Clr;False;True;False;True;False;False;Default;Default;Default;1;Default;1;Default;10;16;4;158.92 ns;7.949 ns;5.258 ns;159.25 ns;"6,292,374.9";0.82;0.04;2;0.0741;312 B
SerializeClearRegion;Default;False;Default;Default;Default;Default;Default;Default;255;RyuJit;X64;Clr;False;True;False;True;False;False;Default;Default;Default;1;Default;1;Default;10;16;4;169.66 ns;7.262 ns;3.798 ns;169.60 ns;"5,894,312.1";0.87;0.04;3;0.1581;664 B
DeserializeClearRegion;Default;False;Default;Default;Default;Default;Default;Default;255;RyuJit;X64;Clr;False;True;False;True;False;False;Default;Default;Default;1;Default;1;Default;10;16;4;240.22 ns;8.661 ns;5.154 ns;240.20 ns;"4,162,849.7";1.24;0.05;5;0.0892;376 B
SerializeRemove;Default;False;Default;Default;Default;Default;Default;Default;255;RyuJit;X64;Clr;False;True;False;True;False;False;Default;Default;Default;1;Default;1;Default;10;16;4;255.63 ns;15.071 ns;9.968 ns;258.50 ns;"3,911,924.7";1.32;0.07;6;0.1826;768 B
DeserializeRemove;Default;False;Default;Default;Default;Default;Default;Default;255;RyuJit;X64;Clr;False;True;False;True;False;False;Default;Default;Default;1;Default;1;Default;10;16;4;300.98 ns;9.420 ns;5.606 ns;299.66 ns;"3,322,483.1";1.55;0.06;7;0.0987;416 B
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<!DOCTYPE html>
<html lang='en'>
<head>
<meta charset='utf-8' />
<title>BackplaneMessageBenchmark</title>

<style type="text/css">
table { border-collapse: collapse; display: block; width: 100%; overflow: auto; }
td, th { padding: 6px 13px; border: 1px solid #ddd; }
tr { background-color: #fff; border-top: 1px solid #ccc; }
tr:nth-child(even) { background: #f8f8f8; }
</style>
</head>
<body>
<pre><code>
BenchmarkDotNet=v0.10.8, OS=Windows 10 Redstone 2 (10.0.15063)
Processor=Intel Core i7-6700 CPU 3.40GHz (Skylake), ProcessorCount=8
Frequency=3328125 Hz, Resolution=300.4695 ns, Timer=TSC
[Host] : Clr 4.0.30319.42000, 64bit RyuJIT-v4.7.2098.0
Job-VUBPRE : Clr 4.0.30319.42000, 64bit RyuJIT-v4.7.2098.0
</code></pre>
<pre><code>Runtime=Clr LaunchCount=1 TargetCount=10
WarmupCount=4
</code></pre>

<table>
<thead><tr><th> Method</th><th>Mean</th><th>Error</th><th>StdDev</th><th>Median</th><th> Op/s</th><th>Scaled</th><th>ScaledSD</th><th>Rank</th><th>Gen 0</th><th>Allocated</th>
</tr>
</thead><tbody><tr><td> SerializeChange</td><td>194.57 ns</td><td>11.309 ns</td><td>7.480 ns</td><td>192.86 ns</td><td>5,139,629.5</td><td>1.00</td><td>0.00</td><td>4</td><td>0.1523</td><td>640 B</td>
</tr><tr><td>DeserializeChange</td><td>241.69 ns</td><td>10.354 ns</td><td>6.848 ns</td><td>238.51 ns</td><td>4,137,485.5</td><td>1.24</td><td>0.06</td><td>5</td><td>0.0875</td><td>368 B</td>
</tr><tr><td>SerializeChangeRegion</td><td>249.23 ns</td><td>7.700 ns</td><td>4.582 ns</td><td>249.67 ns</td><td>4,012,334.1</td><td>1.28</td><td>0.05</td><td>6</td><td>0.1826</td><td>768 B</td>
</tr><tr><td>DeserializeChangeRegion</td><td>305.05 ns</td><td>8.953 ns</td><td>5.922 ns</td><td>306.25 ns</td><td>3,278,156.0</td><td>1.57</td><td>0.06</td><td>7</td><td>0.0987</td><td>416 B</td>
</tr><tr><td> SerializeClear</td><td>96.21 ns</td><td>4.459 ns</td><td>2.949 ns</td><td>95.74 ns</td><td>10,393,877.9</td><td>0.50</td><td>0.02</td><td>1</td><td>0.1162</td><td>488 B</td>
</tr><tr><td> DeserializeClear</td><td>158.92 ns</td><td>7.949 ns</td><td>5.258 ns</td><td>159.25 ns</td><td>6,292,374.9</td><td>0.82</td><td>0.04</td><td>2</td><td>0.0741</td><td>312 B</td>
</tr><tr><td>SerializeClearRegion</td><td>169.66 ns</td><td>7.262 ns</td><td>3.798 ns</td><td>169.60 ns</td><td>5,894,312.1</td><td>0.87</td><td>0.04</td><td>3</td><td>0.1581</td><td>664 B</td>
</tr><tr><td>DeserializeClearRegion</td><td>240.22 ns</td><td>8.661 ns</td><td>5.154 ns</td><td>240.20 ns</td><td>4,162,849.7</td><td>1.24</td><td>0.05</td><td>5</td><td>0.0892</td><td>376 B</td>
</tr><tr><td> SerializeRemove</td><td>255.63 ns</td><td>15.071 ns</td><td>9.968 ns</td><td>258.50 ns</td><td>3,911,924.7</td><td>1.32</td><td>0.07</td><td>6</td><td>0.1826</td><td>768 B</td>
</tr><tr><td>DeserializeRemove</td><td>300.98 ns</td><td>9.420 ns</td><td>5.606 ns</td><td>299.66 ns</td><td>3,322,483.1</td><td>1.55</td><td>0.06</td><td>7</td><td>0.0987</td><td>416 B</td>
</tr></tbody></table>
</body>
</html>
Loading

0 comments on commit f4846d9

Please sign in to comment.