Skip to content

Commit

Permalink
Feature: OfType Operator (#865)
Browse files Browse the repository at this point in the history
A new operator `OfType` that is similar to the LINQ operator with the same name.  It converts a changeset of some base type (or interface) to a changeset of a different type by automatically excluding values that can't be cast as the new type.  It effectively combines `ImmutableFilter` and `ImmutableTransform` into a single operator for maximum efficiency for the case when both operations are trivial.
  • Loading branch information
dwcullop authored Feb 26, 2024
1 parent d3933e3 commit 0e5497a
Show file tree
Hide file tree
Showing 5 changed files with 298 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,10 @@ namespace DynamicData
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> NotEmpty<TObject, TKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source)
where TObject : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> OfType<TObject, TKey, TDestination>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, bool suppressEmptyChangeSets = true)
where TObject : notnull
where TKey : notnull
where TDestination : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> OnItemAdded<TObject, TKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Action<TObject> addAction)
where TObject : notnull
where TKey : notnull { }
Expand Down
157 changes: 157 additions & 0 deletions src/DynamicData.Tests/Cache/OfTypeFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Bogus;
using DynamicData.Tests.Domain;
using DynamicData.Tests.Utilities;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace DynamicData.Tests.Cache;

public class OfTypeFixture : IDisposable
{
#if DEBUG
const int AddCount = 7;
const int UpdateCount = 5;
const int RemoveCount = 3;
#else
const int AddCount = 101;
const int UpdateCount = 57;
const int RemoveCount = 53;
#endif

private readonly Randomizer _randomizer;

private readonly Faker<Person> _personFaker;

private readonly Faker<CatPerson> _catPersonFaker;

private readonly SourceCache<Person, string> _sourceCache = new(p => p.Id);

private readonly ChangeSetAggregator<Person, string> _personResults;

private readonly ChangeSetAggregator<CatPerson, string> _catPersonResults;

public OfTypeFixture(ITestOutputHelper testOutputHelper)
{
_randomizer = new(0x3737_ddcc);
_personFaker = new Faker<Person>().CustomInstantiator(faker => new Person(faker.Person.FullName)).WithSeed(_randomizer);
_catPersonFaker = new Faker<CatPerson>().CustomInstantiator(faker => new CatPerson(faker.Person.FullName, $"{faker.Hacker.Adjective()} the {faker.Hacker.Noun()}")).WithSeed(_randomizer);
_personResults = _sourceCache.Connect().TestSpy(testOutputHelper, "Cache").AsAggregator();
_catPersonResults = _sourceCache.Connect().OfType<Person, string, CatPerson>().TestSpy(testOutputHelper, "OfType").AsAggregator();
}

[Fact]
public void AddedItemsAreInResults()
{
// Arrange
var people = _personFaker.Generate(AddCount);
var catPeople = _catPersonFaker.Generate(AddCount);

_sourceCache.AddOrUpdate(people);
_sourceCache.AddOrUpdate(catPeople);

_personResults.Summary.Overall.Adds.Should().Be(AddCount * 2);
_personResults.Messages.Count.Should().Be(2);
_catPersonResults.Summary.Overall.Adds.Should().Be(AddCount);
_catPersonResults.Messages.Count.Should().Be(1);
CheckResults();
}

[Fact]
public void RemovedItemsAreNotResults()
{
var people = _personFaker.Generate(AddCount);
var catPeople = _catPersonFaker.Generate(AddCount);

_sourceCache.AddOrUpdate(people);
_sourceCache.AddOrUpdate(catPeople);
_sourceCache.Remove(_randomizer.ListItems(people, RemoveCount));
_sourceCache.Remove(_randomizer.ListItems(catPeople, RemoveCount));

_personResults.Summary.Overall.Adds.Should().Be(AddCount * 2);
_personResults.Summary.Overall.Removes.Should().Be(RemoveCount * 2);
_personResults.Messages.Count.Should().Be(4);
_catPersonResults.Summary.Overall.Adds.Should().Be(AddCount);
_catPersonResults.Summary.Overall.Removes.Should().Be(RemoveCount);
_catPersonResults.Messages.Count.Should().Be(2);
CheckResults();
}

[Fact]
public void UpdateResultsAreCorrect()
{
// Arrange
var people = _personFaker.Generate(AddCount);
var catPeople = _catPersonFaker.Generate(AddCount);

_sourceCache.AddOrUpdate(people);
_sourceCache.AddOrUpdate(catPeople);

var updates = _randomizer.ListItems(people.Concat(catPeople).ToList(), UpdateCount);
var preUpdateCatPeople = updates.Where(p => p is CatPerson).ToList();
var updated = updates.Select(p => GenerateUpdateRandom(p.Id)).ToList();
var postUpdateCatPeople = updated.Where(p => p is CatPerson).ToList();
var catToCatCount = preUpdateCatPeople.Count(p => postUpdateCatPeople.Any(pu => pu.Id == p.Id));
var catToNonCount = preUpdateCatPeople.Count - catToCatCount;
var nonToCatCount = postUpdateCatPeople.Count(p => !preUpdateCatPeople.Any(pu => pu.Id == p.Id));

// Act
_sourceCache.AddOrUpdate(updated);

// Assert
_personResults.Summary.Overall.Adds.Should().Be(AddCount * 2);
_personResults.Summary.Overall.Updates.Should().Be(UpdateCount);
_personResults.Messages.Count.Should().Be(3);
_catPersonResults.Summary.Overall.Adds.Should().Be(AddCount + nonToCatCount);
_catPersonResults.Summary.Overall.Removes.Should().Be(catToNonCount);
_catPersonResults.Summary.Overall.Updates.Should().Be(catToCatCount);
_catPersonResults.Messages.Count.Should().Be(2);
CheckResults();
}

public void Dispose()
{
_sourceCache.Dispose();
_personResults.Dispose();
_catPersonResults.Dispose();
}

private IEnumerable<Person> GeneratePeople(int count = AddCount) => Enumerable.Range(0, count).Select(_ => _randomizer.Bool() ? _personFaker.Generate() : _catPersonFaker.Generate());

private Person GenerateUpdateRandom(string id) => _randomizer.Bool() ? GenerateUpdatePerson(id) : GenerateUpdateCatPerson(id);

private Person GenerateUpdatePerson(string id) => new(_personFaker.Generate().Name, id);

private CatPerson GenerateUpdateCatPerson(string id)
{
var newCp = _catPersonFaker.Generate();
return new CatPerson(newCp.Name, newCp.CatName, id);
}

private void CheckResults()
{
var expectedPeople = _sourceCache.Items;
var expectedCatPeople = expectedPeople.OfType<CatPerson>();

_personResults.Data.Items.Should().BeEquivalentTo(expectedPeople);
_catPersonResults.Data.Items.Should().BeEquivalentTo(expectedCatPeople);
}

private interface ICatPerson
{
string CatName { get; }
}

private record Person(string Name, string Id)
{
public Person(string Name) : this(Name, Guid.NewGuid().ToString("N")) { }
}

private record CatPerson(string Name, string CatName, string Id) : Person(Name, Id), ICatPerson
{
public CatPerson(string Name, string CatName) : this(Name, CatName, Guid.NewGuid().ToString("N")) { }
}
}
87 changes: 48 additions & 39 deletions src/DynamicData.Tests/Utilities/ObservableSpy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Reactive.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using Xunit.Abstractions;

