Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc concurrency improvements #381

Merged
merged 2 commits into from
Apr 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public sealed class AsyncLock : IDisposable
/// processed by the owner.
/// </summary>
/// <param name="action">Action to queue for execution.</param>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
public void Wait(Action action)
{
if (action == null)
Expand All @@ -46,7 +46,9 @@ public void Wait(Action action)
lock (queue)
{
if (queue.Count > 0)
{
work = queue.Dequeue();
}
else
{
isAcquired = false;
Expand All @@ -65,6 +67,7 @@ public void Wait(Action action)
queue.Clear();
hasFaulted = true;
}

throw;
}
}
Expand Down
19 changes: 9 additions & 10 deletions Rx.NET/Source/src/System.Reactive/Concurrency/CatchScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace System.Reactive.Concurrency
{
class CatchScheduler<TException> : SchedulerWrapper
internal sealed class CatchScheduler<TException> : SchedulerWrapper
where TException : Exception
{
private readonly Func<TException, bool> _handler;
Expand All @@ -26,11 +26,8 @@ protected override Func<IScheduler, TState, IDisposable> Wrap<TState>(Func<ISche
{
return action(GetRecursiveWrapper(self), state);
}
catch (TException exception)
catch (TException exception) when (_handler(exception))
{
if (!_handler(exception))
throw;

return Disposable.Empty;
}
};
Expand All @@ -54,15 +51,19 @@ protected override bool TryGetService(IServiceProvider provider, Type serviceTyp
if (service != null)
{
if (serviceType == typeof(ISchedulerLongRunning))
{
service = new CatchSchedulerLongRunning((ISchedulerLongRunning)service, _handler);
}
else if (serviceType == typeof(ISchedulerPeriodic))
{
service = new CatchSchedulerPeriodic((ISchedulerPeriodic)service, _handler);
}
}

return true;
}

class CatchSchedulerLongRunning : ISchedulerLongRunning
private class CatchSchedulerLongRunning : ISchedulerLongRunning
{
private readonly ISchedulerLongRunning _scheduler;
private readonly Func<TException, bool> _handler;
Expand All @@ -81,16 +82,14 @@ public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICan
{
action(state_, cancel);
}
catch (TException exception)
catch (TException exception) when (_handler(exception))
{
if (!_handler(exception))
throw;
}
});
}
}

class CatchSchedulerPeriodic : ISchedulerPeriodic
private sealed class CatchSchedulerPeriodic : ISchedulerPeriodic
{
private readonly ISchedulerPeriodic _scheduler;
private readonly Func<TException, bool> _handler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@ public interface IConcurrencyAbstractionLayer
/// <summary>
/// Gets whether long-running scheduling is supported.
/// </summary>
bool SupportsLongRunning
{
get;
}
bool SupportsLongRunning { get; }

/// <summary>
/// Starts a new long-running thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
// See the LICENSE file in the project root for more information.

#if NO_THREAD && WINDOWS
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;

Expand Down Expand Up @@ -72,28 +70,16 @@ public void Sleep(TimeSpan timeout)
e.Wait();
}

public IStopwatch StartStopwatch()
{
return new StopwatchImpl();
}
public IStopwatch StartStopwatch() => new StopwatchImpl();

public bool SupportsLongRunning
{
get { return false; }
}
public bool SupportsLongRunning => false;

public void StartThread(Action<object> action, object state)
{
throw new NotSupportedException();
}

private TimeSpan Normalize(TimeSpan dueTime)
{
if (dueTime < TimeSpan.Zero)
return TimeSpan.Zero;

return dueTime;
}
private TimeSpan Normalize(TimeSpan dueTime) => dueTime < TimeSpan.Zero ? TimeSpan.Zero : dueTime;
}
}
#endif
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// See the LICENSE file in the project root for more information.

#if !NO_THREAD
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;
Expand All @@ -17,10 +16,7 @@ namespace System.Reactive.Concurrency
//
internal class /*Default*/ConcurrencyAbstractionLayerImpl : IConcurrencyAbstractionLayer
{
public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime)
{
return new Timer(action, state, Normalize(dueTime));
}
public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime) => new Timer(action, state, Normalize(dueTime));

public IDisposable StartPeriodicTimer(Action action, TimeSpan period)
{
Expand All @@ -47,27 +43,11 @@ public IDisposable QueueUserWorkItem(Action<object> action, object state)
return Disposable.Empty;
}

#if USE_SLEEP_MS
public void Sleep(TimeSpan timeout)
{
System.Threading.Thread.Sleep((int)Normalize(timeout).TotalMilliseconds);
}
#else
public void Sleep(TimeSpan timeout)
{
System.Threading.Thread.Sleep(Normalize(timeout));
}
#endif
public void Sleep(TimeSpan timeout) => System.Threading.Thread.Sleep(Normalize(timeout));

public IStopwatch StartStopwatch()
{
return new StopwatchImpl();
}
public IStopwatch StartStopwatch() => new StopwatchImpl();

public bool SupportsLongRunning
{
get { return true; }
}
public bool SupportsLongRunning => true;

