Skip to content

Commit

Permalink
Fixing DisjointUnion punctuations (#120)
Browse files Browse the repository at this point in the history
  • Loading branch information
peterfreiling authored Dec 4, 2019
1 parent ada8d3a commit 6cacadc
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,124 @@
// Licensed under the MIT License
// *********************************************************************
using System;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using Microsoft.StreamProcessing.Internal.Collections;

namespace Microsoft.StreamProcessing
{
/// <summary>
/// Binary pipe for a union whose two inputs are guaranteed to be disjoint. Data batches from either
/// side are forwarded to the observer as they arrive; global punctuations (or low watermarks, for
/// partitioned keys) are stripped from the incoming batches and re-emitted only when the minimum of
/// the two sides' latest global punctuations advances.
/// </summary>
[DataContract]
internal sealed class DisjointUnionPipe<TKey, TPayload> : BinaryPipe<TKey, TPayload, TPayload, TPayload>
{
    // "Other time" marker that identifies a global punctuation row: the low-watermark marker when TKey
    // has a partition type, the plain punctuation marker otherwise.
    private static readonly long GlobalPunctuationOtherTime = typeof(TKey).GetPartitionType() != null ? PartitionedStreamEvent.LowWatermarkOtherTime : StreamEvent.PunctuationOtherTime;
    private readonly MemoryPool<TKey, TPayload> pool;

    // Largest global punctuation seen so far on each input; long.MinValue means "none seen yet".
    [DataMember]
    private long leftGlobalPunctuation = long.MinValue;
    [DataMember]
    private long rightGlobalPunctuation = long.MinValue;

    [Obsolete("Used only by serialization. Do not call directly.")]
    public DisjointUnionPipe() { }

    /// <summary>
    /// Creates the pipe and obtains the memory pool used for emitted punctuation batches and for
    /// making incoming columns writable.
    /// </summary>
    /// <param name="stream">Streamable this pipe belongs to; its properties select the columnar or row pool.</param>
    /// <param name="observer">Downstream observer that receives the unioned output.</param>
    public DisjointUnionPipe(Streamable<TKey, TPayload> stream, IStreamObserver<TKey, TPayload> observer)
        : base(stream, observer)
    {
        this.pool = MemoryManager.GetMemoryPool<TKey, TPayload>(stream.Properties.IsColumnar);
    }

    /// <summary>
    /// Handles the case where a batch is available on both inputs: strips global punctuations from both,
    /// forwards both batches (left first), then emits a single merged global punctuation if the combined
    /// minimum advanced.
    /// </summary>
    protected override void ProcessBothBatches(StreamMessage<TKey, TPayload> leftBatch, StreamMessage<TKey, TPayload> rightBatch, out bool leftBatchDone, out bool rightBatchDone, out bool leftBatchFree, out bool rightBatchFree)
    {
        // Both batches are fully consumed here. They are handed to the observer, so they must not be
        // freed by the caller. (Do not additionally delegate to ProcessLeftBatch/ProcessRightBatch:
        // that would forward each batch twice.)
        leftBatchDone = true;
        leftBatchFree = false;
        rightBatchDone = true;
        rightBatchFree = false;

        var newLeftGlobalPunctuation = Math.Max(this.leftGlobalPunctuation, ExtractGlobalPunctuations(leftBatch));
        var newRightGlobalPunctuation = Math.Max(this.rightGlobalPunctuation, ExtractGlobalPunctuations(rightBatch));
        this.Observer.OnNext(leftBatch);
        this.Observer.OnNext(rightBatch);

        // The emit check compares against the *previous* per-side values, so the fields are updated
        // only after the check.
        var newGlobalPunctuation = Math.Min(newLeftGlobalPunctuation, newRightGlobalPunctuation);
        EmitGlobalPunctuationIfNecessary(newGlobalPunctuation);

        this.leftGlobalPunctuation = newLeftGlobalPunctuation;
        this.rightGlobalPunctuation = newRightGlobalPunctuation;
    }

    /// <summary>
    /// Forwards a left batch after stripping its global punctuations, and emits a merged global
    /// punctuation if the left side's progress advanced the combined minimum.
    /// </summary>
    protected override void ProcessLeftBatch(StreamMessage<TKey, TPayload> leftBatch, out bool leftBatchDone, out bool leftBatchFree)
    {
        leftBatchDone = true;
        leftBatchFree = false;

        var newLeftGlobalPunctuation = Math.Max(this.leftGlobalPunctuation, ExtractGlobalPunctuations(leftBatch));
        this.Observer.OnNext(leftBatch);

        var newGlobalPunctuation = Math.Min(newLeftGlobalPunctuation, this.rightGlobalPunctuation);
        EmitGlobalPunctuationIfNecessary(newGlobalPunctuation);

        // Updated after the emit check, which compares against the previous value.
        this.leftGlobalPunctuation = newLeftGlobalPunctuation;
    }

    /// <summary>
    /// Forwards a right batch after stripping its global punctuations, and emits a merged global
    /// punctuation if the right side's progress advanced the combined minimum.
    /// </summary>
    protected override void ProcessRightBatch(StreamMessage<TKey, TPayload> rightBatch, out bool rightBatchDone, out bool rightBatchFree)
    {
        rightBatchDone = true;
        rightBatchFree = false;

        var newRightGlobalPunctuation = Math.Max(this.rightGlobalPunctuation, ExtractGlobalPunctuations(rightBatch));
        this.Observer.OnNext(rightBatch);

        var newGlobalPunctuation = Math.Min(this.leftGlobalPunctuation, newRightGlobalPunctuation);
        EmitGlobalPunctuationIfNecessary(newGlobalPunctuation);

        // Updated after the emit check, which compares against the previous value.
        this.rightGlobalPunctuation = newRightGlobalPunctuation;
    }

    /// <summary>
    /// Reports this operator to the observer as a union plan node over the two input plans.
    /// </summary>
    protected override void ProduceBinaryQueryPlan(PlanNode left, PlanNode right)
        => this.Observer.ProduceQueryPlan(new UnionPlanNode(left, right, this, typeof(TKey), typeof(TPayload), true, false, null));

    public override bool LeftInputHasState => false;
    public override bool RightInputHasState => false;
    public override int CurrentlyBufferedOutputCount => 0;

    /// <summary>
    /// Emits a one-row batch carrying a global punctuation at <paramref name="time"/>, but only when
    /// <paramref name="time"/> is strictly greater than the minimum of the currently stored left and
    /// right global punctuations. Callers must invoke this before updating those fields.
    /// </summary>
    /// <param name="time">Candidate merged global punctuation time.</param>
    private void EmitGlobalPunctuationIfNecessary(long time)
    {
        if (time > Math.Min(this.leftGlobalPunctuation, this.rightGlobalPunctuation))
        {
            this.pool.Get(out StreamMessage<TKey, TPayload> batch);
            batch.Allocate();
            batch.Add(vsync: time, vother: GlobalPunctuationOtherTime, key: default, payload: default);
            this.Observer.OnNext(batch);
        }
    }

    /// <summary>
    /// Removes all global punctuations from the batch, and returns the maximum found
    /// </summary>
    /// <param name="batch">Batch to scan; modified in place when it contains global punctuations.</param>
    /// <returns>
    /// The largest sync time among the removed global punctuations, or <see cref="long.MinValue"/> when
    /// the batch contains none.
    /// </returns>
    private long ExtractGlobalPunctuations(StreamMessage<TKey, TPayload> batch)
    {
        var count = batch.Count;

        // Use the same "nothing seen" sentinel as the per-side fields. Starting at -1 would raise a
        // side's recorded punctuation to -1 after a punctuation-free batch and could trigger a
        // spurious global punctuation at time -1 that neither input ever produced.
        long max = long.MinValue;
        bool writable = false;
        for (int i = 0; i < count; i++)
        {
            if (batch.vother.col[i] == GlobalPunctuationOtherTime)
            {
                max = Math.Max(max, batch.vsync.col[i]);

                // Remove the low watermark/punctuation by converting to a deleted data event.
                // The columns are made writable lazily, only once a punctuation is actually found.
                if (!writable)
                {
                    batch.vother = batch.vother.MakeWritable(this.pool.longPool);
                    batch.bitvector = batch.bitvector.MakeWritable(this.pool.bitvectorPool);
                    writable = true;
                }

                batch.vother.col[i] = 0;

                // Set bit i of the bitvector (64 rows per long word).
                batch.bitvector.col[i >> 6] |= (1L << (i & 0x3f));
            }
        }

        return max;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,22 @@ private void OutputCurrentTuple(StreamMessage<TKey, TPayload> batch)
private void OutputBatch(StreamMessage<TKey, TPayload> batch)
{
long updatedCTI = this.lastCTI;
bool writable = false;
for (int i = 0; i < batch.Count; i++)
{
// Find first punctuation
// Since we are emitting the entire batch from one side of the union, we need to ensure that all
// punctuations are ordered with respect to the other side of the union
if (batch.vother.col[i] == StreamEvent.PunctuationOtherTime)
{
if (batch.vsync.col[i] <= updatedCTI)
{
if (!writable)
{
batch.vother = batch.vother.MakeWritable(this.pool.longPool);
batch.bitvector = batch.bitvector.MakeWritable(this.pool.bitvectorPool);
writable = true;
}

// Remove the redundant punctuation by converting to a deleted data event
batch.vother.col[i] = 0;
batch.bitvector.col[i >> 6] |= (1L << (i & 0x3f));
Expand Down
1 change: 1 addition & 0 deletions Sources/Test/SimpleTesting/SimpleTesting.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
<DependentUpon>SnapshotTests.tt</DependentUpon>
</Compile>
<Compile Include="Streamables\StitchTest.cs" />
<Compile Include="Streamables\DisjointUnionTest.cs" />
<Compile Include="Streamables\WhereTest.cs" />
<Compile Include="Serializer\SurrogateTests.cs" />
</ItemGroup>
Expand Down
106 changes: 106 additions & 0 deletions Sources/Test/SimpleTesting/Streamables/DisjointUnionTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.StreamProcessing;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace SimpleTesting
{
/// <summary>
/// Tests that a disjoint union merges punctuations / low watermarks from its two inputs in order.
/// </summary>
[TestClass]
public class DisjointUnionTest : TestWithConfigSettingsAndMemoryLeakDetection
{
    /// <summary>
    /// Two unpartitioned inputs send punctuations at different times (101 on the left, 110 on the right);
    /// the expected output lists each punctuation once, ordered with respect to the data of both sides.
    /// </summary>
    [TestMethod, TestCategory("Gated")]
    public void DisjointUnionPunctuations()
    {
        var leftSubject = new Subject<StreamEvent<int>>();
        var rightSubject = new Subject<StreamEvent<int>>();

        var container = new QueryContainer();
        var leftStream = container.RegisterInput(leftSubject);
        var rightStream = container.RegisterInput(rightSubject);

        var observed = new List<StreamEvent<int>>();
        var unionInputs = new IStreamable<Empty, int>[] { leftStream, rightStream };
        var unionStream = new MultiUnionStreamable<Empty, int>(unionInputs, guaranteedDisjoint: true);
        var egressTask = container.RegisterOutput(unionStream).ForEachAsync(e => observed.Add(e));
        var pipeline = container.Restore();

        // First round: one point plus one punctuation per side.
        leftSubject.OnNext(StreamEvent.CreatePoint(100, 1));
        leftSubject.OnNext(StreamEvent.CreatePunctuation<int>(101));

        rightSubject.OnNext(StreamEvent.CreatePoint(100, 1));
        rightSubject.OnNext(StreamEvent.CreatePunctuation<int>(110));

        pipeline.Flush();

        // Second round: data only, at each side's own punctuation time.
        leftSubject.OnNext(StreamEvent.CreatePoint(101, 1));
        rightSubject.OnNext(StreamEvent.CreatePoint(110, 1));

        pipeline.Flush();

        leftSubject.OnCompleted();
        rightSubject.OnCompleted();

        StreamEvent<int>[] expected =
        {
            StreamEvent.CreatePoint(100, 1),
            StreamEvent.CreatePoint(100, 1),
            StreamEvent.CreatePunctuation<int>(101),
            StreamEvent.CreatePoint(101, 1),
            StreamEvent.CreatePoint(110, 1),
            StreamEvent.CreatePunctuation<int>(110),
            StreamEvent.CreatePunctuation<int>(StreamEvent.InfinitySyncTime),
        };

        bool outputMatches = expected.SequenceEqual(observed);
        Assert.IsTrue(outputMatches);
    }

    /// <summary>
    /// Partitioned counterpart of <see cref="DisjointUnionPunctuations"/>: each input uses its own
    /// partition key and sends low watermarks instead of punctuations.
    /// </summary>
    [TestMethod, TestCategory("Gated")]
    public void DisjointUnionLowWatermarks()
    {
        const int leftPartition = 1;
        const int rightPartition = 2;

        var leftSubject = new Subject<PartitionedStreamEvent<int, int>>();
        var rightSubject = new Subject<PartitionedStreamEvent<int, int>>();

        var container = new QueryContainer();
        var leftStream = container.RegisterInput(leftSubject);
        var rightStream = container.RegisterInput(rightSubject);

        var observed = new List<PartitionedStreamEvent<int, int>>();
        var unionInputs = new IStreamable<PartitionKey<int>, int>[] { leftStream, rightStream };
        var unionStream = new MultiUnionStreamable<PartitionKey<int>, int>(unionInputs, guaranteedDisjoint: true);
        var egressTask = container.RegisterOutput(unionStream).ForEachAsync(e => observed.Add(e));
        var pipeline = container.Restore();

        // First round: one point plus one low watermark per side.
        leftSubject.OnNext(PartitionedStreamEvent.CreatePoint(leftPartition, 100, 1));
        leftSubject.OnNext(PartitionedStreamEvent.CreateLowWatermark<int, int>(101));

        rightSubject.OnNext(PartitionedStreamEvent.CreatePoint(rightPartition, 100, 1));
        rightSubject.OnNext(PartitionedStreamEvent.CreateLowWatermark<int, int>(110));

        pipeline.Flush();

        // Second round: data only, at each side's own low watermark time.
        leftSubject.OnNext(PartitionedStreamEvent.CreatePoint(leftPartition, 101, 1));
        rightSubject.OnNext(PartitionedStreamEvent.CreatePoint(rightPartition, 110, 1));

        pipeline.Flush();

        leftSubject.OnCompleted();
        rightSubject.OnCompleted();

        PartitionedStreamEvent<int, int>[] expected =
        {
            PartitionedStreamEvent.CreatePoint(leftPartition, 100, 1),
            PartitionedStreamEvent.CreatePoint(rightPartition, 100, 1),
            PartitionedStreamEvent.CreateLowWatermark<int, int>(101),
            PartitionedStreamEvent.CreatePoint(leftPartition, 101, 1),
            PartitionedStreamEvent.CreatePoint(rightPartition, 110, 1),
            PartitionedStreamEvent.CreateLowWatermark<int, int>(110),
            PartitionedStreamEvent.CreateLowWatermark<int, int>(StreamEvent.InfinitySyncTime),
        };

        bool outputMatches = expected.SequenceEqual(observed);
        Assert.IsTrue(outputMatches);
    }
}
}
2 changes: 1 addition & 1 deletion Sources/Test/SimpleTesting/Streamables/WhereTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using Microsoft.StreamProcessing;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace SimpleTesting.NewTest
namespace SimpleTesting
{
[TestClass]
public class WhereTest : TestWithConfigSettingsAndMemoryLeakDetection
Expand Down

0 comments on commit 6cacadc

Please sign in to comment.