namespace DynamicData.Tests.Utilities;

Expand Down Expand Up @@ -31,30 +32,27 @@ private static class NativeMethods
/// <param name="showTimestamps">Indicates whether or not timestamps should be prepended to messages.</param>
/// <returns>An IObservable{T} with the Spy events included.</returns>
/// <remarks>Adapted from https://stackoverflow.com/q/20220755/.</remarks>
public static IObservable<T> Spy<T>(this IObservable<T> source, string? infoText = null, Action<string>? logger = null,
Func<T, string>? formatter = null, bool showSubs = true,
bool showTimestamps = true)
public static IObservable<T> Spy<T>(this IObservable<T> source, string? infoText = null, Action<string>? logger = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true)
{
static string NoTimestamp() => string.Empty;
static string HighResTimestamp() => DateTimeOffset.UtcNow.ToString("HH:mm:ss.fffffff") + " ";
static void NullLogger(string _) { }

var activeSubscriptionCounter = 0;
var subscriptionCounter = 0;

formatter ??= (t => t?.ToString() ?? "{Null}");
logger = CreateLogger(logger ?? Console.WriteLine, showTimestamps ? HighResTimestamp : NoTimestamp, infoText ?? $"IObservable<{typeof(T).Name}>");

var subLogger = showSubs ? logger : NullLogger;
var subLogger = showSubs ? logger : null;

logger("Creating Observable");
return Observable.Create<T>(obs =>
return Observable.Create<T>(observer =>
{
var subId = Interlocked.Increment(ref subscriptionCounter);
var valueCounter = 0;
bool? completedSuccessfully = null;
subLogger($"Creating Subscription #{subId}");
subLogger?.Invoke($"Creating Subscription #{subId}");
try
{
var subscription = source
Expand All @@ -65,79 +63,83 @@ static void NullLogger(string _) { }
{
try
{
obs.OnNext(t);
observer.OnNext(t);
}
catch (Exception ex)
{
logger($"Downstream Exception [SubId:{subId}] ({ex})");
throw;
}
}, obs.OnError, obs.OnCompleted);
}, observer.OnError, observer.OnCompleted);
return Disposable.Create(() =>
{
if (showSubs)
if (subLogger != null)
{
switch (completedSuccessfully)
{
case true: subLogger($"Disposing SubId #{subId} due to OnComplete"); break;
case false: subLogger($"Disposing SubId #{subId} due to OnError"); break;
case null: subLogger($"Disposing SubId #{subId} due to Unsubscribe"); break;
case true: subLogger.Invoke($"Disposing SubId #{subId} due to OnComplete"); break;
case false: subLogger.Invoke($"Disposing SubId #{subId} due to OnError"); break;
case null: subLogger.Invoke($"Disposing SubId #{subId} due to Unsubscribe"); break;
}
}
subscription?.Dispose();
var count = Interlocked.Decrement(ref activeSubscriptionCounter);
subLogger($"Dispose Completed! ({count} Active Subscriptions)");
subLogger?.Invoke($"Dispose Completed! ({count} Active Subscriptions)");
});
}
finally
{
var count = Interlocked.Increment(ref activeSubscriptionCounter);
subLogger($"Subscription Id #{subId} Created! ({count} Active Subscriptions)");
subLogger?.Invoke($"Subscription Id #{subId} Created! ({count} Active Subscriptions)");
}
});
}

