From ca2e232705bcad5af9ea84c3d5b95b82c72e797f Mon Sep 17 00:00:00 2001 From: Petri Kero Date: Mon, 9 Mar 2020 21:25:24 +0200 Subject: [PATCH] Fix atomic updates of AddressTerminatedTopic._subscribers. Also, optimize the implementation to not take copies on Subscribe() or Unsubscribe(). --- src/core/Akka/Event/AddressTerminatedTopic.cs | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/src/core/Akka/Event/AddressTerminatedTopic.cs b/src/core/Akka/Event/AddressTerminatedTopic.cs index b13d366acdc..d16fd7cf756 100644 --- a/src/core/Akka/Event/AddressTerminatedTopic.cs +++ b/src/core/Akka/Event/AddressTerminatedTopic.cs @@ -5,7 +5,9 @@ // //----------------------------------------------------------------------- +using System; using System.Collections.Generic; +using System.Linq; using Akka.Actor; using Akka.Util; @@ -35,7 +37,7 @@ public override AddressTerminatedTopic CreateExtension(ExtendedActorSystem syste /// internal sealed class AddressTerminatedTopic : IExtension { - private readonly AtomicReference> _subscribers = new AtomicReference>(new HashSet()); + private readonly HashSet _subscribers = new HashSet(); /// /// Retrieves the extension from the specified actor system. @@ -53,13 +55,8 @@ public static AddressTerminatedTopic Get(ActorSystem system) /// The actor that is registering for notifications. public void Subscribe(IActorRef subscriber) { - while (true) - { - var current = _subscribers; - if (!_subscribers.CompareAndSet(current, new HashSet(current.Value) {subscriber})) - continue; - break; - } + lock (_subscribers) + _subscribers.Add(subscriber); } /// @@ -68,15 +65,8 @@ public void Subscribe(IActorRef subscriber) /// The actor that is unregistering for notifications. public void Unsubscribe(IActorRef subscriber) { - while (true) - { - var current = _subscribers; - var newSet = new HashSet(_subscribers.Value); - newSet.Remove(subscriber); - if (!_subscribers.CompareAndSet(current, newSet)) - continue; - break; - } + lock (_subscribers) + _subscribers.Remove(subscriber); } /// @@ -85,7 +75,11 @@ public void Unsubscribe(IActorRef subscriber) /// The message that is sent to all subscribers. public void Publish(AddressTerminated msg) { - foreach (var subscriber in _subscribers.Value) + List subscribers; + lock(_subscribers) + subscribers = _subscribers.ToList(); + + foreach (var subscriber in subscribers) { subscriber.Tell(msg, ActorRefs.NoSender); }