public void StartThread(Action<object> action, object state)
{
Expand All @@ -77,13 +57,7 @@ public void StartThread(Action<object> action, object state)
}) { IsBackground = true }.Start();
}

private static TimeSpan Normalize(TimeSpan dueTime)
{
if (dueTime < TimeSpan.Zero)
return TimeSpan.Zero;

return dueTime;
}
private static TimeSpan Normalize(TimeSpan dueTime) => dueTime < TimeSpan.Zero ? TimeSpan.Zero : dueTime;

//
// Some historical context. In the early days of Rx, we discovered an issue with
Expand Down Expand Up @@ -156,7 +130,7 @@ private static TimeSpan Normalize(TimeSpan dueTime)
// symbol.
//

class Timer : IDisposable
private sealed class Timer : IDisposable
{
private Action<object> _action;
private volatile System.Threading.Timer _timer;
Expand Down Expand Up @@ -190,10 +164,7 @@ private void Tick(object state)
}
}

private bool IsTimerAssigned()
{
return _timer != null;
}
private bool IsTimerAssigned() => _timer != null;

public void Dispose()
{
Expand All @@ -208,7 +179,7 @@ public void Dispose()
}
}

class PeriodicTimer : IDisposable
private sealed class PeriodicTimer : IDisposable
{
private Action _action;
private volatile System.Threading.Timer _timer;
Expand All @@ -224,10 +195,7 @@ public PeriodicTimer(Action action, TimeSpan period)
_timer = new System.Threading.Timer(this.Tick, null, period, period);
}

private void Tick(object state)
{
_action();
}
private void Tick(object state) => _action();

public void Dispose()
{
Expand All @@ -242,7 +210,7 @@ public void Dispose()
}
}

class FastPeriodicTimer : IDisposable
private sealed class FastPeriodicTimer : IDisposable
{
private readonly Action _action;
private volatile bool disposed;
Expand Down Expand Up @@ -274,4 +242,4 @@ public void Dispose()
}
}
}
#endif
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,22 @@ public sealed class CurrentThreadScheduler : LocalScheduler
{
private static readonly Lazy<CurrentThreadScheduler> s_instance = new Lazy<CurrentThreadScheduler>(() => new CurrentThreadScheduler());

CurrentThreadScheduler()
private CurrentThreadScheduler()
{
}

/// <summary>
/// Gets the singleton instance of the current thread scheduler.
/// </summary>
public static CurrentThreadScheduler Instance
{
get { return s_instance.Value; }
}
public static CurrentThreadScheduler Instance => s_instance.Value;

[ThreadStatic]
static SchedulerQueue<TimeSpan> s_threadLocalQueue;
private static SchedulerQueue<TimeSpan> s_threadLocalQueue;

[ThreadStatic]
static IStopwatch s_clock;
private static IStopwatch s_clock;

private static SchedulerQueue<TimeSpan> GetQueue()
{
return s_threadLocalQueue;
}
private static SchedulerQueue<TimeSpan> GetQueue() => s_threadLocalQueue;

private static void SetQueue(SchedulerQueue<TimeSpan> newQueue)
{
Expand All @@ -61,25 +55,13 @@ private static TimeSpan Time
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1822:MarkMembersAsStatic", Justification = "Now marked as obsolete.")]
[EditorBrowsable(EditorBrowsableState.Never)]
[Obsolete(Constants_Core.OBSOLETE_SCHEDULEREQUIRED)] // Preferring static method call over instance method call.
public bool ScheduleRequired
{
get
{
return IsScheduleRequired;
}
}
public bool ScheduleRequired => IsScheduleRequired;

/// <summary>
/// Gets a value that indicates whether the caller must call a Schedule method.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Advanced)]
public static bool IsScheduleRequired
{
get
{
return GetQueue() == null;
}
}
public static bool IsScheduleRequired => GetQueue() == null;

/// <summary>
/// Schedules an action to be executed after dueTime.
Expand All @@ -89,7 +71,7 @@ public static bool IsScheduleRequired
/// <param name="action">Action to be executed.</param>
/// <param name="dueTime">Relative time after which to execute the action.</param>
/// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
if (action == null)
Expand All @@ -106,14 +88,14 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
queue = new SchedulerQueue<TimeSpan>(4);
queue.Enqueue(si);

CurrentThreadScheduler.SetQueue(queue);
SetQueue(queue);
try
{
Trampoline.Run(queue);
}
finally
{
CurrentThreadScheduler.SetQueue(null);
SetQueue(null);
}
}
else
Expand All @@ -124,7 +106,7 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
return Disposable.Create(si.Cancel);
}

static class Trampoline
private static class Trampoline
{
public static void Run(SchedulerQueue<TimeSpan> queue)
{
Expand All @@ -133,14 +115,16 @@ public static void Run(SchedulerQueue<TimeSpan> queue)
var item = queue.Dequeue();
if (!item.IsCanceled)
{
var wait = item.DueTime - CurrentThreadScheduler.Time;
var wait = item.DueTime - Time;
if (wait.Ticks > 0)
{
ConcurrencyAbstractionLayer.Current.Sleep(wait);
}

if (!item.IsCanceled)
{
item.Invoke();
}
}
}
}
Expand Down
Loading