public static IObservable<IChangeSet<T, TKey>> Spy<T, TKey>(this IObservable<IChangeSet<T, TKey>> source,
string? opName = null, Action<string>? logger = null,
Func<T, string>? formatter = null, bool showSubs = true,
bool showTimestamps = true)
public static IObservable<IChangeSet<T, TKey>> Spy<T, TKey>(this IObservable<IChangeSet<T, TKey>> source, string? opName = null, Action<string>? logger = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true)
where T : notnull
where TKey : notnull
{
formatter ??= (t => t?.ToString() ?? "{Null}");
return Spy(source, opName, logger, CreateCacheChangeSetFormatter<T, TKey>(formatter!), showSubs, showTimestamps);
}

public static IObservable<IChangeSet<T>> Spy<T>(this IObservable<IChangeSet<T>> source,
string? opName = null, Action<string>? logger = null,
Func<T, string>? formatter = null, bool showSubs = true,
bool showTimestamps = true)
where T : notnull
public static IObservable<IChangeSet<T>> Spy<T>(this IObservable<IChangeSet<T>> source, string? opName = null, Action<string>? logger = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true)
where T : notnull
{
formatter ??= (t => t?.ToString() ?? "{Null}");
return Spy(source, opName, logger, CreateListChangeSetFormatter(formatter!), showSubs, showTimestamps);
}

private static Func<IChangeSet<T, TKey>, string> CreateCacheChangeSetFormatter<T, TKey>(Func<T, string> formatter) where T : notnull where TKey : notnull =>
private static Func<IChangeSet<T, TKey>, string> CreateCacheChangeSetFormatter<T, TKey>(Func<T, string> formatter)
where T : notnull
where TKey : notnull =>
cs => "[Cache Change Set]" + ChangeSetEntrySpacing + string.Join(ChangeSetEntrySpacing, cs.Select((change, n) => $"#{n} {FormatChange(formatter, change)}"));

