From dc0f261d180bbd93096b6d48fabcad533ee6fc64 Mon Sep 17 00:00:00 2001 From: Silv3rcircl3 Date: Mon, 18 Jan 2016 16:49:50 +0100 Subject: [PATCH] Rewrite of the AtomicReference --- .../CoreAPISpec.ApproveCore.approved.txt | 2 + .../FailureDetectorPuppet.cs | 93 +++++++++++++++++- src/core/Akka.Tests/IO/SimpleDnsCacheSpec.cs | 94 ++++++++++++++++++- .../Akka/Actor/Internal/ActorSystemImpl.cs | 4 +- src/core/Akka/Util/AtomicReference.cs | 45 ++++----- 5 files changed, 205 insertions(+), 33 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 5accc525070..561b52baeb9 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -4518,12 +4518,14 @@ namespace Akka.Util public Akka.Util.ILinearSeq Tail() { } } public class AtomicReference + where T : class { protected T atomicValue; public AtomicReference(T originalValue) { } public AtomicReference() { } public T Value { get; set; } public bool CompareAndSet(T expected, T newValue) { } + public T GetAndSet(T newValue) { } } public class static Base64Encoding { diff --git a/src/core/Akka.Cluster.Tests.MultiNode/FailureDetectorPuppet.cs b/src/core/Akka.Cluster.Tests.MultiNode/FailureDetectorPuppet.cs index 6e7d7df09cf..7d703ad5c78 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/FailureDetectorPuppet.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/FailureDetectorPuppet.cs @@ -5,10 +5,10 @@ // //----------------------------------------------------------------------- +using System.Threading; using Akka.Configuration; using Akka.Event; using Akka.Remote; -using Akka.Util; namespace Akka.Cluster.Tests.MultiNode { @@ -69,6 +69,97 @@ public override void HeartBeat() { _status.CompareAndSet(Status.Unknown, Status.Up); } + + + //This version was replaced with a new version which is restricted to reference types, + //therefore we use the old version here. + private class AtomicReference + { + /// + /// Sets the initial value of this to . + /// + public AtomicReference(T originalValue) + { + atomicValue = originalValue; + } + + /// + /// Default constructor + /// + public AtomicReference() + { + atomicValue = default(T); + } + + // ReSharper disable once InconsistentNaming + protected T atomicValue; + + /// + /// The current value of this + /// + public T Value + { + get + { + Interlocked.MemoryBarrier(); + return atomicValue; + } + set + { + Interlocked.MemoryBarrier(); + atomicValue = value; + Interlocked.MemoryBarrier(); + } + } + + /// + /// If equals , then set the Value to + /// . + /// + /// true if was set + public bool CompareAndSet(T expected, T newValue) + { + //special handling for null values + if (Value == null) + { + if (expected == null) + { + Value = newValue; + return true; + } + return false; + } + + if (Value.Equals(expected)) + { + Value = newValue; + return true; + } + return false; + } + + #region Conversion operators + + /// + /// Implicit conversion operator = automatically casts the to an instance of . + /// + public static implicit operator T(AtomicReference aRef) + { + return aRef.Value; + } + + /// + /// Implicit conversion operator = allows us to cast any type directly into a instance. + /// + /// + /// + public static implicit operator AtomicReference(T newValue) + { + return new AtomicReference(newValue); + } + + #endregion + } } } diff --git a/src/core/Akka.Tests/IO/SimpleDnsCacheSpec.cs b/src/core/Akka.Tests/IO/SimpleDnsCacheSpec.cs index 9a518b3572f..e65f7b56b1a 100644 --- a/src/core/Akka.Tests/IO/SimpleDnsCacheSpec.cs +++ b/src/core/Akka.Tests/IO/SimpleDnsCacheSpec.cs @@ -5,13 +5,14 @@ // //----------------------------------------------------------------------- +using System.Threading; using Akka.IO; using Akka.TestKit; -using Akka.Util; using Xunit; namespace Akka.Tests.IO { + public class SimpleDnsCacheSpec { private class SimpleDnsCacheTestDouble : SimpleDnsCache @@ -79,5 +80,96 @@ public void Cache_should_be_updated_with_the_latest_resolved() cache.Put(cacheEntryTwo, ttl); cache.Cached("test.local").ShouldBe(cacheEntryTwo); } + + + //This version was replaced with a new version which is restricted to reference types, + //therefore we use the old version here. + private class AtomicReference + { + /// + /// Sets the initial value of this to . + /// + public AtomicReference(T originalValue) + { + atomicValue = originalValue; + } + + /// + /// Default constructor + /// + public AtomicReference() + { + atomicValue = default(T); + } + + // ReSharper disable once InconsistentNaming + protected T atomicValue; + + /// + /// The current value of this + /// + public T Value + { + get + { + Interlocked.MemoryBarrier(); + return atomicValue; + } + set + { + Interlocked.MemoryBarrier(); + atomicValue = value; + Interlocked.MemoryBarrier(); + } + } + + /// + /// If equals , then set the Value to + /// . + /// + /// true if was set + public bool CompareAndSet(T expected, T newValue) + { + //special handling for null values + if (Value == null) + { + if (expected == null) + { + Value = newValue; + return true; + } + return false; + } + + if (Value.Equals(expected)) + { + Value = newValue; + return true; + } + return false; + } + + #region Conversion operators + + /// + /// Implicit conversion operator = automatically casts the to an instance of . + /// + public static implicit operator T(AtomicReference aRef) + { + return aRef.Value; + } + + /// + /// Implicit conversion operator = allows us to cast any type directly into a instance. + /// + /// + /// + public static implicit operator AtomicReference(T newValue) + { + return new AtomicReference(newValue); + } + + #endregion + } } } diff --git a/src/core/Akka/Actor/Internal/ActorSystemImpl.cs b/src/core/Akka/Actor/Internal/ActorSystemImpl.cs index ceeecaee7d4..9318649ff3e 100644 --- a/src/core/Akka/Actor/Internal/ActorSystemImpl.cs +++ b/src/core/Akka/Actor/Internal/ActorSystemImpl.cs @@ -403,7 +403,7 @@ public override void Stop(IActorRef actor) class TerminationCallbacks { private Task _terminationTask; - private AtomicReference _atomicRef; + private readonly AtomicReference _atomicRef; public TerminationCallbacks(Task upStreamTerminated) { @@ -411,7 +411,7 @@ public TerminationCallbacks(Task upStreamTerminated) upStreamTerminated.ContinueWith(_ => { - _terminationTask = Interlocked.Exchange(ref _atomicRef, new AtomicReference(null)).Value; + _terminationTask = _atomicRef.GetAndSet(null); _terminationTask.Start(); }); } diff --git a/src/core/Akka/Util/AtomicReference.cs b/src/core/Akka/Util/AtomicReference.cs index f0ea196e30b..d6e03fc6783 100644 --- a/src/core/Akka/Util/AtomicReference.cs +++ b/src/core/Akka/Util/AtomicReference.cs @@ -12,11 +12,12 @@ namespace Akka.Util /// /// Implementation of the java.concurrent.util AtomicReference type. /// - /// Uses internally to enforce ordering of writes + /// Uses internally to enforce ordering of writes /// without any explicit locking. .NET's strong memory on write guarantees might already enforce - /// this ordering, but the addition of the MemoryBarrier guarantees it. + /// this ordering, but the addition of the Volatile guarantees it. /// public class AtomicReference + where T : class { /// /// Sets the initial value of this to . @@ -42,17 +43,8 @@ public AtomicReference() /// public T Value { - get - { - Interlocked.MemoryBarrier(); - return atomicValue; - } - set - { - Interlocked.MemoryBarrier(); - atomicValue = value; - Interlocked.MemoryBarrier(); - } + get { return Volatile.Read(ref atomicValue); } + set { Volatile.Write(ref atomicValue, value); } } /// @@ -62,23 +54,18 @@ public T Value /// true if was set public bool CompareAndSet(T expected, T newValue) { - //special handling for null values - if (Value == null) - { - if (expected == null) - { - Value = newValue; - return true; - } - return false; - } + var previous = Interlocked.CompareExchange(ref atomicValue, newValue, expected); + return ReferenceEquals(previous, expected); + } - if (Value.Equals(expected)) - { - Value = newValue; - return true; - } - return false; + /// + /// Atomically sets the to and returns the old . + /// + /// The new value + /// The old value + public T GetAndSet(T newValue) + { + return Interlocked.Exchange(ref atomicValue, newValue); } #region Conversion operators