Skip to content

Commit

Permalink
Rewrite of the AtomicReference
Browse files Browse the repository at this point in the history
  • Loading branch information
marcpiechura committed Jan 18, 2016
1 parent b026bb9 commit dc0f261
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 33 deletions.
2 changes: 2 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4518,12 +4518,14 @@ namespace Akka.Util
public Akka.Util.ILinearSeq<T> Tail() { }
}
public class AtomicReference<T>
where T : class
{
protected T atomicValue;
public AtomicReference(T originalValue) { }
public AtomicReference() { }
public T Value { get; set; }
public bool CompareAndSet(T expected, T newValue) { }
public T GetAndSet(T newValue) { }
}
public class static Base64Encoding
{
Expand Down
93 changes: 92 additions & 1 deletion src/core/Akka.Cluster.Tests.MultiNode/FailureDetectorPuppet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
// </copyright>
//-----------------------------------------------------------------------

using System.Threading;
using Akka.Configuration;
using Akka.Event;
using Akka.Remote;
using Akka.Util;

namespace Akka.Cluster.Tests.MultiNode
{
Expand Down Expand Up @@ -69,6 +69,97 @@ public override void HeartBeat()
{
_status.CompareAndSet(Status.Unknown, Status.Up);
}


//This version was replaced with a new version which is restricted to reference types,
//therefore we use the old version here.
private class AtomicReference<T>
{
/// <summary>
/// Sets the initial value of this <see cref="AtomicReference{T}"/> to <paramref name="originalValue"/>.
/// </summary>
public AtomicReference(T originalValue)
{
atomicValue = originalValue;
}

/// <summary>
/// Default constructor
/// </summary>
public AtomicReference()
{
atomicValue = default(T);
}

// ReSharper disable once InconsistentNaming
protected T atomicValue;

/// <summary>
/// The current value of this <see cref="AtomicReference{T}"/>
/// </summary>
public T Value
{
get
{
Interlocked.MemoryBarrier();
return atomicValue;
}
set
{
Interlocked.MemoryBarrier();
atomicValue = value;
Interlocked.MemoryBarrier();
}
}

/// <summary>
/// If <see cref="Value"/> equals <paramref name="expected"/>, then set the Value to
/// <paramref name="newValue"/>.
/// </summary>
/// <returns><c>true</c> if <paramref name="newValue"/> was set</returns>
public bool CompareAndSet(T expected, T newValue)
{
//special handling for null values
if (Value == null)
{
if (expected == null)
{
Value = newValue;
return true;
}
return false;
}

if (Value.Equals(expected))
{
Value = newValue;
return true;
}
return false;
}

#region Conversion operators

/// <summary>
/// Implicit conversion operator = automatically casts the <see cref="AtomicReference{T}"/> to an instance of <typeparamref name="T"/>.
/// </summary>
public static implicit operator T(AtomicReference<T> aRef)
{
return aRef.Value;
}

/// <summary>
/// Implicit conversion operator = allows us to cast any type directly into a <see cref="AtomicReference{T}"/> instance.
/// </summary>
/// <param name="newValue"></param>
/// <returns></returns>
public static implicit operator AtomicReference<T>(T newValue)
{
return new AtomicReference<T>(newValue);
}

#endregion
}
}
}

94 changes: 93 additions & 1 deletion src/core/Akka.Tests/IO/SimpleDnsCacheSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
// </copyright>
//-----------------------------------------------------------------------

using System.Threading;
using Akka.IO;
using Akka.TestKit;
using Akka.Util;
using Xunit;

