From 19d39086ad140649d7b28d7afc0240325780f4f5 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 10 Jan 2025 03:00:13 +0700 Subject: [PATCH] Add `Broadcast` message support to `ShardedDaemonProcess` --- .../ShardedDaemonProcess.cs | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs index 9eb7e33b578..1e3fdc179a6 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs @@ -112,12 +112,23 @@ public DaemonMessageRouter(string[] entityIds, IActorRef shardingRef) protected override void OnReceive(object message) { - var nextId = _entityIds[_index % _entityIds.Length]; + if (message is Broadcast broadcast) + { + var unwrapped = broadcast.Message; + foreach (var entityId in _entityIds) + { + _shardingRef.Forward(new ShardingEnvelope(entityId, unwrapped)); + } + } + else + { + var nextId = _entityIds[_index % _entityIds.Length]; - // have to remember to always allow the sharding envelope to be forwarded - _shardingRef.Forward(new ShardingEnvelope(nextId, message)); - if (_index == int.MaxValue) _index = 0; - else _index++; + // have to remember to always allow the sharding envelope to be forwarded + _shardingRef.Forward(new ShardingEnvelope(nextId, message)); + if (_index == int.MaxValue) _index = 0; + else _index++; + } } }