Skip to content

Commit

Permalink
Merge pull request #30 from Cysharp/hadashiA/operators-concat
Browse files Browse the repository at this point in the history
Add operator Concat(Observable<Observable<T>>)
  • Loading branch information
neuecc authored Jan 11, 2024
2 parents ef1c553 + 9961f00 commit b98e6c1
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 2 deletions.
143 changes: 143 additions & 0 deletions src/R3/Operators/Concat.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
namespace R3;

public static partial class Observable
{
public static Observable<T> Concat<T>(this Observable<Observable<T>> sources)
{
return new ConcatMany<T>(sources);
}
}

internal sealed class ConcatMany<T>(Observable<Observable<T>> sources) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return sources.Subscribe(new _ConcatMany(observer));
}

sealed class _ConcatMany(Observer<T> observer) : Observer<Observable<T>>
{
readonly Observer<T> observer = observer;
readonly object gate = new();
readonly Queue<Observable<T>> q = new();

SerialDisposableCore serialDisposable;
bool isStopped;
int activeCount;

// keep when inner is running
protected override bool AutoDisposeOnCompleted => false;

protected override void OnNextCore(Observable<T> value)
{
lock (gate)
{
if (activeCount < 1)
{
activeCount++;
serialDisposable.Disposable = value.Subscribe(new ConcatInner(this));
}
else
{
q.Enqueue(value);
}
}
}

protected override void OnErrorResumeCore(Exception error)
{
lock (gate)
{
observer.OnErrorResume(error);
}
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
PublishCompleted(result);
}
else
{
lock (gate)
{
isStopped = true;
if (activeCount == 0)
{
PublishCompleted(result);
}
}
}
}

protected override void DisposeCore()
{
serialDisposable.Dispose();
}

void PublishCompleted(Result result)
{
try
{
lock (gate)
{
observer.OnCompleted(result);
}
}
finally
{
Dispose();
}
}

sealed class ConcatInner(_ConcatMany parent) : Observer<T>
{
// Manual disposing by SerialDisposableCore
protected override bool AutoDisposeOnCompleted => false;

protected override void OnNextCore(T value)
{
lock (parent.gate)
{
parent.observer.OnNext(value);
}
}

protected override void OnErrorResumeCore(Exception error)
{
lock (parent.gate)
{
parent.observer.OnErrorResume(error);
}
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
parent.OnCompleted();
}
else
{
lock (parent.gate)
{
if (parent.q.Count > 0)
{
var nextSource = parent.q.Dequeue();
parent.serialDisposable.Disposable = nextSource.Subscribe(new ConcatInner(parent));
}
else
{
parent.activeCount--;
if (parent is { isStopped: true, activeCount: 0 })
{
parent.PublishCompleted(result);
}
}
}
}
}
}
}
}
92 changes: 90 additions & 2 deletions tests/R3.Tests/OperatorTests/ConcatAppendPrependTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ public void Prepend()

subject.OnCompleted();
list.AssertIsCompleted();
}

}

[Fact]
public void Prepend2()
{
Expand Down Expand Up @@ -92,4 +92,92 @@ public void ConcatMany()

list.AssertIsCompleted();
}

[Fact]
public void ConcatNestedSources()
{
var outerSubject = new Subject<Observable<int>>();
using var list = outerSubject.Concat().ToLiveList();

var subject1 = new Subject<int>();
var subject2 = new Subject<int>();
var subject3 = new Subject<int>();

outerSubject.OnNext(subject1);
outerSubject.OnNext(subject2);
outerSubject.OnNext(subject3);

subject1.OnNext(10);
subject2.OnNext(9999);

list.AssertEqual([10]);

subject1.OnCompleted();

subject2.OnNext(11111);

list.AssertEqual([10, 11111]);

subject2.OnCompleted();

subject3.OnNext(9999999);

list.AssertEqual([10, 11111, 9999999]);

subject3.OnCompleted();

outerSubject.OnCompleted();

list.AssertIsCompleted();
}

[Fact]
public void ConcatNestedSources_WaitForInner()
{
var outerSubject = new Subject<Observable<int>>();
using var list = outerSubject.Concat().ToLiveList();

var subject1 = new Subject<int>();
var subject2 = new Subject<int>();
var subject3 = new Subject<int>();

outerSubject.OnNext(subject1);
outerSubject.OnNext(subject2);
outerSubject.OnNext(subject3);

subject1.OnNext(10);
subject2.OnNext(9999);

list.AssertEqual([10]);

subject1.OnCompleted();

subject2.OnNext(11111);

list.AssertEqual([10, 11111]);

subject2.OnCompleted();

subject3.OnNext(9999999);
outerSubject.OnCompleted();

list.AssertIsNotCompleted();
list.AssertEqual([10, 11111, 9999999]);

subject3.OnCompleted();
list.AssertIsCompleted();
}

[Fact]
public void ConcatNestedSources_Empty()
{
var outerSubject = new Subject<Observable<int>>();

using var list = outerSubject.Concat().ToLiveList();
list.AssertIsNotCompleted();
outerSubject.OnCompleted();

list.AssertIsCompleted();
list.AssertEqual([]);
}
}

0 comments on commit b98e6c1

Please sign in to comment.