namespace Akka.Tests.IO
{

public class SimpleDnsCacheSpec
{
private class SimpleDnsCacheTestDouble : SimpleDnsCache
Expand Down Expand Up @@ -79,5 +80,96 @@ public void Cache_should_be_updated_with_the_latest_resolved()
cache.Put(cacheEntryTwo, ttl);
cache.Cached("test.local").ShouldBe(cacheEntryTwo);
}


//This version was replaced with a new version which is restricted to reference types,
//therefore we use the old version here.
private class AtomicReference<T>
{
/// <summary>
/// Sets the initial value of this <see cref="AtomicReference{T}"/> to <paramref name="originalValue"/>.
/// </summary>
public AtomicReference(T originalValue)
{
atomicValue = originalValue;
}

/// <summary>
/// Default constructor
/// </summary>
public AtomicReference()
{
atomicValue = default(T);
}

// ReSharper disable once InconsistentNaming
protected T atomicValue;

/// <summary>
/// The current value of this <see cref="AtomicReference{T}"/>
/// </summary>
public T Value
{
get
{
Interlocked.MemoryBarrier();
return atomicValue;
}
set
{
Interlocked.MemoryBarrier();
atomicValue = value;
Interlocked.MemoryBarrier();
}
}

/// <summary>
/// If <see cref="Value"/> equals <paramref name="expected"/>, then set the Value to
/// <paramref name="newValue"/>.
/// </summary>
/// <returns><c>true</c> if <paramref name="newValue"/> was set</returns>
public bool CompareAndSet(T expected, T newValue)
{
//special handling for null values
if (Value == null)
{
if (expected == null)
{
Value = newValue;
return true;
}
return false;
}

if (Value.Equals(expected))
{
Value = newValue;
return true;
}
return false;
}

#region Conversion operators

/// <summary>
/// Implicit conversion operator = automatically casts the <see cref="AtomicReference{T}"/> to an instance of <typeparamref name="T"/>.
/// </summary>
public static implicit operator T(AtomicReference<T> aRef)
{
return aRef.Value;
}

/// <summary>
/// Implicit conversion operator = allows us to cast any type directly into a <see cref="AtomicReference{T}"/> instance.
/// </summary>
/// <param name="newValue"></param>
/// <returns></returns>
public static implicit operator AtomicReference<T>(T newValue)
{
return new AtomicReference<T>(newValue);
}

#endregion
}
}
}
4 changes: 2 additions & 2 deletions src/core/Akka/Actor/Internal/ActorSystemImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -403,15 +403,15 @@ public override void Stop(IActorRef actor)
class TerminationCallbacks
{
private Task _terminationTask;
private AtomicReference<Task> _atomicRef;
private readonly AtomicReference<Task> _atomicRef;

public TerminationCallbacks(Task upStreamTerminated)
{
_atomicRef = new AtomicReference<Task>(new Task(() => {}));

upStreamTerminated.ContinueWith(_ =>
{
_terminationTask = Interlocked.Exchange(ref _atomicRef, new AtomicReference<Task>(null)).Value;
_terminationTask = _atomicRef.GetAndSet(null);
_terminationTask.Start();
});
}
Expand Down
45 changes: 16 additions & 29 deletions src/core/Akka/Util/AtomicReference.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ namespace Akka.Util
/// <summary>
/// Implementation of the java.concurrent.util AtomicReference type.
///
/// Uses <see cref="Interlocked.MemoryBarrier"/> internally to enforce ordering of writes
/// Uses <see cref="Volatile"/> internally to enforce ordering of writes
/// without any explicit locking. .NET's strong memory on write guarantees might already enforce
/// this ordering, but the addition of the MemoryBarrier guarantees it.
/// this ordering, but the addition of the Volatile guarantees it.
/// </summary>
public class AtomicReference<T>
where T : class
{
/// <summary>
/// Sets the initial value of this <see cref="AtomicReference{T}"/> to <paramref name="originalValue"/>.
Expand All @@ -42,17 +43,8 @@ public AtomicReference()
/// </summary>
public T Value
{
get
{
Interlocked.MemoryBarrier();
return atomicValue;
}
set
{
Interlocked.MemoryBarrier();
atomicValue = value;
Interlocked.MemoryBarrier();
}
get { return Volatile.Read(ref atomicValue); }
set { Volatile.Write(ref atomicValue, value); }
}

/// <summary>
Expand All @@ -62,23 +54,18 @@ public T Value
/// <returns><c>true</c> if <paramref name="newValue"/> was set</returns>
public bool CompareAndSet(T expected, T newValue)
{
//special handling for null values
if (Value == null)
{
if (expected == null)
{
Value = newValue;
return true;
}
return false;
}
var previous = Interlocked.CompareExchange(ref atomicValue, newValue, expected);
return ReferenceEquals(previous, expected);
}

if (Value.Equals(expected))
{
Value = newValue;
return true;
}
return false;
/// <summary>
/// Atomically sets the <see cref="Value"/> to <paramref name="newValue"/> and returns the old <see cref="Value"/>.
/// </summary>
/// <param name="newValue">The new value</param>
/// <returns>The old value</returns>
public T GetAndSet(T newValue)
{
return Interlocked.Exchange(ref atomicValue, newValue);
}

#region Conversion operators
Expand Down

0 comments on commit dc0f261

Please sign in to comment.