From 15dfbae4f384835d087c3f502518a12a3ddad79a Mon Sep 17 00:00:00 2001 From: Sean Killeen Date: Thu, 14 Oct 2021 00:30:14 -0400 Subject: [PATCH 1/2] fixes --- docs/articles/actors/untyped-actor-api.md | 13 ++++++++++ docs/articles/clustering/cluster-client.md | 3 +++ docs/articles/concepts/addressing.md | 5 ++++ docs/articles/concepts/supervision.md | 2 ++ docs/articles/hocon/index.md | 3 +++ docs/articles/networking/serialization.md | 4 +++ docs/articles/persistence/event-adapters.md | 3 +++ docs/articles/persistence/event-sourcing.md | 4 +++ docs/articles/persistence/persistent-fsm.md | 6 +++++ docs/articles/persistence/persistent-views.md | 2 ++ docs/articles/remoting/deployment.md | 3 +++ .../streams/buffersandworkingwithrate.md | 26 +++++++++++++++++++ docs/articles/streams/error-handling.md | 2 ++ docs/articles/streams/quickstart.md | 11 ++++++++ .../streams/workingwithstreamingio.md | 1 + docs/articles/utilities/event-bus.md | 1 + docs/articles/utilities/logging.md | 4 +++ docs/articles/utilities/scheduler.md | 1 + docs/articles/utilities/serilog.md | 4 +++ docs/community/building-akka-net.md | 4 +++ docs/community/documentation-guidelines.md | 2 ++ 21 files changed, 104 insertions(+) diff --git a/docs/articles/actors/untyped-actor-api.md b/docs/articles/actors/untyped-actor-api.md index 00778426c66..655db1ae05c 100644 --- a/docs/articles/actors/untyped-actor-api.md +++ b/docs/articles/actors/untyped-actor-api.md @@ -16,6 +16,7 @@ The Actor Model provides a higher level of abstraction for writing concurrent an Actors in C# are implemented by extending the `UntypedActor` class and and implementing the `OnReceive` method. This method takes the message as a parameter. Here is an example: + ```csharp public class MyActor : UntypedActor { @@ -39,12 +40,14 @@ public class MyActor : UntypedActor ### Props `Props` is a configuration class to specify options for the creation of actors, think of it as an immutable and thus freely shareable recipe for creating an actor including associated deployment information (e.g. which dispatcher to use, see more below). Here are some examples of how to create a `Props` instance + ```csharp Props props1 = Props.Create(typeof(MyActor)); Props props2 = Props.Create(() => new MyActorWithArgs("arg")); Props props3 = Props.Create(); Props props4 = Props.Create(typeof(MyActorWithArgs), "arg"); ``` + The second variant shows how to pass constructor arguments to the `Actor` being created, but it should only be used outside of actors as explained below. #### Recommended Practices @@ -78,6 +81,7 @@ system.ActorOf(DemoActor.Props(42), "demo"); ``` Another good practice is to declare what messages an `Actor` can receive in the companion object of the `Actor`, which makes easier to know what it can receive: + ```csharp public class DemoMessagesActor : UntypedActor { @@ -192,6 +196,7 @@ protected override void PostStop() { } ``` + The implementations shown above are the defaults provided by the `UntypedActor` class. ### Actor Lifecycle @@ -282,6 +287,7 @@ Context.ActorSelection("/user/serviceA/actor"); // will look up sibling beneath same supervisor Context.ActorSelection("../joe"); ``` + > [!NOTE] > It is always preferable to communicate with other Actors using their `IActorRef` instead of relying upon `ActorSelection`. Exceptions are: sending messages using the At-Least-Once Delivery facility, initiating first contact with a remote system. In all other cases `ActorRefs` can be provided during Actor creation or initialization, passing them from parent to child or introducing Actors by sending their `ActorRefs` to other Actors within messages. @@ -296,6 +302,7 @@ Context.ActorSelection("/user/serviceB/worker*"); // will look up all siblings beneath same supervisor Context.ActorSelection("../*"); ``` + Messages can be sent via the `ActorSelection` and the path of the `ActorSelection`is looked up when delivering each message. If the selection does not match any actors the message will be dropped. To acquire an `IActorRef` for an `ActorSelection` you need to send a message to the selection and use the `Sender` reference of the reply from the actor. There is a built-in `Identify` message that all Actors will understand and automatically reply to with a `ActorIdentity` message containing the `IActorRef`. This message is handled specially by the actors which are traversed in the sense that if a concrete name lookup fails (i.e. a non-wildcard path element does not correspond to a live actor) then a negative result is generated. Please note that this does not mean that delivery of that reply is guaranteed, it still is a normal message. @@ -350,6 +357,7 @@ This is the preferred way of sending messages. No blocking waiting for a message // don’t forget to think about who is the sender (2nd argument) target.Tell(message, Self); ``` + The sender reference is passed along with the message and available within the receiving actor via its `Sender` property while processing this message. Inside of an actor it is usually `Self` who shall be the sender, but there can be cases where replies shall be routed to some other actor—e.g. the parent—in which the second argument to `Tell` would be a different one. Outside of an actor and if no reply is needed the second argument can be `null`; if a reply is needed outside of an actor you can use the ask-pattern described next. ### Ask: Send-And-Receive-Future @@ -400,6 +408,7 @@ target.Forward(result, Context); When an actor receives a message it is passed into the `OnReceive` method, this is an abstract method on the `UntypedActor` base class that needs to be defined. Here is an example: + ```csharp public class MyActor : UntypedActor { @@ -677,6 +686,7 @@ static void Main(string[] args) The `IWithUnboundedStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.BecomeStacked()` or `Context.UnbecomeStacked()`;, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that implements `IWithUnboundedStash` will automatically get a dequeue-based mailbox. Here is an example of the `IWithUnboundedStash` interface in action: + ```csharp public class ActorWithProtocol : UntypedActor, IWithUnboundedStash { @@ -775,6 +785,7 @@ protected override void PreRestart(Exception reason, object message) PostStop(); } ``` + Please note, that the child actors are *still restarted*, but no new `IActorRef` is created. One can recursively apply the same principles for the children, ensuring that their `PreStart()` method is called only at the creation of their refs. For more information see [What Restarting Means](xref:supervision#what-restarting-means). @@ -782,6 +793,7 @@ For more information see [What Restarting Means](xref:supervision#what-restartin #### Initialization via message passing There are cases when it is impossible to pass all the information needed for actor initialization in the constructor, for example in the presence of circular dependencies. In this case the actor should listen for an initialization message, and use `Become()` or a finite state-machine state transition to encode the initialized and uninitialized states of the actor. + ```csharp public class Service : UntypedActor { @@ -805,6 +817,7 @@ public class Service : UntypedActor } } ``` + If the actor may receive messages before it has been initialized, a useful tool can be the `Stash` to save messages until the initialization finishes, and replaying them after the actor became initialized. > [!WARNING] diff --git a/docs/articles/clustering/cluster-client.md b/docs/articles/clustering/cluster-client.md index df9e2c262e3..8a9f91d9598 100644 --- a/docs/articles/clustering/cluster-client.md +++ b/docs/articles/clustering/cluster-client.md @@ -14,7 +14,9 @@ Also, note it's necessary to change akka.actor.provider from `Akka.Actor.LocalAc ``` akka.actor.provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster" ``` + or this shorthand notation + ``` akka.actor.provider = cluster ``` @@ -79,6 +81,7 @@ RunOn(() => c.Tell(new Client.ClusterClient.SendToAll("/user/serviceB", "hi")); }, client); ``` + The `initialContacts` parameter is a `IEnumerable`, which can be created like this: ```csharp diff --git a/docs/articles/concepts/addressing.md b/docs/articles/concepts/addressing.md index 85e6bbedcc8..7f90f5757bb 100644 --- a/docs/articles/concepts/addressing.md +++ b/docs/articles/concepts/addressing.md @@ -45,6 +45,7 @@ Each actor path has an address component, describing the protocol and location b "akka://my-sys/user/service-a/worker1" // purely local "akka.tcp://my-sys@host.example.com:5678/user/service-b" // remote ```` + Here, `akka.tcp` is the default remote transport; other transports are pluggable. A remote host using UDP would be accessible by using akka.udp. The interpretation of the host and port part (i.e.``serv.example.com:5678`` in the example) depends on the transport mechanism used, but it must abide by the URI structural rules. ### Logical Actor Paths @@ -75,10 +76,13 @@ In addition to ActorSystem.actorSelection there is also `ActorContext.ActorSelec ````csharp Context.ActorSelection("../brother").Tell(msg); ```` + Absolute paths may of course also be looked up on context in the usual way, i.e. + ````csharp Context.ActorSelection("/user/serviceA").Tell(msg); ```` + will work as expected. ### Querying the Logical Actor Hierarchy @@ -87,6 +91,7 @@ Since the actor system forms a file-system like hierarchy, matching on paths is ```csharp Context.ActorSelection("../*").Tell(msg); ``` + will send msg to all siblings including the current actor. ## Summary: `ActorOf` vs. `ActorSelection` diff --git a/docs/articles/concepts/supervision.md b/docs/articles/concepts/supervision.md index 198ccc84e20..a9ca0a69213 100644 --- a/docs/articles/concepts/supervision.md +++ b/docs/articles/concepts/supervision.md @@ -78,6 +78,7 @@ Provided as a built-in pattern the `Akka.Pattern.BackoffSupervisor` implements t This pattern is useful when the started actor fails because some external resource is not available, and we need to give it some time to start-up again. One of the prime examples when this is useful is when a `UntypedPersistentActor` fails (by stopping) with a persistence failure - which indicates that the database may be down or overloaded, in such situations it makes most sense to give it a little bit of time to recover before the persistent actor is started. The following C# snippet shows how to create a backoff supervisor which will start the given echo actor after it has stopped because of a failure, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds: + ```csharp var childProps = Props.Create(); @@ -97,6 +98,7 @@ Using a `randomFactor` to add a little bit of additional variance to the backoff The `Akka.Pattern.BackoffSupervisor` actor can also be configured to restart the actor after a delay when the actor crashes and the supervision strategy decides that it should restart. The following C# snippet shows how to create a backoff supervisor which will start the given echo actor after it has crashed because of some exception, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds: + ```csharp var childProps = Props.Create(); diff --git a/docs/articles/hocon/index.md b/docs/articles/hocon/index.md index 793721c7c82..b8375ec8d8a 100644 --- a/docs/articles/hocon/index.md +++ b/docs/articles/hocon/index.md @@ -313,6 +313,7 @@ For example, the the ambient environment might have this definition ... ``` set Path=A;B;C ``` + ... we just don't know. But if the HOCON needs "PATH", then the start script must take a precautionary approach and enforce the necessary case as follows ... ``` @@ -331,7 +332,9 @@ The idea of self-referential substitution is to allow a new value for a field to path : "a:b:c" path : ${path}":d" ``` + is equal to: + ``` path : "a:b:c:d" ``` diff --git a/docs/articles/networking/serialization.md b/docs/articles/networking/serialization.md index 0790c837152..8ab583bd781 100644 --- a/docs/articles/networking/serialization.md +++ b/docs/articles/networking/serialization.md @@ -121,6 +121,7 @@ akka { } } ``` + If you want to verify that your `Props` are serializable, you can enable the following config option: ```hocon @@ -343,6 +344,7 @@ from being deserialized: Be warned that these class can be used as a man in the middle attack vector, but if you need to serialize one of these class, you can turn off this feature using this inside your HOCON settings: + ``` akka.actor.serialization-settings.hyperion.disallow-unsafe-type = false ``` @@ -367,6 +369,7 @@ There are two ways to set this up, one through the HOCON configuration file, and ### HOCON HOCON example: + ``` akka.actor.serialization-settings.hyperion.cross-platform-package-name-overrides = { netfx = [ @@ -401,6 +404,7 @@ The way it works that when the serializer detects that the type name contains th property into the string declared in the `rename-to`. In code, we can write this behavior as: + ```csharp if(packageName.Contains(fingerprint)) packageName = packageName.Replace(rename-from, rename-to); ``` diff --git a/docs/articles/persistence/event-adapters.md b/docs/articles/persistence/event-adapters.md index 63597fa106b..ddd767e1879 100644 --- a/docs/articles/persistence/event-adapters.md +++ b/docs/articles/persistence/event-adapters.md @@ -30,7 +30,9 @@ public class MyEventAdapter : IEventAdapter } } ``` + Then in order for it to be used on events coming to and from the journal you must bind it using the below configuration syntax: + ```hocon akka.persistence.journal { { @@ -47,5 +49,6 @@ akka.persistence.journal { } } ``` + It is possible to bind multiple adapters to one class for recovery, in which case the `FromJournal` methods of all bound adapters will be applied to a given matching event (in order of definition in the configuration). Since each adapter may return from 0 to n adapted events (called as `EventSequence`), each adapter can investigate the event and if it should indeed adapt it return the adapted event(s) for it. Other adapters which do not have anything to contribute during this adaptation simply return `EventSequence.Empty`. The adapted events are then delivered in-order to the `PersistentActor` during replay. diff --git a/docs/articles/persistence/event-sourcing.md b/docs/articles/persistence/event-sourcing.md index 973f9ec185e..6b56d0244bb 100644 --- a/docs/articles/persistence/event-sourcing.md +++ b/docs/articles/persistence/event-sourcing.md @@ -92,6 +92,7 @@ protected override void OnCommand(object message) // .. } ``` + The actor will always receive a `RecoveryCompleted` message, even if there are no events in the journal and the snapshot store is empty, or if it's a new persistent actor with a previously unused `PersistenceId`. If there is a problem with recovering the state of the actor from the journal, `OnRecoveryFailure` is called (logging the error by default) and the actor will be stopped. @@ -114,6 +115,7 @@ akka.persistence.internal-stash-overflow-strategy = "akka.persistence.ThrowExcep The `DiscardToDeadLetterStrategy` strategy also has a pre-packaged companion configurator `DiscardConfigurator`. You can also query the default strategy via the Akka persistence extension singleton: + ```csharp Context.System.DefaultInternalStashOverflowStrategy ``` @@ -200,6 +202,7 @@ While it is possible to nest mixed `Persist` and `PersistAsync` with keeping the If persistence of an event fails, `OnPersistFailure` will be invoked (logging the error by default), and the actor will unconditionally be stopped. The reason that it cannot resume when persist fails is that it is unknown if the event was actually persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures will most likely fail anyway since the journal is probably unavailable. It is better to stop the actor and after a back-off timeout start it again. The `BackoffSupervisor` actor is provided to support such restarts. + ```csharp protected override void PreStart() { @@ -285,6 +288,7 @@ In your configuration, under the `akka.persistence.journal.xxx.replay-filter` se - off For example, if you configure the replay filter for `sqlite` plugin, it looks like this: + ```hocon # The replay filter can detect a corrupt event stream by inspecting # sequence numbers and writerUuid when replaying events. diff --git a/docs/articles/persistence/persistent-fsm.md b/docs/articles/persistence/persistent-fsm.md index 13e3cb6be76..94341f58835 100644 --- a/docs/articles/persistence/persistent-fsm.md +++ b/docs/articles/persistence/persistent-fsm.md @@ -42,6 +42,7 @@ Here is how everything is wired together: [!code-csharp[WebStoreCustomerFSMActor.cs](../../../src/core/Akka.Docs.Tests/Persistence/WebStoreCustomerFSMActor.cs?name=persistent-fsm-apply-event)] `AndThen` can be used to define actions which will be executed following event’s persistence - convenient for "side effects" like sending a message or logging. Notice that actions defined in andThen block are not executed on recovery: + ```cs GoTo(Paid.Instance).Applying(OrderExecuted.Instance).AndThen(cart => { @@ -51,7 +52,9 @@ GoTo(Paid.Instance).Applying(OrderExecuted.Instance).AndThen(cart => } }); ``` + A snapshot of state data can be persisted by calling the `SaveStateSnapshot()` method: + ```cs Stop().Applying(OrderDiscarded.Instance).AndThen(cart => { @@ -59,14 +62,17 @@ Stop().Applying(OrderDiscarded.Instance).AndThen(cart => SaveStateSnapshot(); }); ``` + On recovery state data is initialized according to the latest available snapshot, then the remaining domain events are replayed, triggering the `ApplyEvent` method. ## Periodical snapshot by snapshot-after You can enable periodical `SaveStateSnapshot()` calls in `PersistentFSM` if you turn the following flag on in `reference.conf` + ``` akka.persistence.fsm.snapshot-after = 1000 ``` + this means `SaveStateSnapshot()` is called after the sequence number reaches multiple of 1000. > [!NOTE] diff --git a/docs/articles/persistence/persistent-views.md b/docs/articles/persistence/persistent-views.md index a81e8ee61fd..6a95c3b4234 100644 --- a/docs/articles/persistence/persistent-views.md +++ b/docs/articles/persistence/persistent-views.md @@ -45,9 +45,11 @@ view.Tell(new Update(true)); If the await parameter is set to true, messages that follow the `Update` request are processed when the incremental message replay, triggered by that update request, completed. If set to false (default), messages following the update request may interleave with the replayed message stream. Automated updates of all persistent views of an actor system can be turned off by configuration: + ```hocon akka.persistence.view.auto-update = off ``` + Implementation classes may override the configured default value by overriding the autoUpdate method. To limit the number of replayed messages per update request, applications can configure a custom *akka.persistence.view.auto-update-replay-max* value or override the `AutoUpdateReplayMax` property. The number of replayed messages for manual updates can be limited with the replayMax parameter of the Update message. ## Recovery diff --git a/docs/articles/remoting/deployment.md b/docs/articles/remoting/deployment.md index 3d21fdecd77..b4f84b34bc7 100644 --- a/docs/articles/remoting/deployment.md +++ b/docs/articles/remoting/deployment.md @@ -16,6 +16,7 @@ That's right - we can *deploy code over the network* with Akka.Remote. Here's what that concept looks like expressed as Akka.NET code: **Shared Actor / Message Code** + ```csharp /* * Create an actor and a message type that gets shared between Deployer and DeployTarget @@ -49,6 +50,7 @@ public class Hello ``` **DeployTarget (process that gets deployed onto)** + ```csharp class Program { @@ -73,6 +75,7 @@ class Program **Deployer (process that does deploying)** + ```csharp class Program { diff --git a/docs/articles/streams/buffersandworkingwithrate.md b/docs/articles/streams/buffersandworkingwithrate.md index 4c495f11752..f89220286dc 100644 --- a/docs/articles/streams/buffersandworkingwithrate.md +++ b/docs/articles/streams/buffersandworkingwithrate.md @@ -9,6 +9,7 @@ When upstream and downstream rates differ, especially when the throughput has sp ## Buffers for asynchronous stages In this section we will discuss internal buffers that are introduced as an optimization when using asynchronous stages. To run a stage asynchronously it has to be marked explicitly as such using the .async method. Being run asynchronously means that a stage, after handing out an element to its downstream consumer is able to immediately process the next message. To demonstrate what we mean by this, let's take a look at the following example: + ```csharp var source = Source.From(Enumerable.Range(1, 100)) .Select(i => { Console.WriteLine($"A: {i}"); return i; }).Async() @@ -16,7 +17,9 @@ To run a stage asynchronously it has to be marked explicitly as such using the . .Select(i => { Console.WriteLine($"C: {i}"); return i; }).Async() .RunWith(Sink.Ignore(), materializer); ``` + Running the above example, one of the possible outputs looks like this: + ``` A: 2 B: 1 @@ -27,6 +30,7 @@ B: 3 C: 2 C: 3 ``` + Note that the order is *not* `A:1, B:1, C:1, A:2, B:2, C:2,` which would correspond to the normal fused synchronous execution model of flows where an element completely passes through the processing pipeline before the next element enters the flow. The next element is processed by an asynchronous stage as soon as it is emitted the previous one. While pipelining in general increases throughput, in practice there is a cost of passing an element through the asynchronous (and therefore thread crossing) boundary which is significant. To amortize this cost Akka Streams uses a windowed, batching backpressure strategy internally. It is windowed because as opposed to a Stop-And-Wait protocol multiple elements might be "in-flight" concurrently with requests for elements. It is also batching because a new element is not immediately requested once an element has been drained from the window-buffer but multiple elements are requested after multiple elements have been drained. This batching strategy reduces the communication cost of propagating the backpressure signal through the asynchronous boundary. @@ -35,14 +39,18 @@ While this internal protocol is mostly invisible to the user (apart form its thr ## Internal buffers and their effect As we have explained, for performance reasons Akka Streams introduces a buffer for every asynchronous processing stage. The purpose of these buffers is solely optimization, in fact the size of 1 would be the most natural choice if there would be no need for throughput improvements. Therefore it is recommended to keep these buffer sizes small, and increase them only to a level suitable for the throughput requirements of the application. Default buffer sizes can be set through configuration: + ```hocon akka.stream.materializer.max-input-buffer-size = 16 ``` + Alternatively they can be set by passing a `ActorMaterializerSettings` to the materializer: + ```csharp var materializer = ActorMaterializer.Create(system, ActorMaterializerSettings.Create(system).WithInputBuffer(64, 64)); ``` + If the buffer size needs to be set for segments of a Flow only, it is possible by defining a separate Flow with these attributes: ```csharp @@ -55,7 +63,9 @@ var flow = Flow.FromGraph(section) .Via(Flow.Create().Select(_ => _/2)) .Async(); // the buffer size of this map is the default ``` + Here is an example of a code that demonstrate some of the issues caused by internal buffers: + ```csharp RunnableGraph.FromGraph(GraphDsl.Create(b => { // this is the asynchronous stage in this graph @@ -75,6 +85,7 @@ RunnableGraph.FromGraph(GraphDsl.Create(b => { return ClosedShape.Instance; })); ``` + Running the above example one would expect the number 3 to be printed in every 3 seconds (the `conflateWithSeed` step here is configured so that it counts the number of elements received before the downstream `ZipWith` consumes them). What is being printed is different though, we will see the number 1. The reason for this is the internal buffer which is by default 16 elements large, and pre-fetches elements before the `ZipWith` starts consuming them. It is possible to fix this issue by changing the buffer size of `ZipWith` (or the whole graph) to 1. We will still see a leading 1 though which is caused by an initial pre-fetch of the `ZipWith` element. > [!NOTE] @@ -90,31 +101,43 @@ The example below will ensure that 1000 jobs (but not more) are dequeued from an Source jobs = inboundJobsConnector() jobs.buffer(1000, OverflowStrategy.backpressure); ``` + The next example will also queue up 1000 jobs locally, but if there are more jobs waiting in the imaginary external systems, it makes space for the new element by dropping one element from the tail of the buffer. Dropping from the tail is a very common strategy but it must be noted that this will drop the youngest waiting job. If some "fairness" is desired in the sense that we want to be nice to jobs that has been waiting for long, then this option can be useful. + ```csharp jobs.buffer(1000, OverflowStrategy.dropTail) ``` + Instead of dropping the youngest element from the tail of the buffer a new element can be dropped without en-queueing it to the buffer at all. + ```csharp jobs.buffer(1000, OverflowStrategy.dropNew) ``` + Here is another example with a queue of 1000 jobs, but it makes space for the new element by dropping one element from the head of the buffer. This is the oldest waiting job. This is the preferred strategy if jobs are expected to be resent if not processed in a certain period. The oldest element will be retransmitted soon, (in fact a retransmitted duplicate might be already in the queue!) so it makes sense to drop it first. + ```csharp jobs.buffer(1000, OverflowStrategy.dropHead) ``` + Compared to the dropping strategies above, dropBuffer drops all the 1000 jobs it has enqueued once the buffer gets full. This aggressive strategy is useful when dropping jobs is preferred to delaying jobs. + ```csharp jobs.buffer(1000, OverflowStrategy.dropBuffer) ``` + If our imaginary external job provider is a client using our API, we might want to enforce that the client cannot have more than 1000 queued jobs otherwise we consider it flooding and terminate the connection. This is easily achievable by the error strategy which simply fails the stream once the buffer gets full. + ```csharp jobs.buffer(1000, OverflowStrategy.fail) ``` + ## Rate transformation ### Understanding conflate When a fast producer can not be informed to slow down by backpressure or some other signal, conflate might be useful to combine elements from a producer until a demand signal comes from a consumer. Below is an example snippet that summarizes fast stream of elements to a standard deviation, mean and count of elements that have arrived while the stats have been calculated. + ```csharp var statsFlow = Flow.Create() .ConflateWithSeed(_ => ImmutableList.Create(_), (agg, acc) => agg.Add(acc)) @@ -140,6 +163,7 @@ Another possible use of `conflate` is to not consider all elements for summary w return agg; }).Concat(identity); ``` + ### Understanding expand Expand helps to deal with slow producers which are unable to keep up with the demand coming from consumers. Expand allows to extrapolate a value to be sent as an element to a consumer. @@ -151,10 +175,12 @@ As a simple use of expand here is a flow that sends the same element to consumer ``` Expand also allows to keep some state between demand requests from the downstream. Leveraging this, here is a flow that tracks and reports a drift between fast consumer and slow producer. + ```csharp var driftFlow = Flow.Create() .Expand(i => Enumerable.Repeat(0, int.MaxValue).Select(n => new {i, n}).GetEnumerator()); ``` + Note that all of the elements coming from upstream will go through expand at least once. This means that the output of this flow is going to report a drift of zero if producer is fast enough, or a larger drift otherwise. diff --git a/docs/articles/streams/error-handling.md b/docs/articles/streams/error-handling.md index 4c2a4e5487a..52979fea213 100644 --- a/docs/articles/streams/error-handling.md +++ b/docs/articles/streams/error-handling.md @@ -36,7 +36,9 @@ Source.From(Enumerable.Range(0, 6)).Select(n => }) .RunForeach(Console.WriteLine, materializer); ``` + This will output: + ``` 0 1 diff --git a/docs/articles/streams/quickstart.md b/docs/articles/streams/quickstart.md index c83c9e67e31..0f81e4aa307 100644 --- a/docs/articles/streams/quickstart.md +++ b/docs/articles/streams/quickstart.md @@ -17,11 +17,13 @@ A stream usually begins at a source, so this is also how we start an Akka Stream using Akka.Streams; using Akka.Streams.Dsl; ``` + Now we will start with a rather simple source, emitting the integers 1 to 100; ```csharp Source source = Source.From(Enumerable.Range(1,100)) ``` + The `Source` type is parametrized with two types: the first one is the type of element that this source emits and the second one may signal that running the source produces some auxiliary value (e.g. a network source may provide information about the bound port or the peer's address). Where no auxiliary information is produced, the type `NotUsed` is used -- and a simple range of integers surely falls into this category. Having created this source means that we have a description of how to emit the first 100 natural numbers, but this source is not yet active. In order to get those numbers out we have to run it: @@ -29,16 +31,20 @@ Having created this source means that we have a description of how to emit the f ```csharp source.RunForeach(i => Console.WriteLine(i.ToString()), materializer) ``` + This line will complement the source with a consumer function--in this example we simply print out the numbers to the console--and pass this little stream setup to an Actor that runs it. This activation is signaled by having "run" be part of the method name; there are other methods that run Akka Streams, and they all follow this pattern. You may wonder where the Actor gets created that runs the stream, and you are probably also asking yourself what this materializer means. In order to get this value we first need to create an Actor system: + ```csharp using (var system = ActorSystem.Create("system")) using (var materializer = system.Materializer()) ``` + There are other ways to create a materializer, e.g. from an `ActorContext` when using streams from within Actors. The `Materializer` is a factory for stream execution engines, it is the thing that makes streams run --you don't need to worry about any of the details just now apart from that you need one for calling any of the run methods on a `Source`. The nice thing about Akka Streams is that the `Source` is just a description of what you want to run, and like an architect's blueprint it can be reused, incorporated into a larger design. We may choose to transform the source of integers and write it to a file instead: + ```csharp var factorials = source.Scan(new BigInteger(1), (acc, next) => acc * next); var result = @@ -46,10 +52,12 @@ The nice thing about Akka Streams is that the `Source` is just a description of .Select(num => ByteString.FromString($"{num}\n")) .RunWith(FileIO.ToFile(new FileInfo("factorials.txt")), materializer); ``` + First we use the `scan` combinator to run a computation over the whole stream: starting with the number 1 `(BigInteger(1))` we multiple by each of the incoming numbers, one after the other; the `scan` operation emits the initial value and then every calculation result. This yields the series of factorial numbers which we stash away as a Source for later reuse --it is important to keep in mind that nothing is actually computed yet, this is just a description of what we want to have computed once we run the stream. Then we convert the resulting series of numbers into a stream of `ByteString` objects describing lines in a text file. This stream is then run by attaching a file as the receiver of the data. In the terminology of Akka Streams this is called a `Sink. IOResult` is a type that IO operations return in Akka Streams in order to tell you how many bytes or elements were consumed and whether the stream terminated normally or exceptionally. ## Reusable Pieces One of the nice parts of Akka Stream --and something that other stream libraries do not offer-- is that not only sources can be reused like blueprints, all other elements can be as well. We can take the file-writing `Sink`, prepend the processing steps necessary to get the `ByteString` elements from incoming string and package that up as a reusable piece as well. Since the language for writing these streams always flows from left to right (just like plain English), we need a starting point that is like a source but with an "open" input. In Akka Streams this is called a `Flow`: + ```csharp public static Sink> LineSink(string filename) { return Flow.Create() @@ -57,9 +65,11 @@ public static Sink> LineSink(string filename) { .ToMaterialized(FileIO.ToFile(new FileInfo(filename)), Keep.Right); } ``` + Starting from a flow of strings we convert each to `ByteString` and then feed to the already known file-writing `Sink`. The resulting blueprint is a `Sink>`, which means that it accepts strings as its input and when materialized it will create auxiliary information of type `Task` (when chaining operations on a `Source` or `Flow` the type of the auxiliary information --called the "materialized value"-- is given by the leftmost starting point; since we want to retain what the `FileIO.ToFile` sink has to offer, we need to say `Keep.Right`). We can use the new and shiny Sink we just created by attaching it to our factorials source --after a small adaptation to turn the numbers into strings: + ```csharp factorials.Select(_ => _.ToString()).RunWith(LineSink("factorial2.txt"), materializer); ``` @@ -73,6 +83,7 @@ Before we start looking at a more involved example we explore the streaming natu .Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping) .RunForeach(Console.WriteLine, materializer); ``` + All operations so far have been time-independent and could have been performed in the same fashion on strict collections of elements. The next line demonstrates that we are in fact dealing with streams that can flow at a certain speed: we use the `throttle` combinator to slow down the stream to 1 element per second (the second 1 in the argument list is the maximum size of a burst that we want to allow--passing 1 means that the first element gets through immediately and the second then has to wait for one second and so on). If you run this program you will see one line printed per second. One aspect that is not immediately visible deserves mention, though: if you try and set the streams to produce a billion numbers each then you will notice that your environment does not crash with an OutOfMemoryError, even though you will also notice that running the streams happens in the background, asynchronously (this is the reason for the auxiliary information to be provided as a `Task`). The secret that makes this work is that Akka Streams implicitly implement pervasive flow control, all combinators respect back-pressure. This allows the throttle combinator to signal to all its upstream sources of data that it can only accept elements at a certain rate--when the incoming rate is higher than one per second the throttle combinator will assert back-pressure upstream. diff --git a/docs/articles/streams/workingwithstreamingio.md b/docs/articles/streams/workingwithstreamingio.md index f9351440085..1b19d3216cc 100644 --- a/docs/articles/streams/workingwithstreamingio.md +++ b/docs/articles/streams/workingwithstreamingio.md @@ -26,6 +26,7 @@ Closing connections is possible by cancelling the incoming connection `Flow` fro [!code-csharp[StreamTcpDocTests.cs](../../../src/core/Akka.Docs.Tests/Streams/StreamTcpDocTests.cs?name=close-incoming-connection)] We can then test the TCP server by sending data to the TCP Socket using `netcat` (on Windows it is possible to use Linux Subsystem for Windows): + ``` echo -n "Hello World" | netcat 127.0.0.1 8888 Hello World!!! diff --git a/docs/articles/utilities/event-bus.md b/docs/articles/utilities/event-bus.md index 161c78a6fe2..44d63735ab4 100644 --- a/docs/articles/utilities/event-bus.md +++ b/docs/articles/utilities/event-bus.md @@ -53,6 +53,7 @@ public class ExpendableActor : ReceiveActor { } ``` sample capture + ```string DeadLetter captured: another message, sender: [akka://MySystem/deadLetters], recipient: [akka://MySystem/user/ExpendableActor#1469246785] ``` diff --git a/docs/articles/utilities/logging.md b/docs/articles/utilities/logging.md index 2bef0bb34fa..e1e311f4d3c 100644 --- a/docs/articles/utilities/logging.md +++ b/docs/articles/utilities/logging.md @@ -58,11 +58,13 @@ Note that you need to modify the config as explained below. ### NLog Configuration Example NLog configuration inside your app.config or web.config: + ```hocon akka { loggers = ["Akka.Logger.NLog.NLogLogger, Akka.Logger.NLog"] } ``` + The above NLog components can be found on Nuget (https://www.nuget.org/packages/Akka.Logger.NLog/) ## Configuring Custom Loggers @@ -84,7 +86,9 @@ akka { actor.debug.unhandled = on } ``` + ## Example configuration + ```hocon akka { stdout-loglevel = DEBUG diff --git a/docs/articles/utilities/scheduler.md b/docs/articles/utilities/scheduler.md index 537a475fa8b..460fb781ec9 100644 --- a/docs/articles/utilities/scheduler.md +++ b/docs/articles/utilities/scheduler.md @@ -50,6 +50,7 @@ parameters) and then call the method when the message is received. ``` Context.System.Scheduler.ScheduleTellRepeatedly(....); ``` + > [!WARNING] > All scheduled task will be executed when the `ActorSystem` is terminated. i.e. the task may execute before its timeout. diff --git a/docs/articles/utilities/serilog.md b/docs/articles/utilities/serilog.md index baee01d51fd..72c0a960c58 100644 --- a/docs/articles/utilities/serilog.md +++ b/docs/articles/utilities/serilog.md @@ -23,6 +23,7 @@ PM> Install-Package Serilog.Sinks.Console Next, you'll need to configure the global `Log.Logger` and also specify to use the logger in the config when creating the system, for example like this: + ```csharp var logger = new LoggerConfiguration() .WriteTo.Console() @@ -37,6 +38,7 @@ var system = ActorSystem.Create("my-test-system", "akka { loglevel=INFO, logger ## Logging To log inside an actor, using the normal `string.Format()` syntax, get the logger and log: + ```csharp var log = Context.GetLogger(); ... @@ -44,11 +46,13 @@ log.Info("The value is {0}", counter); ``` Or alternatively + ```csharp var log = Context.GetLogger(); ... log.Info("The value is {Counter}", counter); ``` + ## Extensions The package __Akka.Logger.Serilog__ also includes the extension method `ForContext()` for `ILoggingAdapter` (the object returned by `Context.GetLogger()`). This is analogous to Serilog's `ForContext()` but instead of returning a Serilog `ILogger` it returns an Akka.NET `ILoggingAdapter`. This instance acts as contextual logger that will attach a property to all events logged through it. diff --git a/docs/community/building-akka-net.md b/docs/community/building-akka-net.md index fc5fee0b1a1..6fbc9e3490a 100644 --- a/docs/community/building-akka-net.md +++ b/docs/community/building-akka-net.md @@ -11,11 +11,13 @@ Akka.NET's build system is a modified version of [Petabridge's `dotnet new` temp This project supports a wide variety of commands, all of which can be listed via: **Windows** + ``` c:\> build.cmd help ``` **Linux / OS X** + ``` c:\> build.sh help ``` @@ -52,6 +54,7 @@ This option will work locally on Linux or Windows. This project will automatically populate its release notes in all of its modules via the entries written inside [`RELEASE_NOTES.md`](RELEASE_NOTES.md) and will automatically update the versions of all assemblies and NuGet packages via the metadata included inside [`common.props`](src/common.props). **RELEASE_NOTES.md** + ``` #### 0.1.0 October 05 2019 #### First release @@ -60,6 +63,7 @@ First release In this instance, the NuGet and assembly version will be `0.1.0` based on what's available at the top of the `RELEASE_NOTES.md` file. **RELEASE_NOTES.md** + ``` #### 0.1.0-beta1 October 05 2019 #### First release diff --git a/docs/community/documentation-guidelines.md b/docs/community/documentation-guidelines.md index b6e89b521a9..3bcd0343caf 100644 --- a/docs/community/documentation-guidelines.md +++ b/docs/community/documentation-guidelines.md @@ -24,6 +24,7 @@ When documenting code, please use the standard .NET convention of [XML documenta Please be mindful to including *useful* comments when documenting a class or method. *Useful* comments means including full English sentences when summarizing the code and not relying on pre-generated comments from a tool like GhostDoc. Tools like these are great in what they do *if* supplemented with well-reasoned grammar. **BAD** obviously auto-generated comment + ```csharp /// /// Class Serializer. @@ -41,6 +42,7 @@ public abstract class Serializer ``` **GOOD** clear succinct comment + ```csharp /// /// A Serializer represents a bimap between an object and an array of bytes representing that object. From a8fecaddb142453d93c220bb78b1162e2d81d6ce Mon Sep 17 00:00:00 2001 From: Sean Killeen Date: Thu, 14 Oct 2021 00:31:22 -0400 Subject: [PATCH 2/2] fixes --- docs/articles/actors/coordinated-shutdown.md | 1 + docs/articles/actors/dependency-injection.md | 1 + docs/articles/actors/finite-state-machine.md | 13 +++++++++++++ docs/articles/actors/inbox.md | 2 ++ docs/articles/actors/receive-actor-api.md | 15 +++++++++++++++ docs/articles/actors/routers.md | 19 +++++++++++++++++++ docs/articles/actors/testing-actor-systems.md | 6 ++++++ docs/articles/clustering/cluster-metrics.md | 1 + docs/articles/clustering/cluster-overview.md | 2 ++ .../distributed-publish-subscribe.md | 2 ++ .../use-case-and-deployment-scenarios.md | 3 +++ .../persistence/custom-serialization.md | 1 + .../articles/persistence/persistence-query.md | 1 + .../persistence/persistence-testing.md | 1 + docs/articles/persistence/snapshots.md | 1 + docs/articles/remoting/index.md | 2 ++ docs/articles/streams/integration.md | 1 + .../streams/pipeliningandparallelism.md | 1 + docs/articles/streams/workingwithgraphs.md | 1 + 19 files changed, 74 insertions(+) diff --git a/docs/articles/actors/coordinated-shutdown.md b/docs/articles/actors/coordinated-shutdown.md index bb3e5e08112..37c6ff41ec2 100644 --- a/docs/articles/actors/coordinated-shutdown.md +++ b/docs/articles/actors/coordinated-shutdown.md @@ -137,6 +137,7 @@ By default, when the final phase of the `CoordinatedShutdown` executes the calli ``` akka.coordinated-shutdown.terminate-actor-system = off ``` + If this setting is disabled (it is enabled b default), the `ActorSystem` will not be terminated as the final phase of the `CoordinatedShutdown` phases. `CoordinatedShutdown` phases, by default, are also executed when the `ActorSystem` is terminated. You can change this behavior by disabling this HOCON value in your configuration: diff --git a/docs/articles/actors/dependency-injection.md b/docs/articles/actors/dependency-injection.md index e208104caee..eb33a435f78 100644 --- a/docs/articles/actors/dependency-injection.md +++ b/docs/articles/actors/dependency-injection.md @@ -92,6 +92,7 @@ var system = ActorSystem.Create("MySystem"); // Create the dependency resolver for the actor system IDependencyResolver resolver = new XyzDependencyResolver(someContainer, system); ``` + When creating actorRefs straight off your ActorSystem instance, you can use the DI() Extension. ```csharp diff --git a/docs/articles/actors/finite-state-machine.md b/docs/articles/actors/finite-state-machine.md index a7c68dcafe0..177564f4e45 100644 --- a/docs/articles/actors/finite-state-machine.md +++ b/docs/articles/actors/finite-state-machine.md @@ -9,6 +9,7 @@ title: FSM To demonstrate most of the features of the `FSM` class, consider an actor which shall receive and queue messages while they arrive in a burst and send them on after the burst ended or a flush request is received. First, consider all of the below to use these import statements: + ```csharp using Akka.Actor; using Akka.Event; @@ -78,9 +79,11 @@ The `FSM` class takes two type parameters: ### Defining States A state is defined by one or more invocations of the method + ```csharp When(, [, timeout = ]). ``` + The given name must be an object which is type-compatible with the first type parameter given to the `FSM` class. This object is used as a hash key, so you must ensure that it properly implements `Equals` and `GetHashCode`; in particular it must not be mutable. The easiest fit for these requirements are case objects. If the `timeout` parameter is given, then all transitions into this state, including staying, receive this timeout by default. Initiating the transition with an explicit timeout may be used to override this default, see [Initiating Transitions](#initiating-transitions) for more information. The state timeout of any state may be changed during action processing with `SetStateTimeout(state, duration)`. This enables runtime configuration e.g. via external message. @@ -91,9 +94,11 @@ The `stateFunction` argument is a `delegate State StateFunction(E ### Defining the Initial State Each `FSM` needs a starting point, which is declared using + ```csharp StartWith(state, data[, timeout]) ``` + The optionally given timeout argument overrides any specification given for the desired initial state. If you want to cancel a default timeout, use `null`. ### Unhandled Events @@ -114,6 +119,7 @@ WhenUnhandled(state => } }); ``` + Within this handler the state of the `FSM` may be queried using the stateName method. > [!IMPORTANT] @@ -127,6 +133,7 @@ The result of any stateFunction must be a definition of the next state unless te - `Replying(msg)`. This modifier sends a reply to the currently processed message and otherwise does not modify the state transition. All modifiers can be chained to achieve a nice and concise description: + ```csharp When(State.SomeState, state => { return GoTo(new Processing()) @@ -141,9 +148,11 @@ Transitions occur "between states" conceptually, which means after any actions y #### Internal Monitoring Up to this point, the `FSM DSL` has been centered on states and events. The dual view is to describe it as a series of transitions. This is enabled by the method + ```csharp OnTransition(handler) ``` + which associates actions with a transition instead of with a state and event. The handler is a delegate `void TransitionHandler(TState initialState, TState nextState)` function which takes a pair of states as input; no resulting state is needed as it is not possible to modify the transition in progress. ```csharp @@ -193,6 +202,7 @@ Timers may be canceled using CancelTimer(name); ``` + which is guaranteed to work immediately, meaning that the scheduled message will not be processed after this call even if the timer already fired and queued it. The status of any timer may be inquired with ```csharp @@ -203,9 +213,11 @@ These named timers complement state timeouts because they are not affected by in ### Termination from Inside The `FSM` is stopped by specifying the result state as + ```csharp Stop(reason, stateData); ``` + The reason must be one of `Normal` (which is the default), `Shutdown` or `Failure(reason)`, and the second argument may be given to change the state data which is available during termination handling. > [!NOTE] @@ -242,6 +254,7 @@ OnTermination(termination => } }); ``` + As for the `WhenUnhandled` case, this handler is not stacked, so each invocation of `OnTermination` replaces the previously installed handler. ### Termination from Outside diff --git a/docs/articles/actors/inbox.md b/docs/articles/actors/inbox.md index 45419d85be5..f9d7ce5d0ea 100644 --- a/docs/articles/actors/inbox.md +++ b/docs/articles/actors/inbox.md @@ -5,6 +5,7 @@ title: Inbox # The Inbox When writing code outside of actors which shall communicate with actors, the ask pattern can be a solution (see below), but there are two things it cannot do: receiving multiple replies (e.g. by subscribing an `IActorRef` to a notification service) and watching other actors' lifecycle. For these purposes there is the `Inbox` class: + ```csharp var target = system.ActorOf(Props.Empty); var inbox = Inbox.Create(system); @@ -22,6 +23,7 @@ catch (TimeoutException) ``` The send method wraps a normal `Tell` and supplies the internal actor's reference as the sender. This allows the reply to be received on the last line. Watching an actor is quite simple as well + ```csharp using System.Diagnostics; ... diff --git a/docs/articles/actors/receive-actor-api.md b/docs/articles/actors/receive-actor-api.md index 04a3f327984..e07c2690021 100644 --- a/docs/articles/actors/receive-actor-api.md +++ b/docs/articles/actors/receive-actor-api.md @@ -13,6 +13,7 @@ The Actor Model provides a higher level of abstraction for writing concurrent an In order to use the `Receive()` method inside an actor, the actor must inherit from ReceiveActor. Inside the constructor, add a call to `Receive(Action handler)` for every type of message you want to handle: Here is an example: + ```csharp public class MyActor: ReceiveActor { @@ -32,12 +33,14 @@ public class MyActor: ReceiveActor ### Props `Props` is a configuration class to specify options for the creation of actors, think of it as an immutable and thus freely shareable recipe for creating an actor including associated deployment information (e.g. which dispatcher to use, see more below). Here are some examples of how to create a `Props` instance + ```csharp Props props1 = Props.Create(typeof(MyActor)); Props props2 = Props.Create(() => new MyActorWithArgs("arg")); Props props3 = Props.Create(); Props props4 = Props.Create(typeof(MyActorWithArgs), "arg"); ``` + The second variant shows how to pass constructor arguments to the `Actor` being created, but it should only be used outside of actors as explained below. #### Recommended Practices @@ -67,6 +70,7 @@ system.ActorOf(DemoActor.Props(42), "demo"); ``` Another good practice is to declare local messages (messages that are sent in process) within the Actor, which makes it easier to know what messages are generally being sent over the wire vs in process.: + ```csharp public class DemoActor : ReceiveActor { @@ -168,6 +172,7 @@ protected override void PostStop() { } ``` + The implementations shown above are the defaults provided by the `ReceiveActor` class. ### Actor Lifecycle @@ -255,6 +260,7 @@ Context.ActorSelection("/user/serviceA/actor"); // will look up sibling beneath same supervisor Context.ActorSelection("../joe"); ``` + > [!NOTE] > It is always preferable to communicate with other Actors using their `IActorRef` instead of relying upon `ActorSelection`. Exceptions are: sending messages using the At-Least-Once Delivery facility, initiating first contact with a remote system. In all other cases `ActorRefs` can be provided during Actor creation or initialization, passing them from parent to child or introducing Actors by sending their `ActorRefs` to other Actors within messages. @@ -269,6 +275,7 @@ Context.ActorSelection("/user/serviceB/worker*"); // will look up all siblings beneath same supervisor Context.ActorSelection("../*"); ``` + Messages can be sent via the `ActorSelection` and the path of the `ActorSelection`is looked up when delivering each message. If the selection does not match any actors the message will be dropped. To acquire an `IActorRef` for an `ActorSelection` you need to send a message to the selection and use the `Sender` reference of the reply from the actor. There is a built-in `Identify` message that all Actors will understand and automatically reply to with a `ActorIdentity` message containing the `IActorRef`. This message is handled specially by the actors which are traversed in the sense that if a concrete name lookup fails (i.e. a non-wildcard path element does not correspond to a live actor) then a negative result is generated. Please note that this does not mean that delivery of that reply is guaranteed, it still is a normal message. @@ -365,6 +372,7 @@ This is the preferred way of sending messages. No blocking waiting for a message // don’t forget to think about who is the sender (2nd argument) target.Tell(message, Self); ``` + The sender reference is passed along with the message and available within the receiving actor via its `Sender` property while processing this message. Inside of an actor it is usually `Self` who shall be the sender, but there can be cases where replies shall be routed to some other actor—e.g. the parent—in which the second argument to `Tell` would be a different one. Outside of an actor and if no reply is needed the second argument can be `null`; if a reply is needed outside of an actor you can use the ask-pattern described next. ### Ask: Send-And-Receive-Future @@ -413,6 +421,7 @@ target.Forward(result); ## Receive messages To receive a message you should create a `Receive` handler in a constructor. + ```csharp Receive(ms => Console.WriteLine("Received message: " + msg)); ``` @@ -425,6 +434,7 @@ Receive(s => Console.WriteLine("Received string: " + s)); //1 Receive(s => Console.WriteLine("Also received string: " + s)); //2 Receive(o => Console.WriteLine("Received object: " + o)); //3 ``` + > **Example**
> The actor receives a message of type string. Only the first handler is invoked, even though all three handlers can handle that message. @@ -473,6 +483,7 @@ protected override void Unhandled(object message) //Do something with the message. } ``` + Another option is to add a handler last that matches all messages, using `ReceiveAny()`. ### ReceiveAny @@ -482,6 +493,7 @@ To catch messages of any type the `ReceiveAny(Action handler)` overload Receive(s => Console.WriteLine("Received string: " + s); ReceiveAny(o => Console.WriteLine("Received object: " + o); ``` + Since it handles everything, it must be specified last. Specifying handlers it after will cause an exception. ```csharp @@ -754,6 +766,7 @@ static void Main(string[] args) The `IWithUnboundedStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.BecomeStacked()` or `Context.UnbecomeStacked()`;, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that implements `IWithUnboundedStash` will automatically get a dequeue-based mailbox. Here is an example of the `IWithUnboundedStash` interface in action: + ```csharp public class ActorWithProtocol : ReceiveActor, IWithUnboundedStash { @@ -851,6 +864,7 @@ protected override void PreRestart(Exception reason, object message) PostStop(); } ``` + Please note, that the child actors are *still restarted*, but no new `IActorRef` is created. One can recursively apply the same principles for the children, ensuring that their `PreStart()` method is called only at the creation of their refs. For more information see [What Restarting Means](xref:supervision#what-restarting-means). @@ -881,6 +895,7 @@ public class Service : ReceiveActor } } ``` + If the actor may receive messages before it has been initialized, a useful tool can be the `Stash` to save messages until the initialization finishes, and replaying them after the actor became initialized. > [!WARNING] diff --git a/docs/articles/actors/routers.md b/docs/articles/actors/routers.md index 5c8b4324bd3..02171bc6ea7 100644 --- a/docs/articles/actors/routers.md +++ b/docs/articles/actors/routers.md @@ -49,6 +49,7 @@ akka.actor.deployment { } } ``` + ```cs var props = Props.Create().WithRouter(FromConfig.Instance); var actor = system.ActorOf(props, "workers"); @@ -64,6 +65,7 @@ akka.actor.deployment { } } ``` + ```cs var props = Props.Create().WithRouter(FromConfig.Instance); var actor = system.ActorOf(props, "workers"); @@ -118,6 +120,7 @@ akka.actor.deployment { } } ``` + ```cs var router = system.ActorOf(Props.Create().WithRouter(FromConfig.Instance), "some-pool"); ``` @@ -138,6 +141,7 @@ akka.actor.deployment { } } ``` + ```cs var router = system.ActorOf(Props.Empty.WithRouter(FromConfig.Instance), "some-group"); ``` @@ -167,6 +171,7 @@ akka.actor.deployment { } } ``` + ```cs var router = system.ActorOf(Props.Create().WithRouter(FromConfig.Instance), "some-pool"); ``` @@ -187,6 +192,7 @@ akka.actor.deployment { } } ``` + ```cs var router = system.ActorOf(Props.Empty.WithRouter(FromConfig.Instance), "some-group"); ``` @@ -214,6 +220,7 @@ akka.actor.deployment { } } ``` + ```cs var router = system.ActorOf(Props.Create().WithRouter(FromConfig.Instance), "some-pool"); ``` @@ -234,6 +241,7 @@ akka.actor.deployment { } } ``` + ```cs var router = system.ActorOf(Props.Empty.WithRouter(FromConfig.Instance), "some-group"); ``` @@ -266,6 +274,7 @@ By using a `ConsistentHash` router we can now process multiple commands in paral There are 3 ways to define what data to use for the consistent hash key. 1. You can define a *hash mapping delegate* using the `WithHashMapper` method of the router to map incoming messages to their consistent hash key. This makes the decision transparent for the sender. + ```cs new ConsistentHashingPool(5).WithHashMapping(o => { @@ -277,6 +286,7 @@ There are 3 ways to define what data to use for the consistent hash key. ``` 2. The messages may implement `IConsistentHashable`. The key is part of the message and it's convenient to define it together with the message definition. + ```cs public class SomeMessage : IConsistentHashable { @@ -286,6 +296,7 @@ There are 3 ways to define what data to use for the consistent hash key. ``` 3. The messages can be wrapped in a `ConsistentHashableEnvelope` to define what data to use for the consistent hash key. The sender knows the key to use. + ```cs public class SomeMessage { @@ -311,6 +322,7 @@ akka.actor.deployment { } } ``` + ```cs var router = system.ActorOf(Props.Create().WithRouter(FromConfig.Instance), "some-pool"); ``` @@ -332,6 +344,7 @@ akka.actor.deployment { } } ``` + ```cs var router = system.ActorOf(Props.Empty.WithRouter(FromConfig.Instance), "some-group"); ``` @@ -369,6 +382,7 @@ akka.actor.deployment { } } ``` + ```cs var router = system.ActorOf(Props.Create().WithRouter(FromConfig.Instance), "some-pool"); ``` @@ -393,6 +407,7 @@ akka.actor.deployment { } } ``` + ```cs var router = system.ActorOf(Props.Empty.WithRouter(FromConfig.Instance), "some-group"); ``` @@ -431,6 +446,7 @@ akka.actor.deployment { } } ``` + ```cs var router = system.ActorOf(Props.Create().WithRouter(FromConfig.Instance), "some-pool"); ``` @@ -453,6 +469,7 @@ akka.actor.deployment { } } ``` + ```cs var router = system.ActorOf(Props.Empty.WithRouter(FromConfig.Instance), "some-group"); ``` @@ -490,6 +507,7 @@ akka.actor.deployment { } } ``` + ```cs var router = system.ActorOf(Props.Create().WithRouter(FromConfig.Instance), "some-pool"); ``` @@ -524,6 +542,7 @@ You can also set a resizer in code when creating a router. ```cs new RoundRobinPool(5, new DefaultResizer(1, 10)) ``` + These are settings you usually change in the resizer: * `enabled` - Turns on or off the resizer. The default is `off`. diff --git a/docs/articles/actors/testing-actor-systems.md b/docs/articles/actors/testing-actor-systems.md index 0c6eac731d9..3b07926fe08 100644 --- a/docs/articles/actors/testing-actor-systems.md +++ b/docs/articles/actors/testing-actor-systems.md @@ -216,6 +216,7 @@ The `CallingThreadDispatcher` serves good purposes in unit testing, as described ### How to use it Just set the dispatcher as you normally would + ```csharp Sys.ActorOf(Props.Create().WithDispatcher(CallingThreadDispatcher.Id)); ``` @@ -244,6 +245,7 @@ The testing facilities described up to this point were aiming at formulating ass - Logging of the actor lifecycle. Actor creation, start, restart, monitor start, monitor stop and stop may be traced by enabling the setting *akka.actor.debug.lifecycle*; this, too, is enabled uniformly on all actors. All these messages are logged at `DEBUG` level. To summarize, you can enable full logging of actor activities using this configuration fragment: + ```hocon akka { loglevel = "DEBUG" @@ -309,6 +311,7 @@ All methods shown above directly access the FSM state without any synchronizatio ##Testing the Actor's behavior When the dispatcher invokes the processing behavior of an actor on a message, it actually calls apply on the current behavior registered for the actor. This starts out with the return value of the declared receive method, but it may also be changed using become and unbecome in response to external messages. All of this contributes to the overall actor behavior and it does not lend itself to easy testing on the `Actor` itself. Therefore the TestActorRef offers a different mode of operation to complement the `Actor` testing: it supports all operations also valid on normal `IActorRef`. Messages sent to the actor are processed synchronously on the current thread and answers may be sent back as usual. This trick is made possible by the `CallingThreadDispatcher` described below; this dispatcher is set implicitly for any actor instantiated into a `TestActorRef`. + ```csharp var props = Props.Create(); var myTestActor = new TestActorRef(Sys, props, null, "testB"); @@ -316,12 +319,14 @@ Task future = myTestActor.Ask("say42", TimeSpan.FromMilliseconds(3000) Assert.True(future.IsCompleted); Assert.Equal(42, await future); ``` + As the `TestActorRef` is a subclass of `LocalActorRef` with a few special extras, also aspects like supervision and restarting work properly, but beware that execution is only strictly synchronous as long as all actors involved use the `CallingThreadDispatcher`. As soon as you add elements which include more sophisticated scheduling you leave the realm of unit testing as you then need to think about asynchronicity again (in most cases the problem will be to wait until the desired effect had a chance to happen). One more special aspect which is overridden for single-threaded tests is the `ReceiveTimeout`, as including that would entail asynchronous queuing of `ReceiveTimeout` messages, violating the synchronous contract. ### The Way In-Between: Expecting Exceptions If you want to test the actor behavior, including hot-swapping, but without involving a dispatcher and without having the `TestActorRef` swallow any thrown exceptions, then there is another mode available for you: just use the receive method on `TestActorRef`, which will be forwarded to the underlying actor: + ```csharp var props = Props.Create(); var myTestActor = new TestActorRef(Sys, props, null, "testB"); @@ -342,6 +347,7 @@ However DeadLetter messages and Exceptions ultimately also result in a `LogEvent These are all things that can be intercepted, and asserted upon using the `EventFilter`. An example of how you can get a reference to the `EventFilter` + ```csharp var filter = CreateEventFilter(Sys); diff --git a/docs/articles/clustering/cluster-metrics.md b/docs/articles/clustering/cluster-metrics.md index d018a989145..ec27e6ef92a 100644 --- a/docs/articles/clustering/cluster-metrics.md +++ b/docs/articles/clustering/cluster-metrics.md @@ -48,6 +48,7 @@ The payload of the `Akka.Cluster.Metrics.Events.ClusterMetricsChanged` event wil other cluster member nodes metrics gossip which was received during the collector sample interval. You can subscribe your metrics listener actors to these events in order to implement custom node lifecycle: + ```c# ClusterMetrics.Get(Sys).Subscribe(metricsListenerActor); ``` diff --git a/docs/articles/clustering/cluster-overview.md b/docs/articles/clustering/cluster-overview.md index e0118de6d3a..81a9ca3f77f 100644 --- a/docs/articles/clustering/cluster-overview.md +++ b/docs/articles/clustering/cluster-overview.md @@ -82,6 +82,7 @@ Once you've installed Akka.Cluster, we need to update our HOCON configuration to #### Seed Node Configuration + ```xml akka { actor.provider = cluster @@ -105,6 +106,7 @@ You can, and should, specify multiple seed nodes inside this field - and seed no > If you're using dedicated seed nodes (such as [Lighthouse](https://github.com/petabridge/lighthouse)), you should run at least 2 or 3. If you only have one seed node and that machine crashes, the cluster will continue operating but no new members can join the cluster! #### Non-Seed Node Configuration + ```xml akka { actor.provider = cluster diff --git a/docs/articles/clustering/distributed-publish-subscribe.md b/docs/articles/clustering/distributed-publish-subscribe.md index adbd6efc73e..384ba80a89e 100644 --- a/docs/articles/clustering/distributed-publish-subscribe.md +++ b/docs/articles/clustering/distributed-publish-subscribe.md @@ -185,7 +185,9 @@ public class Sender : ReceiveActor } } ``` + It can send messages to the path from anywhere in the cluster: + ```csharp RunOn(() => { diff --git a/docs/articles/intro/use-case-and-deployment-scenarios.md b/docs/articles/intro/use-case-and-deployment-scenarios.md index 1c54bb9afe5..9842f48b29a 100644 --- a/docs/articles/intro/use-case-and-deployment-scenarios.md +++ b/docs/articles/intro/use-case-and-deployment-scenarios.md @@ -73,6 +73,7 @@ Typically you use a very lightweight `ActorSystem` inside ASP.NET applications, ### Interaction between Controllers and Akka.NET In the sample below, we use an Web API Controller: + ```csharp public class SomeController : ApiController { @@ -95,6 +96,7 @@ to build your Windows Services. It radically simplifies hosting Windows Services The quickest way to get started with TopShelf is by creating a Console Application. Which would look like this: #### Program.cs + ```csharp using Akka.Actor; using Topshelf; @@ -159,6 +161,7 @@ The quickest way to get started with Akka.Net is to create a simple Worker Role user-actor in the RunAsync() method, as follows: #### WorkerRole.cs + ```csharp using Akka.Actor; diff --git a/docs/articles/persistence/custom-serialization.md b/docs/articles/persistence/custom-serialization.md index ef461a72597..5ec39221d0b 100644 --- a/docs/articles/persistence/custom-serialization.md +++ b/docs/articles/persistence/custom-serialization.md @@ -22,4 +22,5 @@ akka.actor { } } ``` + to the application configuration. If not specified, a default serializer is used. diff --git a/docs/articles/persistence/persistence-query.md b/docs/articles/persistence/persistence-query.md index 9dda2ae1dbe..9e124a177e5 100644 --- a/docs/articles/persistence/persistence-query.md +++ b/docs/articles/persistence/persistence-query.md @@ -341,6 +341,7 @@ public class TheOneWhoWritesToQueryJournal(id: String) : ActorBase } } ``` + ## Configuration Configuration settings can be defined in the configuration section with the absolute path corresponding to the identifier, which is `Akka.Persistence.Query.Journal.Sqlite` for the default `SqlReadJournal.Identifier`. diff --git a/docs/articles/persistence/persistence-testing.md b/docs/articles/persistence/persistence-testing.md index 49b53e3d718..244a4e057c5 100644 --- a/docs/articles/persistence/persistence-testing.md +++ b/docs/articles/persistence/persistence-testing.md @@ -167,6 +167,7 @@ public interface ISnapshotStoreInterceptor This is a specialized test kit with a pre-configured persistence plugin that uses `TestJournal` and `TestSnapshotStore` by default. This class provides the following methods to control journal behavior: `WithJournalRecovery` and `WithJournalWrite`; to control snapshot store it provides `WithSnapshotSave`, `WithSnapshotLoad` and `WithSnapshotDelete` methods; Usage example: + ``` csharp public class PersistentActorSpec : PersistenceTestKit { diff --git a/docs/articles/persistence/snapshots.md b/docs/articles/persistence/snapshots.md index 5ab9769ee16..9ceac5ab84f 100644 --- a/docs/articles/persistence/snapshots.md +++ b/docs/articles/persistence/snapshots.md @@ -26,6 +26,7 @@ else // event } ``` + The replayed messages that follow the `SnapshotOffer` message, if any, are younger than the offered snapshot. They finally recover the persistent actor to its current (i.e. latest) state. In general, a persistent actor is only offered a snapshot if that persistent actor has previously saved one or more snapshots and at least one of these snapshots matches the `SnapshotSelectionCriteria` that can be specified for recovery. diff --git a/docs/articles/remoting/index.md b/docs/articles/remoting/index.md index bda00d49d60..676681146c9 100644 --- a/docs/articles/remoting/index.md +++ b/docs/articles/remoting/index.md @@ -114,6 +114,7 @@ So in the case of our previous example, `localhost:8080` is the inbound (listeni So imagine we have the following two actor systems configured to both use the `dot-netty.tcp` Akka.Remote transport: **Client** + ```xml akka { actor { @@ -129,6 +130,7 @@ akka { ``` **Server** + ```xml akka { actor { diff --git a/docs/articles/streams/integration.md b/docs/articles/streams/integration.md index 1107945ec9d..d81de7f3fe5 100644 --- a/docs/articles/streams/integration.md +++ b/docs/articles/streams/integration.md @@ -50,6 +50,7 @@ public class Translator : ReceiveActor } } ``` + The stream can be completed with failure by sending `Akka.Actor.Status.Failure` as reply from the actor. If the `Ask` fails due to timeout the stream will be completed with `TimeoutException` failure. diff --git a/docs/articles/streams/pipeliningandparallelism.md b/docs/articles/streams/pipeliningandparallelism.md index 24abdb67033..922bd57c199 100644 --- a/docs/articles/streams/pipeliningandparallelism.md +++ b/docs/articles/streams/pipeliningandparallelism.md @@ -84,6 +84,7 @@ var pancakeChef = Flow.FromGraph(GraphDsl.Create(b => return new FlowShape(dispatchBatter.In, mergePancakes.Out); })); ``` + The benefit of parallelizing is that it is easy to scale. In the pancake example it is easy to add a third frying pan with Chris' method, but Bartosz cannot add a third frying pan, since that would require a third processing step, which is not practically possible in the case of frying pancakes. diff --git a/docs/articles/streams/workingwithgraphs.md b/docs/articles/streams/workingwithgraphs.md index 9aed33e0dfb..abbe1d14ab8 100644 --- a/docs/articles/streams/workingwithgraphs.md +++ b/docs/articles/streams/workingwithgraphs.md @@ -840,6 +840,7 @@ arc that injects a single element using ``Source.Single``. > [!WARNING] > We have to add an Async call after creating the instance of Concat. Otherwise Concat will wait upstream to be empty and that will never happen in this case. + ```csharp RunnableGraph.FromGraph(GraphDsl.Create(b => {