diff --git a/src/benchmark/PersistenceBenchmark/PerformanceActors.cs b/src/benchmark/PersistenceBenchmark/PerformanceActors.cs index 225e38ae1ee..5b046d7103f 100644 --- a/src/benchmark/PersistenceBenchmark/PerformanceActors.cs +++ b/src/benchmark/PersistenceBenchmark/PerformanceActors.cs @@ -68,29 +68,38 @@ public PerformanceTestActor(string persistenceId) public sealed override string PersistenceId { get; } - protected override bool ReceiveRecover(object message) => message.Match() - .With(s => state += s.Value) - .WasHandled; - - protected override bool ReceiveCommand(object message) => message.Match() - .With(store => + protected override bool ReceiveRecover(object message) + { + if (message is Stored s) { - Persist(new Stored(store.Value), s => - { - state += s.Value; - }); - }) - .With(_ => + state += s.Value; + return true; + } + return false; + } + + protected override bool ReceiveCommand(object message) + { + switch (message) { - var sender = Sender; - Persist(new Stored(0), s => - { - state += s.Value; - sender.Tell(Done.Instance); - }); - }) - .With(_ => Sender.Tell(new Finished(state))) - .WasHandled; + case Store store: + Persist(new Stored(store.Value), s => { state += s.Value; }); + return true; + case Init _: + var sender = Sender; + Persist(new Stored(0), s => + { + state += s.Value; + sender.Tell(Done.Instance); + }); + return true; + case Finish _: + Sender.Tell(new Finished(state)); + return true; + default: + return false; + } + } } } diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerSpec.cs index 15e00f69c01..aa746397d6a 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerSpec.cs @@ -150,30 +150,34 @@ public PointToPointChannel() private void Idle(object message) { - message.Match() - .With(_ => - { + switch (message) + { + case RegisterConsumer _: _log.Info("Register consumer [{0}]", Sender.Path); Sender.Tell(RegistrationOk.Instance); Context.Become(Active(Sender)); - }) - .With(_ => - { + break; + case UnregisterConsumer _: _log.Info("Unexpected unregistration: [{0}]", Sender.Path); Sender.Tell(UnexpectedRegistration.Instance); Context.Stop(Self); - }) - .With(_ => Sender.Tell(ResetOk.Instance)) - .Default(msg => { }); + break; + case Reset _: + Sender.Tell(ResetOk.Instance); + break; + default: + // no-op + break; + } } private UntypedReceive Active(IActorRef consumer) { return message => { - message.Match() - .With(_ => - { + switch (message) + { + case UnregisterConsumer _: if (Sender.Equals(consumer)) { _log.Info("UnregistrationOk: [{0}]", Sender.Path); @@ -186,19 +190,23 @@ private UntypedReceive Active(IActorRef consumer) Sender.Tell(UnexpectedUnregistration.Instance); Context.Stop(Self); } - }) - .With(_ => - { + break; + + case RegisterConsumer _: _log.Info("Unexpected RegisterConsumer: [{0}], active consumer: [{1}]", Sender.Path, consumer.Path); Sender.Tell(UnexpectedRegistration.Instance); Context.Stop(Self); - }) - .With(_ => - { + break; + + case Reset _: Context.Become(Idle); Sender.Tell(ResetOk.Instance); - }) - .Default(msg => consumer.Tell(msg)); + break; + + default: + consumer.Tell(message); + break; + } }; } diff --git a/src/contrib/cluster/Akka.DistributedData/ReadAggregator.cs b/src/contrib/cluster/Akka.DistributedData/ReadAggregator.cs index 0948236a4ba..93a04d93eea 100644 --- a/src/contrib/cluster/Akka.DistributedData/ReadAggregator.cs +++ b/src/contrib/cluster/Akka.DistributedData/ReadAggregator.cs @@ -81,26 +81,36 @@ protected override void PreStart() Reply(false); } - protected override bool Receive(object message) => message.Match() - .With(x => - { - if (x.Envelope != null) - { - _result = _result?.Merge(x.Envelope) ?? x.Envelope; - } - - Remaining = Remaining.Remove(Sender.Path.Address); - var done = DoneWhenRemainingSize; - Log.Debug("read acks remaining: {0}, done when: {1}, current state: {2}", Remaining.Count, done, _result); - if (Remaining.Count == done) Reply(true); - }) - .With(x => + protected override bool Receive(object message) + { + switch (message) { - foreach (var n in SecondaryNodes) - Replica(n).Tell(_read); - }) - .With(_ => Reply(false)) - .WasHandled; + case ReadResult x: + if (x.Envelope != null) + { + _result = _result?.Merge(x.Envelope) ?? x.Envelope; + } + + Remaining = Remaining.Remove(Sender.Path.Address); + var done = DoneWhenRemainingSize; + Log.Debug("read acks remaining: {0}, done when: {1}, current state: {2}", Remaining.Count, done, + _result); + if (Remaining.Count == done) Reply(true); + return true; + + case SendToSecondary x: + foreach (var n in SecondaryNodes) + Replica(n).Tell(_read); + return true; + + case ReceiveTimeout _: + Reply(false); + return true; + + default: + return false; + } + } private void Reply(bool ok) { @@ -121,19 +131,30 @@ private void Reply(bool ok) } } - private Receive WaitRepairAck(DataEnvelope envelope) => msg => msg.Match() - .With(x => + private Receive WaitRepairAck(DataEnvelope envelope) => msg => + { + switch (msg) { - var reply = envelope.Data is DeletedData - ? (object)new DataDeleted(_key, null) - : new GetSuccess(_key, _req, envelope.Data); - _replyTo.Tell(reply, Context.Parent); - Context.Stop(Self); - }) - .With(x => Remaining = Remaining.Remove(Sender.Path.Address)) - .With(_ => { }) - .With(_ => { }) - .WasHandled; + case ReadRepairAck _: + var reply = envelope.Data is DeletedData + ? (object)new DataDeleted(_key, null) + : new GetSuccess(_key, _req, envelope.Data); + _replyTo.Tell(reply, Context.Parent); + Context.Stop(Self); + return true; + case ReadResult _: + Remaining = Remaining.Remove(Sender.Path.Address); + return true; + case SendToSecondary _: + // no-op + return true; + case ReceiveTimeout _: + // no-op + return true; + default: + return false; + } + }; } public interface IReadConsistency diff --git a/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByPersistenceIdPublisher.cs b/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByPersistenceIdPublisher.cs index 35cbe4417b0..af777590f3c 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByPersistenceIdPublisher.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByPersistenceIdPublisher.cs @@ -73,27 +73,41 @@ protected override bool Receive(object message) protected bool Init(object message) { - return message.Match() - .With(() => { }) - .With(_ => ReceiveInitialRequest()) - .With(_ => Context.Stop(Self)) - .WasHandled; + switch (message) + { + case EventsByPersistenceIdPublisher.Continue _: + // no-op + return true; + case Request _: + ReceiveInitialRequest(); + return true; + case Cancel _: + Context.Stop(Self); + return true; + default: + return false; + } } protected bool Idle(object message) { - return message.Match() - .With(() => - { + switch (message) + { + case EventsByPersistenceIdPublisher.Continue _: if (IsTimeForReplay) Replay(); - }) - .With(() => - { + return true; + case EventAppended _: if (IsTimeForReplay) Replay(); - }) - .With(_ => ReceiveIdleRequest()) - .With(_ => Context.Stop(Self)) - .WasHandled; + return true; + case Request _: + ReceiveIdleRequest(); + return true; + case Cancel _: + Context.Stop(Self); + return true; + default: + return false; + } } protected void Replay() @@ -106,35 +120,55 @@ protected void Replay() protected Receive Replaying(int limit) { - return message => message.Match() - .With(replayed => - { - var seqNr = replayed.Persistent.SequenceNr; - Buffer.Add(new EventEnvelope( - offset: new Sequence(seqNr), - persistenceId: PersistenceId, - sequenceNr: seqNr, - @event: replayed.Persistent.Payload, - timestamp: replayed.Persistent.Timestamp)); - CurrentSequenceNr = seqNr + 1; - Buffer.DeliverBuffer(TotalDemand); - }) - .With(success => - { - Log.Debug("replay completed for persistenceId [{0}], currSeqNo [{1}]", PersistenceId, CurrentSequenceNr); - ReceiveRecoverySuccess(success.HighestSequenceNr); - }) - .With(failure => + return message => + { + switch (message) { - Log.Debug("replay failed for persistenceId [{0}], due to [{1}]", PersistenceId, failure.Cause.Message); - Buffer.DeliverBuffer(TotalDemand); - OnErrorThenStop(failure.Cause); - }) - .With(_ => Buffer.DeliverBuffer(TotalDemand)) - .With(() => { }) // skip during replay - .With(() => { }) // skip during replay - .With(_ => Context.Stop(Self)) - .WasHandled; + case ReplayedMessage replayed: + var seqNr = replayed.Persistent.SequenceNr; + Buffer.Add(new EventEnvelope( + offset: new Sequence(seqNr), + persistenceId: PersistenceId, + sequenceNr: seqNr, + @event: replayed.Persistent.Payload, + timestamp: replayed.Persistent.Timestamp)); + CurrentSequenceNr = seqNr + 1; + Buffer.DeliverBuffer(TotalDemand); + return true; + + case RecoverySuccess success: + Log.Debug("replay completed for persistenceId [{0}], currSeqNo [{1}]", PersistenceId, + CurrentSequenceNr); + ReceiveRecoverySuccess(success.HighestSequenceNr); + return true; + + case ReplayMessagesFailure failure: + Log.Debug("replay failed for persistenceId [{0}], due to [{1}]", PersistenceId, + failure.Cause.Message); + Buffer.DeliverBuffer(TotalDemand); + OnErrorThenStop(failure.Cause); + return true; + + case Request _: + Buffer.DeliverBuffer(TotalDemand); + return true; + + case EventsByPersistenceIdPublisher.Continue _: + // skip during replay + return true; + + case EventAppended _: + // skip during replay + return true; + + case Cancel _: + Context.Stop(Self); + return true; + + default: + return false; + } + }; } } diff --git a/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByTagPublisher.cs b/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByTagPublisher.cs index 558dc024697..13eda725a5d 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByTagPublisher.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByTagPublisher.cs @@ -63,24 +63,44 @@ protected AbstractEventsByTagPublisher(string tag, long fromOffset, int maxBuffe protected abstract void ReceiveIdleRequest(); protected abstract void ReceiveRecoverySuccess(long highestSequenceNr); - protected override bool Receive(object message) => message.Match() - .With(_ => ReceiveInitialRequest()) - .With(() => { }) - .With(_ => Context.Stop(Self)) - .WasHandled; - - protected bool Idle(object message) => message.Match() - .With(() => + protected override bool Receive(object message) + { + switch (message) { - if (IsTimeForReplay) Replay(); - }) - .With(() => + case Request _: + ReceiveInitialRequest(); + return true; + case EventsByTagPublisher.Continue _: + // no-op + return true; + case Cancel _: + Context.Stop(Self); + return true; + default: + return false; + } + } + + protected bool Idle(object message) + { + switch (message) { - if (IsTimeForReplay) Replay(); - }) - .With(ReceiveIdleRequest) - .With(() => Context.Stop(Self)) - .WasHandled; + case EventsByTagPublisher.Continue _: + if (IsTimeForReplay) Replay(); + return true; + case TaggedEventAppended _: + if (IsTimeForReplay) Replay(); + return true; + case Request _: + ReceiveIdleRequest(); + return true; + case Cancel _: + Context.Stop(Self); + return true; + default: + return false; + } + } protected void Replay() { @@ -92,35 +112,53 @@ protected void Replay() protected Receive Replaying(int limit) { - return message => message.Match() - .With(replayed => - { - Buffer.Add(new EventEnvelope( - offset: new Sequence(replayed.Offset), - persistenceId: replayed.Persistent.PersistenceId, - sequenceNr: replayed.Persistent.SequenceNr, - @event: replayed.Persistent.Payload, - timestamp: replayed.Persistent.Timestamp)); - - CurrentOffset = replayed.Offset; - Buffer.DeliverBuffer(TotalDemand); - }) - .With(success => - { - Log.Debug("replay completed for tag [{0}], currOffset [{1}]", Tag, CurrentOffset); - ReceiveRecoverySuccess(success.HighestSequenceNr); - }) - .With(failure => + return message => + { + switch (message) { - Log.Debug("replay failed for tag [{0}], due to [{1}]", Tag, failure.Cause.Message); - Buffer.DeliverBuffer(TotalDemand); - OnErrorThenStop(failure.Cause); - }) - .With(_ => Buffer.DeliverBuffer(TotalDemand)) - .With(() => { }) - .With(() => { }) - .With(() => Context.Stop(Self)) - .WasHandled; + case ReplayedTaggedMessage replayed: + Buffer.Add(new EventEnvelope( + offset: new Sequence(replayed.Offset), + persistenceId: replayed.Persistent.PersistenceId, + sequenceNr: replayed.Persistent.SequenceNr, + @event: replayed.Persistent.Payload, + timestamp: replayed.Persistent.Timestamp)); + + CurrentOffset = replayed.Offset; + Buffer.DeliverBuffer(TotalDemand); + return true; + + case RecoverySuccess success: + Log.Debug("replay completed for tag [{0}], currOffset [{1}]", Tag, CurrentOffset); + ReceiveRecoverySuccess(success.HighestSequenceNr); + return true; + + case ReplayMessagesFailure failure: + Log.Debug("replay failed for tag [{0}], due to [{1}]", Tag, failure.Cause.Message); + Buffer.DeliverBuffer(TotalDemand); + OnErrorThenStop(failure.Cause); + return true; + + case Request _: + Buffer.DeliverBuffer(TotalDemand); + return true; + + case EventsByTagPublisher.Continue _: + // no-op + return true; + + case TaggedEventAppended _: + // no-op + return true; + + case Cancel _: + Context.Stop(Self); + return true; + + default: + return false; + } + }; } } diff --git a/src/contrib/persistence/Akka.Persistence.Sql.TestKit/SqlSnapshotConnectionFailureSpec.cs b/src/contrib/persistence/Akka.Persistence.Sql.TestKit/SqlSnapshotConnectionFailureSpec.cs index 1412a385def..6f78cf3cc6c 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.TestKit/SqlSnapshotConnectionFailureSpec.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.TestKit/SqlSnapshotConnectionFailureSpec.cs @@ -49,20 +49,38 @@ public SaveSnapshotTestActor(string name, IActorRef probe) protected override bool ReceiveRecover(object message) { - return message.Match() - .With(offer => _state = offer.Snapshot as LinkedList) - .With(m => _state.AddFirst(m + "-" + LastSequenceNr)) - .WasHandled; + switch (message) + { + case SnapshotOffer offer: + _state = (LinkedList)offer.Snapshot; + return true; + case string m: + _state.AddFirst(m + "-" + LastSequenceNr); + return true; + default: + return false; + } } protected override bool ReceiveCommand(object message) { - return message.Match() - .With(payload => Persist(payload, _ => _state.AddFirst(payload + "-" + LastSequenceNr))) - .With(_ => SaveSnapshot(_state)) - .With(s => _probe.Tell(s.Metadata.SequenceNr)) - .With(_ => _probe.Tell(_state.Reverse().ToArray())) - .WasHandled; + switch (message) + { + case string payload: + Persist(payload, _ => _state.AddFirst(payload + "-" + LastSequenceNr)); + return true; + case TakeSnapshot _: + SaveSnapshot(_state); + return true; + case SaveSnapshotSuccess s: + _probe.Tell(s.Metadata.SequenceNr); + return true; + case GetState _: + _probe.Tell(_state.Reverse().ToArray()); + return true; + default: + return false; + } } } diff --git a/src/contrib/persistence/Akka.Persistence.Sql.TestKit/TestActor.cs b/src/contrib/persistence/Akka.Persistence.Sql.TestKit/TestActor.cs index 9cb4847dc36..68f8608eaf0 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.TestKit/TestActor.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.TestKit/TestActor.cs @@ -34,21 +34,24 @@ public TestActor(string persistenceId) protected override bool ReceiveRecover(object message) => true; private IActorRef _parentTestActor; - protected override bool ReceiveCommand(object message) => message.Match() - .With(delete => - { - _parentTestActor = Sender; - DeleteMessages(delete.ToSequenceNr); - }) - .With(deleteSuccess => - { - _parentTestActor.Tell(deleteSuccess.ToSequenceNr.ToString() + "-deleted"); - }) - .With(cmd => + protected override bool ReceiveCommand(object message) + { + switch (message) { - var sender = Sender; - Persist(cmd, e => sender.Tell(e + "-done")); - }) - .WasHandled; + case DeleteCommand delete: + _parentTestActor = Sender; + DeleteMessages(delete.ToSequenceNr); + return true; + case DeleteMessagesSuccess deleteSuccess: + _parentTestActor.Tell(deleteSuccess.ToSequenceNr.ToString() + "-deleted"); + return true; + case string cmd: + var sender = Sender; + Persist(cmd, e => sender.Tell(e + "-done")); + return true; + default: + return false; + } + } } } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt index e238dd63acb..ec956617de5 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt @@ -2153,48 +2153,6 @@ namespace Akka.Annotations public InternalApiAttribute() { } } } -namespace Akka -{ - public class Case : Akka.IMatchResult - { - public Case(object message) { } - public bool WasHandled { get; } - public Akka.IMatchResult Default(System.Action action) { } - public Akka.Case With(System.Action action) { } - public Akka.Case With(System.Action action) { } - } - public class Case : Akka.IMatchResult - { - public Case(object message) { } - public bool WasHandled { get; } - public T ResultOrDefault(System.Func function) { } - public Akka.Case With(System.Func function) { } - public Akka.Case With(System.Func function) { } - } - public sealed class Done - { - public static readonly Akka.Done Instance; - } - public interface IMatchResult - { - bool WasHandled { get; } - } - public sealed class NotUsed : System.IComparable, System.IEquatable - { - public static readonly Akka.NotUsed Instance; - public int CompareTo(Akka.NotUsed other) { } - public override bool Equals(object obj) { } - public bool Equals(Akka.NotUsed other) { } - public override int GetHashCode() { } - public override string ToString() { } - } - [System.ObsoleteAttribute("Use instead the pattern matching feature introduced in C# 7.0")] - public class static PatternMatch - { - public static Akka.Case Match(this object target) { } - public static Akka.Case Match(this object target) { } - } -} namespace Akka.Configuration { public class Config @@ -2961,6 +2919,22 @@ namespace Akka.Dispatch.SysMsg public override string ToString() { } } } +namespace Akka +{ + public sealed class Done + { + public static readonly Akka.Done Instance; + } + public sealed class NotUsed : System.IComparable, System.IEquatable + { + public static readonly Akka.NotUsed Instance; + public int CompareTo(Akka.NotUsed other) { } + public override bool Equals(object obj) { } + public bool Equals(Akka.NotUsed other) { } + public override int GetHashCode() { } + public override string ToString() { } + } +} namespace Akka.Event { public abstract class ActorEventBus : Akka.Event.EventBus diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index c933b728e80..0b94752a7df 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -2155,48 +2155,6 @@ namespace Akka.Annotations public InternalApiAttribute() { } } } -namespace Akka -{ - public class Case : Akka.IMatchResult - { - public Case(object message) { } - public bool WasHandled { get; } - public Akka.IMatchResult Default(System.Action action) { } - public Akka.Case With(System.Action action) { } - public Akka.Case With(System.Action action) { } - } - public class Case : Akka.IMatchResult - { - public Case(object message) { } - public bool WasHandled { get; } - public T ResultOrDefault(System.Func function) { } - public Akka.Case With(System.Func function) { } - public Akka.Case With(System.Func function) { } - } - public sealed class Done - { - public static readonly Akka.Done Instance; - } - public interface IMatchResult - { - bool WasHandled { get; } - } - public sealed class NotUsed : System.IComparable, System.IEquatable - { - public static readonly Akka.NotUsed Instance; - public int CompareTo(Akka.NotUsed other) { } - public override bool Equals(object obj) { } - public bool Equals(Akka.NotUsed other) { } - public override int GetHashCode() { } - public override string ToString() { } - } - [System.ObsoleteAttribute("Use instead the pattern matching feature introduced in C# 7.0")] - public class static PatternMatch - { - public static Akka.Case Match(this object target) { } - public static Akka.Case Match(this object target) { } - } -} namespace Akka.Configuration { public class Config @@ -2966,6 +2924,22 @@ namespace Akka.Dispatch.SysMsg public override string ToString() { } } } +namespace Akka +{ + public sealed class Done + { + public static readonly Akka.Done Instance; + } + public sealed class NotUsed : System.IComparable, System.IEquatable + { + public static readonly Akka.NotUsed Instance; + public int CompareTo(Akka.NotUsed other) { } + public override bool Equals(object obj) { } + public bool Equals(Akka.NotUsed other) { } + public override int GetHashCode() { } + public override string ToString() { } + } +} namespace Akka.Event { public abstract class ActorEventBus : Akka.Event.EventBus diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index e238dd63acb..ec956617de5 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -2153,48 +2153,6 @@ namespace Akka.Annotations public InternalApiAttribute() { } } } -namespace Akka -{ - public class Case : Akka.IMatchResult - { - public Case(object message) { } - public bool WasHandled { get; } - public Akka.IMatchResult Default(System.Action action) { } - public Akka.Case With(System.Action action) { } - public Akka.Case With(System.Action action) { } - } - public class Case : Akka.IMatchResult - { - public Case(object message) { } - public bool WasHandled { get; } - public T ResultOrDefault(System.Func function) { } - public Akka.Case With(System.Func function) { } - public Akka.Case With(System.Func function) { } - } - public sealed class Done - { - public static readonly Akka.Done Instance; - } - public interface IMatchResult - { - bool WasHandled { get; } - } - public sealed class NotUsed : System.IComparable, System.IEquatable - { - public static readonly Akka.NotUsed Instance; - public int CompareTo(Akka.NotUsed other) { } - public override bool Equals(object obj) { } - public bool Equals(Akka.NotUsed other) { } - public override int GetHashCode() { } - public override string ToString() { } - } - [System.ObsoleteAttribute("Use instead the pattern matching feature introduced in C# 7.0")] - public class static PatternMatch - { - public static Akka.Case Match(this object target) { } - public static Akka.Case Match(this object target) { } - } -} namespace Akka.Configuration { public class Config @@ -2961,6 +2919,22 @@ namespace Akka.Dispatch.SysMsg public override string ToString() { } } } +namespace Akka +{ + public sealed class Done + { + public static readonly Akka.Done Instance; + } + public sealed class NotUsed : System.IComparable, System.IEquatable + { + public static readonly Akka.NotUsed Instance; + public int CompareTo(Akka.NotUsed other) { } + public override bool Equals(object obj) { } + public bool Equals(Akka.NotUsed other) { } + public override int GetHashCode() { } + public override string ToString() { } + } +} namespace Akka.Event { public abstract class ActorEventBus : Akka.Event.EventBus diff --git a/src/core/Akka.Cluster.Tests.MultiNode/NodeLeavingAndExitingSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/NodeLeavingAndExitingSpec.cs index 48eb4d27172..9d5a78606df 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/NodeLeavingAndExitingSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/NodeLeavingAndExitingSpec.cs @@ -46,25 +46,24 @@ public Listener(TestLatch exitingLatch, Address secondAddress) protected override void OnReceive(object message) { - message.Match() - .With(state => - { + switch (message) + { + case ClusterEvent.CurrentClusterState state: if (state.Members.Any(c => c.Address.Equals(_secondAddress) && c.Status == MemberStatus.Exiting)) { _exitingLatch.CountDown(); } - }) - .With(m => - { + break; + case ClusterEvent.MemberExited m: if (m.Member.Address.Equals(_secondAddress)) { _exitingLatch.CountDown(); } - }) - .With(_ => - { + break; + case ClusterEvent.MemberRemoved _: // not tested here - }); + break; + } } } diff --git a/src/core/Akka.Cluster.Tests.MultiNode/NodeUpSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/NodeUpSpec.cs index b36ba06b060..ee13eafad83 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/NodeUpSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/NodeUpSpec.cs @@ -46,15 +46,15 @@ public Listener(AtomicReference> unexpected) protected override void OnReceive(object message) { - message.Match() - .With(evt => - { + switch (message) + { + case ClusterEvent.IMemberEvent evt: _unexpected.Value.Add(evt.Member); - }) - .With(() => - { + break; + case ClusterEvent.CurrentClusterState _: // ignore - }); + break; + } } } diff --git a/src/core/Akka.Cluster.Tests.MultiNode/SunnyWeatherSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/SunnyWeatherSpec.cs index 638eff750e7..47acfd532c4 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/SunnyWeatherSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/SunnyWeatherSpec.cs @@ -61,15 +61,15 @@ public Listener(AtomicReference> unexpected) protected override void OnReceive(object message) { - message.Match() - .With(evt => - { + switch (message) + { + case ClusterEvent.IMemberEvent evt: _unexpected.Value.Add(evt.Member); - }) - .With(() => - { + break; + case ClusterEvent.CurrentClusterState _: // ignore - }); + break; + } } } diff --git a/src/core/Akka.Cluster/ClusterReadView.cs b/src/core/Akka.Cluster/ClusterReadView.cs index f72b0ee7e05..1f943f1cf4f 100644 --- a/src/core/Akka.Cluster/ClusterReadView.cs +++ b/src/core/Akka.Cluster/ClusterReadView.cs @@ -107,31 +107,31 @@ public EventBusListener(Cluster cluster, ClusterReadView readView) Receive(clusterDomainEvent => { - clusterDomainEvent.Match() - .With(changed => - { + switch (clusterDomainEvent) + { + case ClusterEvent.SeenChanged changed: State = State.Copy(seenBy: changed.SeenBy); - }) - .With(changed => - { + break; + + case ClusterEvent.ReachabilityChanged changed: _readView._reachability = changed.Reachability; - }) - .With(removed => - { + break; + + case ClusterEvent.MemberRemoved removed: State = State.Copy(members: State.Members.Remove(removed.Member), unreachable: State.Unreachable.Remove(removed.Member)); - }) - .With(member => - { + break; + + case ClusterEvent.UnreachableMember member: // replace current member with new member (might have different status, only address is used in == comparison) State = State.Copy(unreachable: State.Unreachable.Remove(member.Member).Add(member.Member)); - }) - .With(member => - { + break; + + case ClusterEvent.ReachableMember member: State = State.Copy(unreachable: State.Unreachable.Remove(member.Member)); - }) - .With(memberEvent => - { + break; + + case ClusterEvent.IMemberEvent memberEvent: var newUnreachable = State.Unreachable; // replace current member with new member (might have different status, only address is used in == comparison) if (State.Unreachable.Contains(memberEvent.Member)) @@ -139,20 +139,24 @@ public EventBusListener(Cluster cluster, ClusterReadView readView) State = State.Copy( members: State.Members.Remove(memberEvent.Member).Add(memberEvent.Member), unreachable: newUnreachable); - }) - .With(changed => - { + break; + + case ClusterEvent.LeaderChanged changed: State = State.Copy(leader: changed.Leader); - }) - .With(changed => - { + break; + + case ClusterEvent.RoleLeaderChanged changed: State = State.Copy(roleLeaderMap: State.RoleLeaderMap.SetItem(changed.Role, changed.Leader)); - }) - .With(stats => - { + break; + + case ClusterEvent.CurrentInternalStats stats: readView._latestStats = stats; - }) - .With(_ => { }); + break; + + case ClusterEvent.ClusterShuttingDown _: + // no-op + break; + } // once captured, optional verbose logging of event if (!(clusterDomainEvent is ClusterEvent.SeenChanged) && _cluster.Settings.LogInfoVerbose) diff --git a/src/core/Akka.Persistence.Tests/AtLeastOnceDeliveryFailureSpec.cs b/src/core/Akka.Persistence.Tests/AtLeastOnceDeliveryFailureSpec.cs index bdfe249495c..fb37e5b09f3 100644 --- a/src/core/Akka.Persistence.Tests/AtLeastOnceDeliveryFailureSpec.cs +++ b/src/core/Akka.Persistence.Tests/AtLeastOnceDeliveryFailureSpec.cs @@ -155,21 +155,23 @@ public ChaosSender(IActorRef destination, IActorRef probe) protected override bool ReceiveRecover(object message) { - return message.Match() - .With(evt => - { - UpdateState(evt); - if (ChaosSupportExtensions.ShouldFail(_replayProcessingFailureRate)) - throw new TestException(DebugMessage(string.Format("replay failed at event {0}", evt))); - Log.Debug(DebugMessage(string.Format("replayed event {0}", evt))); - }).WasHandled; + if (message is IEvt evt) + { + UpdateState(evt); + if (ChaosSupportExtensions.ShouldFail(_replayProcessingFailureRate)) + throw new TestException(DebugMessage(string.Format("replay failed at event {0}", evt))); + Log.Debug(DebugMessage(string.Format("replayed event {0}", evt))); + return true; + } + + return false; } protected override bool ReceiveCommand(object message) { - return message.Match() - .With(i => - { + switch (message) + { + case int i: if (State.Contains(i)) { Log.Debug(DebugMessage("ignored duplicate")); @@ -180,15 +182,19 @@ protected override bool ReceiveCommand(object message) { UpdateState(sent); if (ChaosSupportExtensions.ShouldFail(_liveProcessingFailureRate)) - throw new TestException(DebugMessage(string.Format("failed at payload {0}", sent.I))); - Log.Debug(DebugMessage(String.Format("processed payload {0}", sent.I))); + throw new TestException(DebugMessage($"failed at payload {sent.I}")); + Log.Debug(DebugMessage($"processed payload {sent.I}")); }); - }) - .With(confirm => - { + + return true; + + case Confirm confirm: Persist(new MsgConfirmed(confirm.DeliveryId, confirm.I), x => UpdateState(x)); - }) - .WasHandled; + return true; + + default: + return false; + } } private void UpdateState(IEvt evt) diff --git a/src/core/Akka.Persistence.Tests/AtLeastOnceDeliveryReceiveActorSpec.cs b/src/core/Akka.Persistence.Tests/AtLeastOnceDeliveryReceiveActorSpec.cs index a6b7d5b38bc..2dfd99c6c00 100644 --- a/src/core/Akka.Persistence.Tests/AtLeastOnceDeliveryReceiveActorSpec.cs +++ b/src/core/Akka.Persistence.Tests/AtLeastOnceDeliveryReceiveActorSpec.cs @@ -232,18 +232,18 @@ public override string PersistenceId private void UpdateState(IEvt evt) { - evt.Match() - .With(a => - { + switch (evt) + { + case AcceptedReq a: _log.Debug("Deliver(destination, deliveryId => Action(deliveryId, {0})), recovering: {1}", a.Payload, IsRecovering); Deliver(ActorPath.Parse(a.DestinationPath), deliveryId => new Action(deliveryId, a.Payload)); - }) - .With(r => - { + break; + case ReqDone r: _log.Debug("ConfirmDelivery({0}), recovering: {1}", r.Id, IsRecovering); ConfirmDelivery(r.Id); - }); + break; + } } } diff --git a/src/core/Akka.Persistence.Tests/AtLeastOnceDeliverySpec.cs b/src/core/Akka.Persistence.Tests/AtLeastOnceDeliverySpec.cs index cb90a6e6c6b..92ebace9fdd 100644 --- a/src/core/Akka.Persistence.Tests/AtLeastOnceDeliverySpec.cs +++ b/src/core/Akka.Persistence.Tests/AtLeastOnceDeliverySpec.cs @@ -47,39 +47,49 @@ public Sender(IActorRef testActor, string name, TimeSpan redeliverInterval, int protected override bool ReceiveRecover(object message) { - return message.Match() - .With(UpdateState) - .With(o => - { + switch (message) + { + case IEvt e: + UpdateState(e); + return true; + case SnapshotOffer o: var snap = (Snap)o.Snapshot; SetDeliverySnapshot(snap.DeliverySnapshot); - }) - .WasHandled; + return true; + default: + return false; + } } protected override bool ReceiveCommand(object message) { - return message.Match() - .With(req => + switch (message) + { + case Req req: { if (string.IsNullOrEmpty(req.Payload)) Sender.Tell(InvalidReq.Instance); else { var c = char.ToUpper(req.Payload[0]); var destination = _destinations[c.ToString()]; - if (_isAsync) PersistAsync(new AcceptedReq(req.Payload, destination.ToString()), e => - { - UpdateState(e); - Sender.Tell(ReqAck.Instance); - }); - else Persist(new AcceptedReq(req.Payload, destination.ToString()), e => - { - UpdateState(e); - Sender.Tell(ReqAck.Instance); - }); + if (_isAsync) + PersistAsync(new AcceptedReq(req.Payload, destination.ToString()), e => + { + UpdateState(e); + Sender.Tell(ReqAck.Instance); + }); + else + Persist(new AcceptedReq(req.Payload, destination.ToString()), e => + { + UpdateState(e); + Sender.Tell(ReqAck.Instance); + }); } - }) - .With(msg => + + return true; + } + + case ReqSelection msg: { var c = char.ToUpper(msg.Payload[0]); var destination = _destinations[c.ToString()]; @@ -88,59 +98,62 @@ protected override bool ReceiveCommand(object message) UpdateState(e); Sender.Tell(ReqAck.Instance); }); - }) - .With(ack => - { + return true; + } + + case ActionAck ack: _log.Debug("Sender got ack: {0}", ack.Id); if (ConfirmDelivery(ack.Id)) { - if (_isAsync) PersistAsync(new ReqDone(ack.Id), done => UpdateState(done)); - else Persist(new ReqDone(ack.Id), done => UpdateState(done)); + if (_isAsync) PersistAsync(new ReqDone(ack.Id), UpdateState); + else Persist(new ReqDone(ack.Id), UpdateState); } - }) - .With(boom => - { + + return true; + + case Boom _: _log.Debug("Boom!"); throw new Exception("boom"); - }) - .With(save => - { + + case SaveSnap _: _log.Debug("Save snapshot"); _lastSnapshotAskedForBy = Sender; SaveSnapshot(new Snap(GetDeliverySnapshot())); - }) - .With(succ => - { + return true; + + case SaveSnapshotSuccess succ: _log.Debug("Save snapshot success!"); if (_lastSnapshotAskedForBy != null) _lastSnapshotAskedForBy.Tell(succ); - }) - .With(warn => - { + return true; + + case UnconfirmedWarning warn: _log.Debug("Sender got unconfirmed warning: unconfirmed deliveries count {0}", warn.UnconfirmedDeliveries.Count()); _testActor.Tell(warn); - }) - .WasHandled; + return true; + + default: + return false; + } } private void UpdateState(IEvt evt) { - evt.Match() - .With(a => - { + switch (evt) + { + case AcceptedReq a: _log.Debug("Deliver(destination, deliveryId => Action(deliveryId, {0})), recovering: {1}", a.Payload, IsRecovering); Deliver(ActorPath.Parse(a.DestinationPath), deliveryId => new Action(deliveryId, a.Payload)); - }) - .With(a => - { + break; + case AcceptedSelectionReq a: _log.Debug("Deliver(destination, deliveryId => Action(deliveryId, {0})), recovering: {1}", a.Payload, IsRecovering); Deliver(Context.System.ActorSelection(a.DestinationPath), deliveryId => new Action(deliveryId, a.Payload)); - }) - .With(r => - { + break; + case ReqDone r: _log.Debug("ConfirmDelivery({0}), recovering: {1}", r.Id, IsRecovering); ConfirmDelivery(r.Id); - }); + break; + } } } diff --git a/src/core/Akka.Persistence.Tests/SnapshotSpec.cs b/src/core/Akka.Persistence.Tests/SnapshotSpec.cs index 43da8c0531d..96edbeee344 100644 --- a/src/core/Akka.Persistence.Tests/SnapshotSpec.cs +++ b/src/core/Akka.Persistence.Tests/SnapshotSpec.cs @@ -43,26 +43,43 @@ public SaveSnapshotTestActor(string name, IActorRef probe) protected override bool ReceiveRecover(object message) { - return message.Match() - .With(offer => - { + switch (message) + { + case SnapshotOffer offer: State = offer.Snapshot.AsInstanceOf>(); - }) - .With(m => State = State.AddFirst(m + "-" + LastSequenceNr)) - .WasHandled; + return true; + + case string m: + State = State.AddFirst(m + "-" + LastSequenceNr); + return true; + + default: + return false; + } } protected override bool ReceiveCommand(object message) { - return message.Match() - .With(payload => Persist(payload, _ => - { - State = State.AddFirst(payload + "-" + LastSequenceNr); - })) - .With(_ => SaveSnapshot(State)) - .With(s => _probe.Tell(s.Metadata.SequenceNr)) - .With(_ => _probe.Tell(State.Reverse().ToArray())) - .WasHandled; + switch (message) + { + case string payload: + Persist(payload, _ => + { + State = State.AddFirst(payload + "-" + LastSequenceNr); + }); + return true; + case TakeSnapshot _: + SaveSnapshot(State); + return true; + case SaveSnapshotSuccess s: + _probe.Tell(s.Metadata.SequenceNr); + return true; + case GetState _: + _probe.Tell(State.Reverse().ToArray()); + return true; + default: + return false; + } } } @@ -80,26 +97,37 @@ public LoadSnapshotTestActor(string name, Recovery recovery, IActorRef probe) protected override bool ReceiveRecover(object message) { - return message.Match() - .With(payload => _probe.Tell(payload + "-" + LastSequenceNr)) - .With(offer => _probe.Tell(offer)) - .Default(other => _probe.Tell(other)) - .WasHandled; + switch (message) + { + case string payload: + _probe.Tell(payload + "-" + LastSequenceNr); + return true; + case SnapshotOffer offer: + _probe.Tell(offer); + return true; + default: + _probe.Tell(message); + return true; + } } protected override bool ReceiveCommand(object message) { - return message.Match() - .With(payload => - { + switch (message) + { + case string payload: if (payload == "done") _probe.Tell("done"); else Persist(payload, _ => _probe.Tell(payload + "-" + LastSequenceNr)); - }) - .With(offer => _probe.Tell(offer)) - .Default(other => _probe.Tell(other)) - .WasHandled; + return true; + case SnapshotOffer offer: + _probe.Tell(offer); + return true; + default: + _probe.Tell(message); + return true; + } } protected override void PreStart() { } @@ -190,10 +218,17 @@ protected override bool ReceiveCommand(object message) protected bool ReceiveDelete(object message) { - return message.Match() - .With(d => DeleteSnapshot(d.Metadata.SequenceNr)) - .With(d => DeleteSnapshots(d.Criteria)) - .WasHandled; + switch (message) + { + case DeleteOne d: + DeleteSnapshot(d.Metadata.SequenceNr); + return true; + case DeleteMany d: + DeleteSnapshots(d.Criteria); + return true; + default: + return false; + } } } diff --git a/src/core/Akka.Remote.TestKit/BarrierCoordinator.cs b/src/core/Akka.Remote.TestKit/BarrierCoordinator.cs index dda3f28a3c9..6e6d1dbff95 100644 --- a/src/core/Akka.Remote.TestKit/BarrierCoordinator.cs +++ b/src/core/Akka.Remote.TestKit/BarrierCoordinator.cs @@ -498,81 +498,71 @@ protected void InitFSM() WhenUnhandled(@event => { - State nextState = null; var clients = @event.StateData.Clients; var arrived = @event.StateData.Arrived; - @event.FsmEvent.Match() - .With(node => - { + switch (@event.FsmEvent) + { + case Controller.NodeInfo node: if (clients.Any(x => x.Name == node.Name)) throw new DuplicateNodeException(@event.StateData, node); - nextState = Stay().Using(@event.StateData.Copy(clients.Add(node))); - }) - .With(disconnected => - { + return Stay().Using(@event.StateData.Copy(clients.Add(node))); + + case Controller.ClientDisconnected disconnected: if (arrived == null || arrived.Count == 0) - nextState = - Stay() - .Using( - @event.StateData.Copy(clients.Where(x => x.Name != disconnected.Name).ToImmutableHashSet())); - else - { - var client = clients.FirstOrDefault(x => x.Name == disconnected.Name); - if (client == null) nextState = Stay(); - else - { - throw new ClientLostException(@event.StateData.Copy(clients.Remove(client), arrived:arrived.Where(x => x != client.FSM).ToImmutableHashSet()), disconnected.Name); - } - } - }); - - return nextState; + return Stay() + .Using(@event.StateData.Copy(clients.Where(x => x.Name != disconnected.Name).ToImmutableHashSet())); + + var client = clients.FirstOrDefault(x => x.Name == disconnected.Name); + if (client == null) + return Stay(); + + throw new ClientLostException(@event.StateData.Copy(clients.Remove(client), arrived:arrived.Where(x => x != client.FSM).ToImmutableHashSet()), disconnected.Name); + + default: + return null; + } }); When(State.Idle, @event => { - State nextState = null; var clients = @event.StateData.Clients; - @event.FsmEvent.Match() - .With(barrier => - { + switch (@event.FsmEvent) + { + case EnterBarrier barrier: if (_failed) - nextState = - Stay().Replying(new ToClient(new BarrierResult(barrier.Name, false))); - else if (clients.Select(x => x.FSM).SequenceEqual(new List() {Sender})) - nextState = - Stay().Replying(new ToClient(new BarrierResult(barrier.Name, true))); - else if (clients.All(x => !Equals(x.FSM, Sender))) - nextState = - Stay().Replying(new ToClient(new BarrierResult(barrier.Name, false))); - else - { - nextState = - GoTo(State.Waiting) - .Using(@event.StateData.Copy(barrier: barrier.Name, - arrived: ImmutableHashSet.Create(Sender), - deadline: GetDeadline(barrier.Timeout))); - } - }) - .With(client => - { + return Stay().Replying(new ToClient(new BarrierResult(barrier.Name, false))); + + if (clients.Select(x => x.FSM).SequenceEqual(new List() {Sender})) + return Stay().Replying(new ToClient(new BarrierResult(barrier.Name, true))); + + if (clients.All(x => !Equals(x.FSM, Sender))) + return Stay().Replying(new ToClient(new BarrierResult(barrier.Name, false))); + + return GoTo(State.Waiting) + .Using(@event.StateData.Copy( + barrier: barrier.Name, + arrived: ImmutableHashSet.Create(Sender), + deadline: GetDeadline(barrier.Timeout))); + + case RemoveClient client: if (clients.Count == 0) throw new BarrierEmptyException(@event.StateData, $"cannot remove {client.Name}: no client to remove"); - nextState = - Stay().Using(@event.StateData.Copy(clients.Where(x => x.Name != client.Name).ToImmutableHashSet())); - }); - - return nextState; + + return Stay().Using(@event.StateData.Copy(clients.Where(x => x.Name != client.Name).ToImmutableHashSet())); + + default: + return null; + } }); When(State.Waiting, @event => { - State nextState = null; var currentBarrier = @event.StateData.Barrier; var clients = @event.StateData.Clients; var arrived = @event.StateData.Arrived; - @event.FsmEvent.Match() - .With(barrier => - { + + switch (@event.FsmEvent) + { + case EnterBarrier barrier: if (barrier.Name != currentBarrier) throw new WrongBarrierException(barrier.Name, Sender, @event.StateData); var together = clients.Any(x => Equals(x.FSM, Sender)) @@ -583,35 +573,30 @@ protected void InitFSM() if (enterDeadline.TimeLeft < @event.StateData.Deadline.TimeLeft) { SetTimer("Timeout", StateTimeout.Instance, enterDeadline.TimeLeft, false); - nextState = HandleBarrier(@event.StateData.Copy(arrived: together, deadline: enterDeadline)); + return HandleBarrier(@event.StateData.Copy(arrived: together, deadline: enterDeadline)); } - else - { - nextState = HandleBarrier(@event.StateData.Copy(arrived: together)); - } - }) - .With(client => - { + + return HandleBarrier(@event.StateData.Copy(arrived: together)); + + case RemoveClient client: var removedClient = clients.FirstOrDefault(x => x.Name == client.Name); - if (removedClient == null) nextState = Stay(); - else - { - nextState = - HandleBarrier(@event.StateData.Copy(clients.Remove(removedClient), - arrived: arrived.Where(x => !Equals(x, removedClient.FSM)).ToImmutableHashSet())); - } - }) - .With(barrier => - { - if(barrier.Name != currentBarrier) throw new WrongBarrierException(barrier.Name, Sender, @event.StateData); + if (removedClient == null) + return Stay(); + + return HandleBarrier(@event.StateData.Copy(clients.Remove(removedClient), + arrived: arrived.Where(x => !Equals(x, removedClient.FSM)).ToImmutableHashSet())); + + case FailBarrier barrier: + if(barrier.Name != currentBarrier) + throw new WrongBarrierException(barrier.Name, Sender, @event.StateData); throw new FailedBarrierException(@event.StateData); - }) - .With(() => - { + + case StateTimeout _: throw new BarrierTimeoutException(@event.StateData); - }); - - return nextState; + + default: + return null; + } }); OnTransition((state, nextState) => diff --git a/src/core/Akka.Remote.Tests.MultiNode/RemoteReDeploymentSpec.cs b/src/core/Akka.Remote.Tests.MultiNode/RemoteReDeploymentSpec.cs index 8d3d70d83c5..3dc8cbba15f 100644 --- a/src/core/Akka.Remote.Tests.MultiNode/RemoteReDeploymentSpec.cs +++ b/src/core/Akka.Remote.Tests.MultiNode/RemoteReDeploymentSpec.cs @@ -168,8 +168,14 @@ public Parent() protected override bool Receive(object message) { - return message.Match().With(_ => Context.ActorOf(_.Props, _.Name)).Default(m => - _monitor.Tell(m)).WasHandled; + if (message is ParentMessage msg) + { + Context.ActorOf(msg.Props, msg.Name); + return true; + } + + _monitor.Tell(message); + return true; } } diff --git a/src/core/Akka.Remote/Endpoint.cs b/src/core/Akka.Remote/Endpoint.cs index 04ed46ea9e7..91f2f60ed82 100644 --- a/src/core/Akka.Remote/Endpoint.cs +++ b/src/core/Akka.Remote/Endpoint.cs @@ -1319,10 +1319,18 @@ private Deadline NewAckDeadline() private void PublishAndThrow(Exception reason, LogLevel level, bool needToThrow = true) { - reason.Match() - .With(endpoint => PublishDisassociated()) - .With(shutdown => { }) // don't log an error for planned shutdowns - .Default(msg => PublishError(reason, level)); + switch (reason) + { + case EndpointDisassociatedException _: + PublishDisassociated(); + break; + case ShutDownAssociation _: + // don't log an error for planned shutdowns + break; + default: + PublishError(reason, level); + break; + } if (needToThrow) { diff --git a/src/core/Akka.Remote/Transport/AkkaProtocolTransport.cs b/src/core/Akka.Remote/Transport/AkkaProtocolTransport.cs index c9a7b58160a..45548485f49 100644 --- a/src/core/Akka.Remote/Transport/AkkaProtocolTransport.cs +++ b/src/core/Akka.Remote/Transport/AkkaProtocolTransport.cs @@ -10,9 +10,7 @@ using System.Runtime.Serialization; using System.Threading.Tasks; using Akka.Actor; -using Akka.Actor.Internal; using Akka.Event; -using Akka.Remote.Serialization; using Akka.Util.Internal; using Google.Protobuf; @@ -1078,91 +1076,112 @@ AssociationHandle GetHandle(ProtocolStateData data) } }); - OnTermination(@event => @event.StateData.Match() - .With(ou => ou.StatusCompletionSource.TrySetException(@event.Reason is Failure - ? new AkkaProtocolException(@event.Reason.ToString()) - : new AkkaProtocolException("Transport disassociated before handshake finished"))) - .With(oua => - { - Exception associationFailure = null; - @event.Reason.Match() - .With(f => f.Cause.Match() - .With( - timeout => - associationFailure = - new AkkaProtocolException(timeout.ErrorMessage)) - .With( - forbidden => - associationFailure = - new AkkaProtocolException( - "The remote system has a UID that has been quarantined. Association aborted.")) - .With(info => associationFailure = DisassociateException(info))) - .Default( - msg => - associationFailure = - new AkkaProtocolException( - "Transport disassociated before handshake finished")); - - oua.StatusCompletionSource.TrySetException(associationFailure); - oua.WrappedHandle.Disassociate(DisassociationReason(@event.Reason), _log); - }) - .With(awh => - { - // Invalidate exposed but still unfinished promise. The underlying association disappeared, so after - // registration immediately signal a disassociate - Disassociated disassociateNotification = null; - if (@event.Reason is Failure && @event.Reason.AsInstanceOf().Cause is DisassociateInfo) - { - disassociateNotification = - new Disassociated(@event.Reason.AsInstanceOf().Cause.AsInstanceOf()); - } - else - { - disassociateNotification = new Disassociated(DisassociateInfo.Unknown); - } - awh.HandlerListener.ContinueWith(result => result.Result.Notify(disassociateNotification), - TaskContinuationOptions.ExecuteSynchronously); - awh.WrappedHandle.Disassociate(DisassociationReason(@event.Reason), _log); - }) - .With(lr => + OnTermination(@event => + { + switch (@event.StateData) { - Disassociated disassociateNotification = null; - if (@event.Reason is Failure failure && failure.Cause is DisassociateInfo) + case OutboundUnassociated ou: + ou.StatusCompletionSource.TrySetException(@event.Reason is Failure + ? new AkkaProtocolException(@event.Reason.ToString()) + : new AkkaProtocolException("Transport disassociated before handshake finished")); + break; + + case OutboundUnderlyingAssociated oua: + Exception associationFailure = null; + switch (@event.Reason) + { + case Failure f: + switch (f.Cause) + { + case TimeoutReason timeout: + associationFailure = new AkkaProtocolException(timeout.ErrorMessage); + break; + case ForbiddenUidReason _: + associationFailure = new AkkaProtocolException("The remote system has a UID that has been quarantined. Association aborted."); + break; + case DisassociateInfo info: + associationFailure = DisassociateException(info); + break; + default: + associationFailure = new AkkaProtocolException($"Unknown Failure cause: [{f.Cause}]"); + break; + } + break; + default: + associationFailure = new AkkaProtocolException("Transport disassociated before handshake finished"); + break; + } + oua.StatusCompletionSource.TrySetException(associationFailure); + oua.WrappedHandle.Disassociate(DisassociationReason(@event.Reason), _log); + break; + + case AssociatedWaitHandler awh: { - disassociateNotification = - new Disassociated(failure.Cause.AsInstanceOf()); + // Invalidate exposed but still unfinished promise. The underlying association disappeared, so after + // registration immediately signal a disassociate + Disassociated disassociateNotification; + if (@event.Reason is Failure && @event.Reason.AsInstanceOf().Cause is DisassociateInfo) + { + disassociateNotification = + new Disassociated(@event.Reason.AsInstanceOf().Cause + .AsInstanceOf()); + } + else + { + disassociateNotification = new Disassociated(DisassociateInfo.Unknown); + } + + awh.HandlerListener.ContinueWith(result => result.Result.Notify(disassociateNotification), + TaskContinuationOptions.ExecuteSynchronously); + awh.WrappedHandle.Disassociate(DisassociationReason(@event.Reason), _log); + break; } - else + + case ListenerReady lr: { - disassociateNotification = new Disassociated(DisassociateInfo.Unknown); + Disassociated disassociateNotification; + if (@event.Reason is Failure failure && failure.Cause is DisassociateInfo) + { + disassociateNotification = + new Disassociated(failure.Cause.AsInstanceOf()); + } + else + { + disassociateNotification = new Disassociated(DisassociateInfo.Unknown); + } + + lr.Listener.Notify(disassociateNotification); + lr.WrappedHandle.Disassociate(DisassociationReason(@event.Reason), _log); + break; } - lr.Listener.Notify(disassociateNotification); - lr.WrappedHandle.Disassociate(DisassociationReason(@event.Reason), _log); - }) - .With(iu => - iu.WrappedHandle.Disassociate(DisassociationReason(@event.Reason), _log))); + + case InboundUnassociated iu: + iu.WrappedHandle.Disassociate(DisassociationReason(@event.Reason), _log); + break; + } + }); /* * Set the initial ProtocolStateActor state to CLOSED if OUTBOUND * Set the initial ProtocolStateActor state to WAITHANDSHAKE if INBOUND * */ - _initialData.Match() - .With(d => - { + switch (_initialData) + { + case OutboundUnassociated d: // attempt to open underlying transport to the remote address // if using DotNetty, this is where the socket connection is opened. d.Transport.Associate(d.RemoteAddress).ContinueWith(result => new HandleMsg(result.Result), TaskContinuationOptions.ExecuteSynchronously).PipeTo(Self); StartWith(AssociationState.Closed, d); - }) - .With(d => - { + break; + case InboundUnassociated d: // inbound transport is opened already inside the ProtocolStateManager // therefore we just have to set ourselves as listener and wait for // incoming handshake attempts from the client. d.WrappedHandle.ReadHandlerSource.SetResult(new ActorHandleEventListener(Self)); StartWith(AssociationState.WaitHandshake, d); - }); + break; + } InitHandshakeTimer(); } @@ -1170,9 +1189,9 @@ private static string DisassociationReason(Reason reason) { switch (reason) { - case Normal n: + case Normal _: return "the ProtocolStateActor was stopped normally"; - case Shutdown s: + case Shutdown _: return "the ProtocolStateActor was shutdown"; case Failure f: return $"the ProtocolStateActor failed: {f.Cause}"; @@ -1189,10 +1208,18 @@ protected override void LogTermination(Reason reason) { if (reason is Failure failure) { - failure.Cause.Match() - .With(() => { }) //no logging - .With(() => { }) //no logging - .With(timeoutReason => _log.Error(timeoutReason.ErrorMessage)); + switch (failure.Cause) + { + case DisassociateInfo _: + //no logging + break; + case ForbiddenUidReason _: + //no logging + break; + case TimeoutReason timeoutReason: + _log.Error(timeoutReason.ErrorMessage); + break; + } } else base.LogTermination(reason); diff --git a/src/core/Akka.Remote/Transport/TransportAdapters.cs b/src/core/Akka.Remote/Transport/TransportAdapters.cs index 85232435a72..0690cf387c5 100644 --- a/src/core/Akka.Remote/Transport/TransportAdapters.cs +++ b/src/core/Akka.Remote/Transport/TransportAdapters.cs @@ -591,17 +591,17 @@ protected long NextId() /// TBD protected override void OnReceive(object message) { - PatternMatch.Match(message) - .With(listen => - { + switch (message) + { + case ListenUnderlying listen: LocalAddress = listen.ListenAddress; var capturedSelf = Self; listen.UpstreamListener.ContinueWith( listenerRegistered => capturedSelf.Tell(new ListenerRegistered(listenerRegistered.Result)), TaskContinuationOptions.ExecuteSynchronously); - }) - .With(listener => - { + break; + + case ListenerRegistered listener: AssociationListener = listener.Listener; foreach (var dEvent in DelayedEvents) { @@ -609,8 +609,12 @@ protected override void OnReceive(object message) } DelayedEvents = new Queue(); Context.Become(Ready); - }) - .Default(m => DelayedEvents.Enqueue(m)); + break; + + default: + DelayedEvents.Enqueue(message); + break; + } } /// diff --git a/src/core/Akka.Streams.Tests.TCK/ActorPublisherTest.cs b/src/core/Akka.Streams.Tests.TCK/ActorPublisherTest.cs index af3d0dc3f27..f78968a27f7 100644 --- a/src/core/Akka.Streams.Tests.TCK/ActorPublisherTest.cs +++ b/src/core/Akka.Streams.Tests.TCK/ActorPublisherTest.cs @@ -75,11 +75,15 @@ public TestPublisher(long allElements) } } - protected override bool Receive(object message) => - message.Match() - .With(_ => LoopDemand()) - .With(_ => - { + protected override bool Receive(object message) + { + switch (message) + { + case Request _: + LoopDemand(); + return true; + + case Produce _: if (TotalDemand > 0 && !IsCompleted && _current < _count) OnNext(_current++); else if (!IsCompleted && _current == _count) @@ -88,11 +92,13 @@ protected override bool Receive(object message) => { //no-op } - }).Default(_ => - { + return true; + + default: //no-op - }) - .WasHandled; + return true; + } + } private void LoopDemand() { diff --git a/src/core/Akka.Streams.Tests/Dsl/GraphStageTimersSpec.cs b/src/core/Akka.Streams.Tests/Dsl/GraphStageTimersSpec.cs index e60159799c1..65fa2725c1a 100644 --- a/src/core/Akka.Streams.Tests/Dsl/GraphStageTimersSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/GraphStageTimersSpec.cs @@ -245,18 +245,25 @@ public override void PreStart() private void OnTestEvent(object message) { - message.Match() - .With(() => ScheduleOnce(TestSingleTimerKey, Dilated(500))) - .With(() => ScheduleOnce(TestSingleTimerResubmitKey, Dilated(500))) - .With(() => - { + switch (message) + { + case TestSingleTimer _: + ScheduleOnce(TestSingleTimerKey, Dilated(500)); + break; + case TestSingleTimerResubmit _: + ScheduleOnce(TestSingleTimerResubmitKey, Dilated(500)); + break; + case TestCancelTimer _: ScheduleOnce(TestCancelTimerKey, Dilated(1)); // Likely in mailbox but we cannot guarantee CancelTimer(TestCancelTimerKey); _stage._probe.Tell(TestCancelTimerAck.Instance); ScheduleOnce(TestCancelTimerKey, Dilated(500)); - }) - .With(() => ScheduleRepeatedly(TestRepeatedTimerKey, Dilated(100))); + break; + case TestRepeatedTimer _: + ScheduleRepeatedly(TestRepeatedTimerKey, Dilated(100)); + break; + } } private TimeSpan Dilated(int milliseconds) diff --git a/src/core/Akka.Streams.Tests/Implementation/Fusing/KeepGoingStageSpec.cs b/src/core/Akka.Streams.Tests/Implementation/Fusing/KeepGoingStageSpec.cs index 87436c7ef1e..31133b884c0 100644 --- a/src/core/Akka.Streams.Tests/Implementation/Fusing/KeepGoingStageSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/Fusing/KeepGoingStageSpec.cs @@ -142,21 +142,27 @@ public override void PreStart() private void OnCommand(IPingCmd cmd) { - cmd.Match() - .With(r => _listener = r.Probe) - .With(() => _listener.Tell(Pong.Instance)) - .With(() => - { + switch (cmd) + { + case Register r: + _listener = r.Probe; + break; + + case Ping _: + _listener.Tell(Pong.Instance); + break; + + case CompleteStage _: CompleteStage(); _listener.Tell(EndOfEventHandler.Instance); - }) - .With(() => - { + break; + + case FailStage _: FailStage(new TestException("test")); _listener.Tell(EndOfEventHandler.Instance); - }) - .With(() => - { + break; + + case Throw _: try { throw new TestException("test"); @@ -165,7 +171,7 @@ private void OnCommand(IPingCmd cmd) { _listener.Tell(EndOfEventHandler.Instance); } - }); + } } } diff --git a/src/core/Akka.Streams/Implementation/FanIn.cs b/src/core/Akka.Streams/Implementation/FanIn.cs index b223f6d140e..4d69121d32c 100644 --- a/src/core/Akka.Streams/Implementation/FanIn.cs +++ b/src/core/Akka.Streams/Implementation/FanIn.cs @@ -90,36 +90,52 @@ protected InputBunch(int inputCount, int bufferSize, IPump pump) isReady: () => _markedPending > 0); // FIXME: Eliminate re-wraps - SubReceive = new SubReceive(msg => msg.Match() - .With(subscribe => _inputs[subscribe.Id].SubReceive.CurrentReceive(new Actors.OnSubscribe(subscribe.Subscription))) - .With(next => - { - var id = next.Id; - if (IsMarked(id) && !IsPending(id)) - _markedPending++; - Pending(id, on: true); - _receivedInput = true; - _inputs[id].SubReceive.CurrentReceive(new Actors.OnNext(next.Element)); - }) - .With(complete => + SubReceive = new SubReceive(msg => + { + switch (msg) { - var id = complete.Id; - if (!IsPending(id)) + case FanIn.OnSubscribe subscribe: + _inputs[subscribe.Id].SubReceive.CurrentReceive(new Actors.OnSubscribe(subscribe.Subscription)); + return true; + + case FanIn.OnNext next: { - if (IsMarked(id) && !IsDepleted(id)) - _markedDepleted++; - Depleted(id, on: true); - OnDepleted(id); + var id = next.Id; + if (IsMarked(id) && !IsPending(id)) + _markedPending++; + Pending(id, on: true); + _receivedInput = true; + _inputs[id].SubReceive.CurrentReceive(new Actors.OnNext(next.Element)); + return true; } - - RegisterCompleted(id); - _inputs[id].SubReceive.CurrentReceive(Actors.OnComplete.Instance); - - if (!_receivedInput && IsAllCompleted) - OnCompleteWhenNoInput(); - }) - .With(error => OnError(error.Id, error.Cause)) - .WasHandled); + + case FanIn.OnComplete complete: + { + var id = complete.Id; + if (!IsPending(id)) + { + if (IsMarked(id) && !IsDepleted(id)) + _markedDepleted++; + Depleted(id, on: true); + OnDepleted(id); + } + + RegisterCompleted(id); + _inputs[id].SubReceive.CurrentReceive(Actors.OnComplete.Instance); + + if (!_receivedInput && IsAllCompleted) + OnCompleteWhenNoInput(); + return true; + } + + case FanIn.OnError error: + OnError(error.Id, error.Cause); + return true; + + default: + return false; + } + }); } /// diff --git a/src/core/Akka.Streams/Implementation/FanOut.cs b/src/core/Akka.Streams/Implementation/FanOut.cs index 56c5a4d964b..bf2026b0d11 100644 --- a/src/core/Akka.Streams/Implementation/FanOut.cs +++ b/src/core/Akka.Streams/Implementation/FanOut.cs @@ -80,42 +80,54 @@ public OutputBunch(int outputCount, IActorRef impl, IPump pump) isReady: () => _markedPending > 0); // FIXME: Eliminate re-wraps - SubReceive = new SubReceive(message => message.Match() - .With>(exposed => - { - var publishers = exposed.Publishers.GetEnumerator(); - var outputs = _outputs.AsEnumerable().GetEnumerator(); - - while (publishers.MoveNext() && outputs.MoveNext()) - outputs.Current.SubReceive.CurrentReceive(new ExposedPublisher(publishers.Current)); - }) - .With(more => - { - if (more.Demand < 1) - // According to Reactive Streams Spec 3.9, with non-positive demand must yield onError - Error(more.Id, ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveException); - else - { - if (_marked[more.Id] && !_pending[more.Id]) - _markedPending += 1; - _pending[more.Id] = true; - _outputs[more.Id].SubReceive.CurrentReceive(new RequestMore(null, more.Demand)); - } - }) - .With(cancel => + SubReceive = new SubReceive(message => + { + switch (message) { - if (_unmarkCancelled) - UnmarkOutput(cancel.Id); - - if (_marked[cancel.Id] && !_cancelled[cancel.Id]) - _markedCanceled += 1; - - _cancelled[cancel.Id] = true; - OnCancel(cancel.Id); - _outputs[cancel.Id].SubReceive.CurrentReceive(new Cancel(null)); - }) - .With(pending => _outputs[pending.Id].SubReceive.CurrentReceive(SubscribePending.Instance)) - .WasHandled); + case FanOut.ExposedPublishers exposed: + using (var publishers = exposed.Publishers.GetEnumerator()) + { + using (var outputs = _outputs.AsEnumerable().GetEnumerator()) + { + while (publishers.MoveNext() && outputs.MoveNext()) + outputs.Current?.SubReceive.CurrentReceive(new ExposedPublisher(publishers.Current)); + } + } + return true; + + case FanOut.SubstreamRequestMore more: + if (more.Demand < 1) + // According to Reactive Streams Spec 3.9, with non-positive demand must yield onError + Error(more.Id, ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveException); + else + { + if (_marked[more.Id] && !_pending[more.Id]) + _markedPending += 1; + _pending[more.Id] = true; + _outputs[more.Id].SubReceive.CurrentReceive(new RequestMore(null, more.Demand)); + } + return true; + + case FanOut.SubstreamCancel cancel: + if (_unmarkCancelled) + UnmarkOutput(cancel.Id); + + if (_marked[cancel.Id] && !_cancelled[cancel.Id]) + _markedCanceled += 1; + + _cancelled[cancel.Id] = true; + OnCancel(cancel.Id); + _outputs[cancel.Id].SubReceive.CurrentReceive(new Cancel(null)); + return true; + + case FanOut.SubstreamSubscribePending pending: + _outputs[pending.Id].SubReceive.CurrentReceive(SubscribePending.Instance); + return true; + + default: + return false; + } + }); } /// diff --git a/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs b/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs index 582d6a3a53a..fa58a81a5cf 100644 --- a/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs +++ b/src/core/Akka.Streams/Implementation/IO/FilePublisher.cs @@ -127,11 +127,20 @@ protected override void PreStart() /// TBD /// TBD protected override bool Receive(object message) - => message.Match() - .With(() => ReadAndSignal(_maxBuffer)) - .With(() => ReadAndSignal(_maxBuffer)) - .With(() => Context.Stop(Self)) - .WasHandled; + { + switch (message) + { + case Request _: + case Continue _: + ReadAndSignal(_maxBuffer); + return true; + case Actors.Cancel _: + Context.Stop(Self); + return true; + default: + return false; + } + } private void ReadAndSignal(int maxReadAhead) { diff --git a/src/core/Akka.Streams/Implementation/IO/InputStreamPublisher.cs b/src/core/Akka.Streams/Implementation/IO/InputStreamPublisher.cs index 4d4dc21a84c..3f8231c1b2f 100644 --- a/src/core/Akka.Streams/Implementation/IO/InputStreamPublisher.cs +++ b/src/core/Akka.Streams/Implementation/IO/InputStreamPublisher.cs @@ -73,11 +73,20 @@ public InputStreamPublisher(Stream inputstream, TaskCompletionSource c /// TBD /// TBD protected override bool Receive(object message) - => message.Match() - .With(ReadAndSignal) - .With(ReadAndSignal) - .With(() => Context.Stop(Self)) - .WasHandled; + { + switch (message) + { + case Request _: + case Continue _: + ReadAndSignal(); + return true; + case Actors.Cancel _: + Context.Stop(Self); + return true; + default: + return false; + } + } /// /// TBD diff --git a/src/core/Akka.Tests.Performance/Actor/ActorMemoryFootprintSpec.cs b/src/core/Akka.Tests.Performance/Actor/ActorMemoryFootprintSpec.cs index e63a865e857..1b336de6dd5 100644 --- a/src/core/Akka.Tests.Performance/Actor/ActorMemoryFootprintSpec.cs +++ b/src/core/Akka.Tests.Performance/Actor/ActorMemoryFootprintSpec.cs @@ -22,11 +22,13 @@ internal class MemoryActorBasePatternMatchActor : ActorBase { protected override bool Receive(object message) { - return message.Match() - .With(s => { }) - .With(i => { }) - .With(b => { }) - .WasHandled; + return message switch + { + string _ => true, + int _ => true, + bool _ => true, + _ => false + }; } public static Props Props { get; } = Props.Create(() => new MemoryActorBasePatternMatchActor()); diff --git a/src/core/Akka.Tests.Performance/Actor/ActorThroughputSpec.cs b/src/core/Akka.Tests.Performance/Actor/ActorThroughputSpec.cs index afb1b4dd52f..fee5ac0c0ea 100644 --- a/src/core/Akka.Tests.Performance/Actor/ActorThroughputSpec.cs +++ b/src/core/Akka.Tests.Performance/Actor/ActorThroughputSpec.cs @@ -19,50 +19,6 @@ namespace Akka.Tests.Performance.Actor public class ActorThroughputSpec { #region Actor classes - internal class BenchmarkActorBasePatternMatchActor : ActorBase - { - private readonly Counter _counter; - private readonly long _maxExpectedMessages; - private long _currentMessages = 0; - private readonly ManualResetEventSlim _resetEvent; - - public BenchmarkActorBasePatternMatchActor(Counter counter, long maxExpectedMessages, ManualResetEventSlim resetEvent) - { - _counter = counter; - _maxExpectedMessages = maxExpectedMessages; - _resetEvent = resetEvent; - } - - protected override bool Receive(object message) - { - return message.Match() - .With(IncrementAndCheck) - .With(IncrementAndCheck) - .With(simpleDataMessage => - { - if (simpleDataMessage.Age > 20) - { - IncrementAndCheck(); - } - else - { - IncrementAndCheck(); - } - }) - .Default(o => IncrementAndCheck()) - .WasHandled; - } - - private void IncrementAndCheck() - { - _counter.Increment(); - if (++_currentMessages == _maxExpectedMessages) - _resetEvent.Set(); - } - - public static Props Props(Counter counter, long maxExpectedMessages, ManualResetEventSlim resetEvent) => Akka.Actor.Props.Create( - () => new BenchmarkActorBasePatternMatchActor(counter, maxExpectedMessages, resetEvent)); - } internal class BenchmarkUntypedActor : UntypedActor { @@ -201,7 +157,6 @@ public SimpleData(string name, int age) private Counter _mailboxThroughput; private readonly ManualResetEventSlim _resetEvent = new ManualResetEventSlim(false); - private IActorRef _actorBasePatternMatchActorRef; private IActorRef _untypedActorRef; private IActorRef _receiveActorRef; private IActorRef _minimalActorRef; @@ -215,30 +170,11 @@ public void Setup(BenchmarkContext context) _mailboxThroughput = context.GetCounter(MailboxCounterName); System = ActorSystem.Create($"{GetType().Name}{Counter.GetAndIncrement()}"); - _actorBasePatternMatchActorRef = System.ActorOf(BenchmarkActorBasePatternMatchActor.Props(_mailboxThroughput, MailboxMessageCount * 3, _resetEvent)); _untypedActorRef = System.ActorOf(BenchmarkUntypedActor.Props(_mailboxThroughput, MailboxMessageCount * 3, _resetEvent)); _receiveActorRef = System.ActorOf(BenchmarkReceiveActor.Props(_mailboxThroughput, MailboxMessageCount * 3, _resetEvent)); _minimalActorRef = new BenchmarkMinimalActorRef(_mailboxThroughput, MailboxMessageCount, _resetEvent); } - [PerfBenchmark( - Description = "Measures the throughput of an ActorBase + Pattern match class", - RunMode = RunMode.Iterations, NumberOfIterations = 13, TestMode = TestMode.Measurement, - RunTimeMilliseconds = 1000)] - [CounterMeasurement(MailboxCounterName)] - [GcMeasurement(GcMetric.TotalCollections, GcGeneration.AllGc)] - public void ActorBase_PatternMatch_Throughput(BenchmarkContext context) - { - for (var i = 0; i < MailboxMessageCount;) - { - _actorBasePatternMatchActorRef.Tell(dataExample); - _actorBasePatternMatchActorRef.Tell(intExample); - _actorBasePatternMatchActorRef.Tell(stringExample); - ++i; - } - _resetEvent.Wait(); //wait up to a second - } - [PerfBenchmark( Description = "Measures the throughput of an UntypedActor", RunMode = RunMode.Iterations, NumberOfIterations = 13, TestMode = TestMode.Measurement, diff --git a/src/core/Akka.Tests/Actor/PatternSpec.cs b/src/core/Akka.Tests/Actor/PatternSpec.cs deleted file mode 100644 index e594f1a4994..00000000000 --- a/src/core/Akka.Tests/Actor/PatternSpec.cs +++ /dev/null @@ -1,108 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2022 Lightbend Inc. -// Copyright (C) 2013-2022 .NET Foundation -// -//----------------------------------------------------------------------- - -using System; -using System.Threading.Tasks; -using Akka.Actor; -using Akka.Event; -using Akka.TestKit; -using Akka.TestKit.Extensions; -using Akka.Tests.Util; -using Xunit; - -namespace Akka.Tests.Actor -{ - public class PatternSpec : AkkaSpec - { - [Fact] - public async Task GracefulStop_must_provide_Task_for_stopping_an_actor() - { - //arrange - var target = Sys.ActorOf(); - - //act - var result = await target.GracefulStop(TimeSpan.FromSeconds(5)) - .AwaitWithTimeout(TimeSpan.FromSeconds(6));; - - //assert - Assert.True(result); - - } - - [Fact] - public async Task GracefulStop_must_complete_Task_when_actor_already_terminated() - { - //arrange - var target = Sys.ActorOf(); - - //act - - - //assert - Assert.True(await target.GracefulStop(TimeSpan.FromSeconds(5))); - Assert.True(await target.GracefulStop(TimeSpan.FromSeconds(5))); - } - - [Fact] - public async Task GracefulStop_must_complete_Task_with_TaskCanceledException_when_actor_not_terminated_within_timeout() - { - //arrange - var target = Sys.ActorOf(); - var latch = new TestLatch(); - - //act - target.Tell((latch, TimeSpan.FromSeconds(2))); - - //assert - await Assert.ThrowsAsync(async () => - { - await target.GracefulStop(TimeSpan.FromMilliseconds(50)); - }); - latch.Open(); - - } - - [Fact] - public async Task GracefulStop_must_not_send_unnecessary_Deadletter_bug_2157() - { - //arrange - var target = Sys.ActorOf(); - Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter)); - - //act - var stopped = await target.GracefulStop(TimeSpan.FromSeconds(5)); - - //assert - Assert.True(stopped); - await ExpectNoMsgAsync(TimeSpan.Zero); - } - - #region Actors - - public sealed class Work - { - public Work(TimeSpan duration) - { - Duration = duration; - } - - public TimeSpan Duration { get; private set; } - } - - public class TargetActor : UntypedActor - { - protected override void OnReceive(object message) - { - PatternMatch.Match(message) - .With<(TestLatch, TimeSpan)>(t => t.Item1.Ready(t.Item2)); - } - } - - #endregion - } -} - diff --git a/src/core/Akka.Tests/Akka.Tests.csproj b/src/core/Akka.Tests/Akka.Tests.csproj index 80a9b5c354d..42075bf9b01 100644 --- a/src/core/Akka.Tests/Akka.Tests.csproj +++ b/src/core/Akka.Tests/Akka.Tests.csproj @@ -6,6 +6,7 @@ Akka.Tests $(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion) $(NetTestVersion);$(NetCoreTestVersion) + 8.0 diff --git a/src/core/Akka.Tests/Routing/ListenerSpec.cs b/src/core/Akka.Tests/Routing/ListenerSpec.cs index 1ed6553d8a6..4d158044f41 100644 --- a/src/core/Akka.Tests/Routing/ListenerSpec.cs +++ b/src/core/Akka.Tests/Routing/ListenerSpec.cs @@ -63,13 +63,16 @@ public BroadcastActor() protected override void OnReceive(object message) { - PatternMatch.Match(message) - .With(l => Listeners.ListenerReceive(l)) - .With(s => - { + switch (message) + { + case ListenerMessage l: + Listeners.ListenerReceive(l); + break; + case string s: if (s.Equals("foo")) Listeners.Gossip("bar"); - }); + break; + } } public ListenerSupport Listeners { get; private set; } @@ -91,20 +94,19 @@ public ListenerActor(TestLatch fooLatch, TestLatch barLatch, AtomicCounter barCo protected override void OnReceive(object message) { - PatternMatch.Match(message) - .With(str => + if (message is string str) + { + if (str.Equals("bar")) + { + _barCount.GetAndIncrement(); + _barLatch.CountDown(); + } + + if (str.Equals("foo")) { - if (str.Equals("bar")) - { - _barCount.GetAndIncrement(); - _barLatch.CountDown(); - } - - if (str.Equals("foo")) - { - _fooLatch.CountDown(); - } - }); + _fooLatch.CountDown(); + } + } } public ListenerSupport Listeners { get; private set; } diff --git a/src/core/Akka.Tests/Routing/SmallestMailboxSpec.cs b/src/core/Akka.Tests/Routing/SmallestMailboxSpec.cs index e3ec20903ce..b6828120df4 100644 --- a/src/core/Akka.Tests/Routing/SmallestMailboxSpec.cs +++ b/src/core/Akka.Tests/Routing/SmallestMailboxSpec.cs @@ -28,22 +28,23 @@ public SmallestMailboxActor(ConcurrentDictionary usedActors) protected override void OnReceive(object message) { - message.Match() - .With<(TestLatch, TestLatch)>(t => - { - TestLatch busy = t.Item1, receivedLatch = t.Item2; + switch (message) + { + case (TestLatch busy, TestLatch receivedLatch) _: usedActors.TryAdd(0, Self.Path.ToString()); Self.Tell("another in busy mailbox"); receivedLatch.CountDown(); busy.Ready(TestLatch.DefaultTimeout); - }) - .With<(int, TestLatch)>(t => - { - var msg = t.Item1; var receivedLatch = t.Item2; + break; + + case (int msg, TestLatch receivedLatch) _: usedActors.TryAdd(msg, Self.Path.ToString()); receivedLatch.CountDown(); - }) - .With(t => { }); + break; + + case string _: + break; + } } } diff --git a/src/core/Akka.Tests/Util/PatternSpec.cs b/src/core/Akka.Tests/Util/PatternSpec.cs deleted file mode 100644 index bb7de563fd5..00000000000 --- a/src/core/Akka.Tests/Util/PatternSpec.cs +++ /dev/null @@ -1,25 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2022 Lightbend Inc. -// Copyright (C) 2013-2022 .NET Foundation -// -//----------------------------------------------------------------------- - -using Xunit; - -namespace Akka.Tests.Util -{ - - public class PatternSpec - { - [Fact] - public void PatternMatch_should_not_throw_NullReferenceException() - { - object nullObj = null; - nullObj.Match() - .With(str => { }) - .Default(m => {}); - } - } -} - diff --git a/src/core/Akka/Actor/GracefulStopSupport.cs b/src/core/Akka/Actor/GracefulStopSupport.cs index 1526e53aa73..d1f92e5c21e 100644 --- a/src/core/Akka/Actor/GracefulStopSupport.cs +++ b/src/core/Akka/Actor/GracefulStopSupport.cs @@ -67,27 +67,22 @@ public static Task GracefulStop(this IActorRef target, TimeSpan timeout, o { if (t.Status == TaskStatus.RanToCompletion) { - var returnResult = false; - PatternMatch.Match(t.Result) - .With(terminated => - { - returnResult = (terminated.ActorRef.Path.Equals(target.Path)); - }) - .Default(m => - { + switch (t.Result) + { + case Terminated terminated: + return terminated.ActorRef.Path.Equals(target.Path); + default: internalTarget.SendSystemMessage(new Unwatch(internalTarget, promiseRef)); - returnResult = false; - }); - return returnResult; - } - else - { - internalTarget.SendSystemMessage(new Unwatch(internalTarget, promiseRef)); - if (t.Status == TaskStatus.Canceled) - throw new TaskCanceledException(); - else - throw t.Exception; + return false; + + } } + + internalTarget.SendSystemMessage(new Unwatch(internalTarget, promiseRef)); + if (t.Status == TaskStatus.Canceled) + throw new TaskCanceledException(); + + throw t.Exception; }, TaskContinuationOptions.ExecuteSynchronously); } } diff --git a/src/core/Akka/Event/TraceLogger.cs b/src/core/Akka/Event/TraceLogger.cs index aa49d9f763e..da3baeefc6c 100644 --- a/src/core/Akka/Event/TraceLogger.cs +++ b/src/core/Akka/Event/TraceLogger.cs @@ -37,17 +37,28 @@ public class TraceLogger : UntypedActor /// TBD protected override void OnReceive(object message) { - message.Match() - .With(m => Sender.Tell(new LoggerInitialized())) - .With(m => Trace.TraceError(m.ToString())) - .With(m => Trace.TraceWarning(m.ToString())) - .With(m => Trace.TraceWarning(string.Format("Deadletter - unable to send message {0} from {1} to {2}", m.Message, m.Sender, m.Sender), typeof(DeadLetter).ToString())) - .With(m => Trace.TraceWarning("Unhandled message!")) - .Default(m => - { - if (m != null) - Trace.TraceInformation(m.ToString()); - }); + switch (message) + { + case InitializeLogger _: + Sender.Tell(new LoggerInitialized()); + break; + case Error m: + Trace.TraceError(m.ToString()); + break; + case Warning m: + Trace.TraceWarning(m.ToString()); + break; + case DeadLetter m: + Trace.TraceWarning($"Deadletter - unable to send message {m.Message} from {m.Sender} to {m.Sender}", typeof(DeadLetter)); + break; + case UnhandledMessage _: + Trace.TraceWarning("Unhandled message!"); + break; + default: + if (message != null) + Trace.TraceInformation(message.ToString()); + break; + } } } } diff --git a/src/core/Akka/PatternMatch.cs b/src/core/Akka/PatternMatch.cs deleted file mode 100644 index 3a985853c97..00000000000 --- a/src/core/Akka/PatternMatch.cs +++ /dev/null @@ -1,238 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2022 Lightbend Inc. -// Copyright (C) 2013-2022 .NET Foundation -// -//----------------------------------------------------------------------- - -using System; - -namespace Akka -{ - /// - /// Class PatternMatch. - /// - [Obsolete("Use instead the pattern matching feature introduced in C# 7.0")] - public static class PatternMatch - { - - /// - /// Matches the specified target. - /// - /// The target. - /// Case. - public static Case Match(this object target) - { - return new Case(target); - } - - /// - /// Matches the specified target and return a result of target processing. - /// - /// The target. - /// Case. - public static Case Match(this object target) - { - return new Case(target); - } - } - - /// - /// Interface IMatchResult - /// - public interface IMatchResult - { - /// - /// Gets a value indicating whether [was handled]. - /// - /// true if [was handled]; otherwise, false. - bool WasHandled { get; } - } - - /// - /// Class Case. - /// - public class Case : IMatchResult - { - /// - /// The _message - /// - private readonly object _message; - /// - /// The _handled - /// - private bool _handled; - /// - /// Gets a value indicating whether [was handled]. - /// - /// true if [was handled]; otherwise, false. - public bool WasHandled { get { return _handled; } } - - /// - /// Initializes a new instance of the class. - /// - /// The message. - public Case(object message) - { - _message = message; - } - - /// - /// Withes the specified action. - /// - /// The type of the t message. - /// The action. - /// Case. - public Case With(Action action) - { - if (!_handled && _message is TMessage) - { - action(); - _handled = true; - } - - return this; - } - - /// - /// Withes the specified action. - /// - /// The type of the t message. - /// The action. - /// Case. - public Case With(Action action) - { - if (!_handled && _message is TMessage) - { - action((TMessage) _message); - _handled = true; - } - - return this; - } - - /// - /// Defaults the specified action. - /// - /// The action. - /// IMatchResult. - public IMatchResult Default(Action action) - { - if (!_handled) - { - action(_message); - _handled = true; - } - return AlwaysHandled.Instance; - } - - /// - /// Class AlwaysHandled. - /// - private class AlwaysHandled : IMatchResult - { - /// - /// The instance - /// - public static readonly AlwaysHandled Instance = new AlwaysHandled(); - /// - /// Prevents a default instance of the class from being created. - /// - private AlwaysHandled() { } - /// - /// Gets a value indicating whether [was handled]. - /// - /// true if [was handled]; otherwise, false. - public bool WasHandled { get { return true; } } - } - } - - /// - /// Class Case with returning result. - /// - /// The type of return value - public class Case : IMatchResult - { - /// - /// The _message - /// - private readonly object _message; - /// - /// The _handled - /// - private bool _handled; - - /// - /// The final result of execution - /// - private T _result; - - /// - /// Gets a value indicating whether [was handled]. - /// - /// true if [was handled]; otherwise, false. - public bool WasHandled { get { return _handled; } } - - /// - /// Initializes a new instance of the class. - /// - /// The message. - public Case(object message) - { - _message = message; - } - - /// - /// Withes the specified action. - /// - /// The type of the t message. - /// The function. - /// Case. - public Case With(Func function) - { - if (!_handled && _message is TMessage) - { - _result = function(); - _handled = true; - } - - return this; - } - - /// - /// Withes the specified action. - /// - /// The type of the t message. - /// The action. - /// Case. - public Case With(Func function) - { - if (!_handled && _message is TMessage) - { - _result = function((TMessage)_message); - _handled = true; - } - - return this; - } - - /// - /// Defaults the specified action. - /// - /// The default function. - /// The result of the matching - public T ResultOrDefault(Func function) - { - if (!_handled) - { - _result = function(_message); - _handled = true; - } - - return _result; - } - } - - -} -