Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Make Scheduler.Recursive more uniform and allocate less #617

Merged
merged 3 commits into from
Jun 20, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 111 additions & 110 deletions Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Recursive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Reactive.Disposables;

namespace System.Reactive.Concurrency
Expand Down Expand Up @@ -46,45 +47,9 @@ public static IDisposable Schedule<TState>(this IScheduler scheduler, TState sta

private static IDisposable InvokeRec1<TState>(IScheduler scheduler, (TState state, Action<TState, Action<TState>> action) tuple)
{
var group = new CompositeDisposable(1);
var gate = new object();

Action<TState> recursiveAction = null;
recursiveAction = state1 => tuple.action(state1, state2 =>
{
var isAdded = false;
var isDone = false;
var d = default(IDisposable);
d = scheduler.Schedule(state2, (scheduler1, state3) =>
{
lock (gate)
{
if (isAdded)
{
group.Remove(d);
}
else
{
isDone = true;
}
}
recursiveAction(state3);
return Disposable.Empty;
});

lock (gate)
{
if (!isDone)
{
group.Add(d);
isAdded = true;
}
}
});

recursiveAction(tuple.state);

return group;
var recursiveInvoker = new InvokeRec1State<TState>(scheduler, tuple.action);
recursiveInvoker.InvokeFirst(tuple.state);
return recursiveInvoker;
}

/// <summary>
Expand Down Expand Up @@ -127,45 +92,9 @@ public static IDisposable Schedule<TState>(this IScheduler scheduler, TState sta

private static IDisposable InvokeRec2<TState>(IScheduler scheduler, (TState state, Action<TState, Action<TState, TimeSpan>> action) tuple)
{
var group = new CompositeDisposable(1);
var gate = new object();

Action<TState> recursiveAction = null;
recursiveAction = state1 => tuple.action(state1, (state2, dueTime1) =>
{
var isAdded = false;
var isDone = false;
var d = default(IDisposable);
d = scheduler.Schedule(state2, dueTime1, (scheduler1, state3) =>
{
lock (gate)
{
if (isAdded)
{
group.Remove(d);
}
else
{
isDone = true;
}
}
recursiveAction(state3);
return Disposable.Empty;
});

lock (gate)
{
if (!isDone)
{
group.Add(d);
isAdded = true;
}
}
});

recursiveAction(tuple.state);

return group;
var recursiveInvoker = new InvokeRec2State<TState>(scheduler, tuple.action);
recursiveInvoker.InvokeFirst(tuple.state);
return recursiveInvoker;
}

/// <summary>
Expand Down Expand Up @@ -208,45 +137,117 @@ public static IDisposable Schedule<TState>(this IScheduler scheduler, TState sta

private static IDisposable InvokeRec3<TState>(IScheduler scheduler, (TState state, Action<TState, Action<TState, DateTimeOffset>> action) tuple)
{
var group = new CompositeDisposable(1);
var gate = new object();
var recursiveInvoker = new InvokeRec3State<TState>(scheduler, tuple.action);
recursiveInvoker.InvokeFirst(tuple.state);
return recursiveInvoker;
}

abstract class InvokeRecBaseState<TState> : IDisposable
{
protected readonly IScheduler scheduler;

protected readonly CompositeDisposable group;

protected long index;

public InvokeRecBaseState(IScheduler scheduler)
{
this.scheduler = scheduler;
group = new CompositeDisposable();
}

public void Dispose()
{
group.Dispose();
}

}

sealed class InvokeRec1State<TState> : InvokeRecBaseState<TState>
{
readonly Action<TState, Action<TState>> action;

readonly Action<TState> recurseCallback;

public InvokeRec1State(IScheduler scheduler, Action<TState, Action<TState>> action) : base(scheduler)
{
this.action = action;
recurseCallback = state => InvokeNext(state);
}

internal void InvokeNext(TState state)
{
var sad = new SingleAssignmentDisposable();
group.Add(sad);
sad.Disposable = scheduler.Schedule((state, sad, @this: this), (_, nextState) => {
nextState.@this.group.Remove(nextState.sad);
nextState.@this.InvokeFirst(nextState.state);
return Disposable.Empty;
});
}

internal void InvokeFirst(TState state)
{
action(state, recurseCallback);
}
}

sealed class InvokeRec2State<TState> : InvokeRecBaseState<TState>
{
readonly Action<TState, Action<TState, TimeSpan>> action;

readonly Action<TState, TimeSpan> recurseCallback;

Action<TState> recursiveAction = null;
recursiveAction = state1 => tuple.action(state1, (state2, dueTime1) =>
public InvokeRec2State(IScheduler scheduler, Action<TState, Action<TState, TimeSpan>> action) : base(scheduler)
{
var isAdded = false;
var isDone = false;
var d = default(IDisposable);
d = scheduler.Schedule(state2, dueTime1, (scheduler1, state3) =>
{
lock (gate)
{
if (isAdded)
{
group.Remove(d);
}
else
{
isDone = true;
}
}
recursiveAction(state3);
this.action = action;
recurseCallback = (state, time) => InvokeNext(state, time);
}

internal void InvokeNext(TState state, TimeSpan time)
{
var sad = new SingleAssignmentDisposable();
group.Add(sad);
sad.Disposable = scheduler.Schedule((state, sad, @this: this), time, (_, nextState) => {
nextState.@this.group.Remove(nextState.sad);
nextState.@this.InvokeFirst(nextState.state);
return Disposable.Empty;
});
}

internal void InvokeFirst(TState state)
{
action(state, recurseCallback);
}
}

sealed class InvokeRec3State<TState> : InvokeRecBaseState<TState>
{
readonly Action<TState, Action<TState, DateTimeOffset>> action;

lock (gate)
{
if (!isDone)
{
group.Add(d);
isAdded = true;
}
}
});
readonly Action<TState, DateTimeOffset> recurseCallback;

recursiveAction(tuple.state);
public InvokeRec3State(IScheduler scheduler, Action<TState, Action<TState, DateTimeOffset>> action) : base(scheduler)
{
this.action = action;
recurseCallback = (state, dtOffset) => InvokeNext(state, dtOffset);
}

internal void InvokeNext(TState state, DateTimeOffset dtOffset)
{
var sad = new SingleAssignmentDisposable();
group.Add(sad);
sad.Disposable = scheduler.Schedule((state, sad, @this: this), dtOffset, (_, nextState) => {
nextState.@this.group.Remove(nextState.sad);
nextState.@this.InvokeFirst(nextState.state);
return Disposable.Empty;
});
}

return group;
internal void InvokeFirst(TState state)
{
action(state, recurseCallback);
}
}
}
}