Skip to content

Commit

Permalink
Make Scheduler.Recursive more uniform and allocate less (#617)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored and danielcweber committed Jun 20, 2018
1 parent 8b56611 commit e2092e1
Showing 1 changed file with 111 additions and 110 deletions.
221 changes: 111 additions & 110 deletions Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Recursive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Reactive.Disposables;

namespace System.Reactive.Concurrency
Expand Down Expand Up @@ -46,45 +47,9 @@ public static IDisposable Schedule<TState>(this IScheduler scheduler, TState sta

private static IDisposable InvokeRec1<TState>(IScheduler scheduler, (TState state, Action<TState, Action<TState>> action) tuple)
{
var group = new CompositeDisposable(1);
var gate = new object();

Action<TState> recursiveAction = null;
recursiveAction = state1 => tuple.action(state1, state2 =>
{
var isAdded = false;
var isDone = false;
var d = default(IDisposable);
d = scheduler.Schedule(state2, (scheduler1, state3) =>
{
lock (gate)
{
if (isAdded)
{
group.Remove(d);
}
else
{
isDone = true;
}
}
recursiveAction(state3);
return Disposable.Empty;
});

lock (gate)
{
if (!isDone)
{
group.Add(d);
isAdded = true;
}
}
});

recursiveAction(tuple.state);

return group;
var recursiveInvoker = new InvokeRec1State<TState>(scheduler, tuple.action);
recursiveInvoker.InvokeFirst(tuple.state);
return recursiveInvoker;
}

/// <summary>
Expand Down Expand Up @@ -127,45 +92,9 @@ public static IDisposable Schedule<TState>(this IScheduler scheduler, TState sta

private static IDisposable InvokeRec2<TState>(IScheduler scheduler, (TState state, Action<TState, Action<TState, TimeSpan>> action) tuple)
{
var group = new CompositeDisposable(1);
var gate = new object();

Action<TState> recursiveAction = null;
recursiveAction = state1 => tuple.action(state1, (state2, dueTime1) =>
{
var isAdded = false;
var isDone = false;
var d = default(IDisposable);
d = scheduler.Schedule(state2, dueTime1, (scheduler1, state3) =>
{
lock (gate)
{
if (isAdded)
{
group.Remove(d);
}
else
{
isDone = true;
}
}
recursiveAction(state3);
return Disposable.Empty;
});

lock (gate)
{
if (!isDone)
{
group.Add(d);
isAdded = true;
}
}
});

recursiveAction(tuple.state);

return group;
var recursiveInvoker = new InvokeRec2State<TState>(scheduler, tuple.action);
recursiveInvoker.InvokeFirst(tuple.state);
return recursiveInvoker;
}

/// <summary>
Expand Down Expand Up @@ -208,45 +137,117 @@ public static IDisposable Schedule<TState>(this IScheduler scheduler, TState sta

private static IDisposable InvokeRec3<TState>(IScheduler scheduler, (TState state, Action<TState, Action<TState, DateTimeOffset>> action) tuple)
{
var group = new CompositeDisposable(1);
var gate = new object();
var recursiveInvoker = new InvokeRec3State<TState>(scheduler, tuple.action);
recursiveInvoker.InvokeFirst(tuple.state);
return recursiveInvoker;
}

abstract class InvokeRecBaseState<TState> : IDisposable
{
protected readonly IScheduler scheduler;

protected readonly CompositeDisposable group;

protected long index;

public InvokeRecBaseState(IScheduler scheduler)
{
this.scheduler = scheduler;
group = new CompositeDisposable();
}

public void Dispose()
{
group.Dispose();
}

}

sealed class InvokeRec1State<TState> : InvokeRecBaseState<TState>
{
readonly Action<TState, Action<TState>> action;

readonly Action<TState> recurseCallback;

public InvokeRec1State(IScheduler scheduler, Action<TState, Action<TState>> action) : base(scheduler)
{
this.action = action;
recurseCallback = state => InvokeNext(state);
}

internal void InvokeNext(TState state)
{
var sad = new SingleAssignmentDisposable();
group.Add(sad);
sad.Disposable = scheduler.Schedule((state, sad, @this: this), (_, nextState) => {
nextState.@this.group.Remove(nextState.sad);
nextState.@this.InvokeFirst(nextState.state);
return Disposable.Empty;
});
}

internal void InvokeFirst(TState state)
{
action(state, recurseCallback);
}
}

sealed class InvokeRec2State<TState> : InvokeRecBaseState<TState>
{
readonly Action<TState, Action<TState, TimeSpan>> action;

readonly Action<TState, TimeSpan> recurseCallback;

Action<TState> recursiveAction = null;
recursiveAction = state1 => tuple.action(state1, (state2, dueTime1) =>
public InvokeRec2State(IScheduler scheduler, Action<TState, Action<TState, TimeSpan>> action) : base(scheduler)
{
var isAdded = false;
var isDone = false;
var d = default(IDisposable);
d = scheduler.Schedule(state2, dueTime1, (scheduler1, state3) =>
{
lock (gate)
{
if (isAdded)
{
group.Remove(d);
}
else
{
isDone = true;
}
}
recursiveAction(state3);
this.action = action;
recurseCallback = (state, time) => InvokeNext(state, time);
}

internal void InvokeNext(TState state, TimeSpan time)
{
var sad = new SingleAssignmentDisposable();
group.Add(sad);
sad.Disposable = scheduler.Schedule((state, sad, @this: this), time, (_, nextState) => {
nextState.@this.group.Remove(nextState.sad);
nextState.@this.InvokeFirst(nextState.state);
return Disposable.Empty;
});
}

internal void InvokeFirst(TState state)
{
action(state, recurseCallback);
}
}

sealed class InvokeRec3State<TState> : InvokeRecBaseState<TState>
{
readonly Action<TState, Action<TState, DateTimeOffset>> action;

lock (gate)
{
if (!isDone)
{
group.Add(d);
isAdded = true;
}
}
});
readonly Action<TState, DateTimeOffset> recurseCallback;

recursiveAction(tuple.state);
public InvokeRec3State(IScheduler scheduler, Action<TState, Action<TState, DateTimeOffset>> action) : base(scheduler)
{
this.action = action;
recurseCallback = (state, dtOffset) => InvokeNext(state, dtOffset);
}

internal void InvokeNext(TState state, DateTimeOffset dtOffset)
{
var sad = new SingleAssignmentDisposable();
group.Add(sad);
sad.Disposable = scheduler.Schedule((state, sad, @this: this), dtOffset, (_, nextState) => {
nextState.@this.group.Remove(nextState.sad);
nextState.@this.InvokeFirst(nextState.state);
return Disposable.Empty;
});
}

return group;
internal void InvokeFirst(TState state)
{
action(state, recurseCallback);
}
}
}
}

0 comments on commit e2092e1

Please sign in to comment.