Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure multicast subject preserves source type #1916

Merged
merged 5 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 74 additions & 14 deletions Bonsai.Core.Tests/SubjectTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Bonsai.Expressions;
using Bonsai.Reactive;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand All @@ -18,30 +21,87 @@ public override IObservable<T> Process(IObservable<T> source)
}
}

class ConstantExpressionBuilder : ZeroArgumentExpressionBuilder
{
public Expression Expression { get; set; }

public override Expression Build(IEnumerable<Expression> arguments)
{
return Expression;
}
}

[TestMethod]
[ExpectedException(typeof(InvalidOperationException))]
public void Build_MulticastSubjectMissingBuildContext_ThrowsBuildException()
{
var source = new UnitBuilder().Build();
var builder = new MulticastSubject { Name = nameof(BehaviorSubject) };
builder.Build(source);
Assert.Fail();
}

[TestMethod]
public void Build_MulticastSubjectMissingName_ReturnsSameSequence()
{
var source = Expression.Constant(Observable.Return(0));
var builder = new TestWorkflow()
.Append(new ConstantExpressionBuilder { Expression = source })
.Append(new MulticastSubject())
.AppendOutput();
var expression = builder.Workflow.Build();
Assert.AreSame(source, expression);
}

[TestMethod]
[ExpectedException(typeof(WorkflowBuildException))]
public void Build_MulticastInterfaceToSubjectOfDifferentInterface_ThrowsBuildException()
{
var builder = new WorkflowBuilder();
builder.Workflow.Add(new BehaviorSubject<IDisposable> { Name = nameof(BehaviorSubject) });
var source = builder.Workflow.Add(new CombinatorBuilder { Combinator = new DoubleProperty { Value = 5.5 } });
var convert1 = builder.Workflow.Add(new CombinatorBuilder { Combinator = new TypeCombinatorMock<IComparable>() });
var convert2 = builder.Workflow.Add(new MulticastSubject { Name = nameof(BehaviorSubject) });
builder.Workflow.AddEdge(source, convert1, new ExpressionBuilderArgument());
builder.Workflow.AddEdge(convert1, convert2, new ExpressionBuilderArgument());
var builder = new TestWorkflow()
.Append(new BehaviorSubject<IDisposable> { Name = nameof(BehaviorSubject) })
.ResetCursor()
.AppendCombinator(new DoubleProperty { Value = 5.5 })
.AppendCombinator(new TypeCombinatorMock<IComparable>())
.Append(new MulticastSubject { Name = nameof(BehaviorSubject) });
var expression = builder.Workflow.Build();
Assert.IsNotNull(expression);
}

[TestMethod]
public async Task Build_MulticastSourceToSubject_ReturnsSameValue()
{
var value = 32;
var workflow = new TestWorkflow()
.Append(new BehaviorSubject<int> { Name = nameof(BehaviorSubject) })
.ResetCursor()
.AppendCombinator(new IntProperty { Value = value })
.Append(new MulticastSubject { Name = nameof(BehaviorSubject) })
.AppendOutput();
var observable = workflow.BuildObservable<int>();
Assert.AreEqual(value, await observable.Take(1));
}

[TestMethod]
public async Task Build_MulticastSourceToObjectSubject_PreservesTypeOfSourceSequence()
{
// related to https://github.com/bonsai-rx/bonsai/issues/1914
var workflow = new TestWorkflow()
.Append(new BehaviorSubject<object> { Name = nameof(BehaviorSubject) })
.ResetCursor()
.AppendCombinator(new IntProperty())
.Append(new MulticastSubject { Name = nameof(BehaviorSubject) })
.AppendOutput();
var observable = workflow.BuildObservable<int>();
Assert.AreEqual(0, await observable.Take(1));
}

[TestMethod]
public void ResourceSubject_SourceTerminatesExceptionally_ShouldNotTryToDispose()
{
var workflowBuilder = new WorkflowBuilder();
var source = workflowBuilder.Workflow.Add(new CombinatorBuilder { Combinator = new ThrowSource() });
var subject = workflowBuilder.Workflow.Add(new ResourceSubject { Name = nameof(ResourceSubject) });
var sink = workflowBuilder.Workflow.Add(new CombinatorBuilder { Combinator = new CatchSink() });
workflowBuilder.Workflow.AddEdge(source, subject, new ExpressionBuilderArgument());
workflowBuilder.Workflow.AddEdge(subject, sink, new ExpressionBuilderArgument());
var workflowBuilder = new TestWorkflow()
.AppendCombinator(new ThrowSource())
.Append(new ResourceSubject { Name = nameof(ResourceSubject) })
.AppendCombinator(new CatchSink());
var observable = workflowBuilder.Workflow.BuildObservable();
observable.FirstOrDefaultAsync().Wait();
}
Expand All @@ -58,7 +118,7 @@ class CatchSink : Sink
{
public override IObservable<TSource> Process<TSource>(IObservable<TSource> source)
{
return source.Catch<TSource>(Observable.Empty<TSource>());
return source.Catch(Observable.Empty<TSource>());
}
}

Expand Down
8 changes: 8 additions & 0 deletions Bonsai.Core.Tests/TestWorkflow.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq.Expressions;
using Bonsai.Dag;
using Bonsai.Expressions;

Expand Down Expand Up @@ -97,5 +98,12 @@ public ExpressionBuilderGraph ToInspectableGraph()
{
return Workflow.ToInspectableGraph();
}

public IObservable<T> BuildObservable<T>()
{
var expression = Workflow.Build();
var observableFactory = Expression.Lambda<Func<IObservable<T>>>(expression).Compile();
return observableFactory();
}
}
}
23 changes: 21 additions & 2 deletions Bonsai.Core/Expressions/MulticastSubject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,16 @@ public override Expression Build(IEnumerable<Expression> arguments)
);
}

source = CoerceMethodArgument(typeof(IObservable<>).MakeGenericType(subjectType), source);
observableType = subjectType;
var conversionParameter = Expression.Parameter(observableType);
var conversionBody = Expression.Convert(conversionParameter, subjectType);
var conversion = Expression.Lambda(conversionBody, conversionParameter);
return Expression.Call(
typeof(MulticastSubject),
nameof(Process),
new[] { observableType, subjectType },
source,
subjectExpression,
conversion);
}

return Expression.Call(typeof(MulticastSubject), nameof(Process), new[] { observableType }, source, subjectExpression);
Expand All @@ -79,6 +87,17 @@ static IObservable<TSource> Process<TSource>(IObservable<TSource> source, IObser
return source.Do(subject);
}

static IObservable<TSource> Process<TSource, TSubject>(
IObservable<TSource> source,
IObserver<TSubject> subject,
Func<TSource, TSubject> conversion)
{
return source.Do(
value => subject.OnNext(conversion(value)),
subject.OnError,
subject.OnCompleted);
}

class MulticastSubjectTypeDescriptionProvider : TypeDescriptionProvider
{
readonly MulticastSubjectTypeDescriptor typeDescriptor = new MulticastSubjectTypeDescriptor(null);
Expand Down