private static Func<IChangeSet<T>, string> CreateListChangeSetFormatter<T>(Func<T, string> formatter) where T : notnull =>
private static Func<IChangeSet<T>, string> CreateListChangeSetFormatter<T>(Func<T, string> formatter)
where T : notnull =>
cs => "[List Change Set]" + ChangeSetEntrySpacing + string.Join(ChangeSetEntrySpacing, cs.Select((change, n) => $"#{n} {FormatChange(formatter, change)}"));

public static IObservable<T> DebugSpy<T>(this IObservable<T> source, string? opName = null,
Func<T, string>? formatter = null, bool showSubs = true,
bool showTimestamps = true) =>
#if DEBUG || DEBUG_SPY_ALWAYS
public static IObservable<T> TestSpy<T>(this IObservable<T> source, ITestOutputHelper testOutputHelper, string? opName = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true) =>
source.Spy(opName, TestLogger(testOutputHelper), formatter, showSubs, showTimestamps);

public static IObservable<IChangeSet<T, TKey>> TestSpy<T, TKey>(this IObservable<IChangeSet<T, TKey>> source, ITestOutputHelper testOutputHelper, string? opName = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true)
where T : notnull
where TKey : notnull =>
source.Spy(opName, TestLogger(testOutputHelper), formatter, showSubs, showTimestamps);

public static IObservable<IChangeSet<T>> TestSpy<T>(this IObservable<IChangeSet<T>> source, ITestOutputHelper testOutputHelper, string? opName = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true)
where T : notnull =>
source.Spy(opName, TestLogger(testOutputHelper), formatter, showSubs, showTimestamps);

public static IObservable<T> DebugSpy<T>(this IObservable<T> source, string? opName = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true) =>
#if DEBUG || DEBUG_SPY_ALWAYS
source.Spy(opName, DebugLogger, formatter, showSubs, showTimestamps);
#else
source;
#endif

public static IObservable<IChangeSet<T, TKey>> DebugSpy<T, TKey>(this IObservable<IChangeSet<T, TKey>> source,
string? opName = null,
Func<T, string>? formatter = null, bool showSubs = true,
bool showTimestamps = true)
public static IObservable<IChangeSet<T, TKey>> DebugSpy<T, TKey>(this IObservable<IChangeSet<T, TKey>> source, string? opName = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true)
where T : notnull
where TKey : notnull =>
#if DEBUG || DEBUG_SPY_ALWAYS
Expand All @@ -146,11 +148,8 @@ public static IObservable<IChangeSet<T, TKey>> DebugSpy<T, TKey>(this IObservabl
source;
#endif

public static IObservable<IChangeSet<T>> DebugSpy<T>(this IObservable<IChangeSet<T>> source,
string? opName = null,
Func<T, string>? formatter = null, bool showSubs = true,
bool showTimestamps = true)
where T : notnull =>
public static IObservable<IChangeSet<T>> DebugSpy<T>(this IObservable<IChangeSet<T>> source, string? opName = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true)
where T : notnull =>
#if DEBUG || DEBUG_SPY_ALWAYS
source.Spy(opName, DebugLogger, formatter, showSubs, showTimestamps);
#else
Expand Down Expand Up @@ -199,6 +198,16 @@ private static string FormatItemChange<T>(Func<T, string> formatter, ItemChange<
private static Action<string> CreateLogger(Action<string> baseLogger, Func<string> timeStamper, string opName) =>
msg => baseLogger($"{timeStamper()}[{Environment.CurrentManagedThreadId:X2}] |{opName}| {msg}");

#if DEBUG || DEBUG_SPY_ALWAYS
private static Action<string> TestLogger(ITestOutputHelper testOutputHelper) => str =>
{
testOutputHelper.WriteLine(str);
DebugLogger(str);
};
#else
private static Action<string> TestLogger(ITestOutputHelper testOutputHelper) => testOutputHelper.WriteLine;
#endif

#if DEBUG
private static void DebugLogger(string str) => System.Diagnostics.Debug.WriteLine(str);
#elif DEBUG_SPY_ALWAYS
Expand Down
Loading

0 comments on commit 0e5497a

Please sign in to comment.