Log TypeName in ShardRegion logs (#4562)
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
ismaelhamed and Aaronontheweb authored Sep 9, 2020
1 parent f52eff4 commit 362a446
Showing 1 changed file with 45 additions and 37 deletions.
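
Every change in this diff follows one pattern: a log call that previously omitted the entity type, or baked it in via string interpolation, now passes TypeName as the first format argument. A minimal sketch of the pattern, using a hypothetical standalone actor (TypeNameLoggingExample and the "customer" type name are illustrative only, not part of this commit):

using Akka.Actor;
using Akka.Event;

// Hypothetical actor used only to illustrate the logging pattern adopted in this commit;
// it is not part of Akka.Cluster.Sharding.
public class TypeNameLoggingExample : ReceiveActor
{
    private const string TypeName = "customer"; // name of the sharded entity type
    private readonly ILoggingAdapter _log = Context.GetLogger();

    public TypeNameLoggingExample()
    {
        Receive<string>(shardId =>
        {
            // Old style: the entity type is missing from the log line.
            // _log.Debug("Shard was initialized [{0}]", shardId);

            // New style: TypeName is always the first format argument, so every
            // log line can be correlated with its entity type, and the value stays
            // a separate argument for structured logging backends.
            _log.Debug("{0}: Shard was initialized [{1}]", TypeName, shardId);
        });
    }
}

Passing TypeName as a format argument, rather than interpolating it into the message string, also keeps the message templates constant, which is friendlier to log filtering and semantic logging sinks.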
82 changes: 45 additions & 37 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
@@ -474,7 +474,9 @@ protected override void PostStop()
private void LogPassivateIdleEntities()
{
if (Settings.ShouldPassivateIdleEntities)
Log.Info($"{TypeName}: Idle entities will be passivated after [{Settings.PassivateIdleEntityAfter}]");
Log.Info("{0}: Idle entities will be passivated after [{1}]",
TypeName,
Settings.PassivateIdleEntityAfter);

if (Settings.RememberEntities)
Log.Debug("Idle entities will not be passivated because 'rememberEntities' is enabled.");
@@ -498,7 +500,8 @@ private void ChangeMembers(IImmutableSet<Member> newMembers)
if (!Equals(before, after))
{
if (Log.IsDebugEnabled)
Log.Debug("Coordinator moved from [{0}] to [{1}]",
Log.Debug("{0}: Coordinator moved from [{1}] to [{2}]",
TypeName,
before?.Address.ToString() ?? string.Empty,
after?.Address.ToString() ?? string.Empty);

@@ -544,14 +547,14 @@ protected override bool Receive(object message)
DeliverMessage(message, Sender);
return true;
default:
Log.Warning("Message does not have an extractor defined in shard [{0}] so it was ignored: {1}", TypeName, message);
Log.Warning("{0}: Message does not have an extractor defined in shard so it was ignored: {1}", TypeName, message);
return false;
}
}

private void InitializeShard(ShardId id, IActorRef shardRef)
{
Log.Debug("Shard was initialized [{0}]", id);
Log.Debug("{0}: Shard was initialized [{1}]", TypeName, id);
StartingShards = StartingShards.Remove(id);
DeliverBufferedMessage(id, shardRef);
}
@@ -566,21 +569,26 @@ private void Register()
{
if (actorSelections.Count > 0)
{
var coordinatorMessage = Cluster.State.Unreachable.Contains(MembersByAge.First()) ? $"Coordinator [{MembersByAge.First()}] is unreachable." : $"Coordinator [{MembersByAge.First()}] is reachable.";

Log.Warning("Trying to register to coordinator at [{0}], but no acknowledgement. Total [{1}] buffered messages. [{2}]",
string.Join(", ", actorSelections.Select(i => i.PathString)), TotalBufferSize, coordinatorMessage);
var coordinatorMessage = Cluster.State.Unreachable.Contains(MembersByAge.First())
? $"Coordinator [{MembersByAge.First()}] is unreachable."
: $"Coordinator [{MembersByAge.First()}] is reachable.";

Log.Warning("{0}: Trying to register to coordinator at [{1}], but no acknowledgement. Total [{2}] buffered messages. [{3}]",
TypeName,
string.Join(", ", actorSelections.Select(i => i.PathString)),
TotalBufferSize,
coordinatorMessage);
}
else
{
// Members start off as "Removed"
var partOfCluster = Cluster.SelfMember.Status != MemberStatus.Removed;
var possibleReason = partOfCluster ?
"Has Cluster Sharding been started on every node and nodes been configured with the correct role(s)?" :
"Probably, no seed-nodes configured and manual cluster join not performed?";
var possibleReason = partOfCluster
? "Has Cluster Sharding been started on every node and nodes been configured with the correct role(s)?"
: "Probably, no seed-nodes configured and manual cluster join not performed?";

Log.Warning("No coordinator found to register. {0} Total [{1}] buffered messages.",
possibleReason, TotalBufferSize);
Log.Warning("{0}: No coordinator found to register. {1} Total [{2}] buffered messages.",
TypeName, possibleReason, TotalBufferSize);
}
}
}
@@ -594,7 +602,7 @@ private void DeliverStartEntity(object message, IActorRef sender)
catch (Exception ex)
{
//case ex: MatchError ⇒
Log.Error(ex, "When using remember-entities the shard id extractor must handle ShardRegion.StartEntity(id).");
Log.Error(ex, "{0}: When using remember-entities the shard id extractor must handle ShardRegion.StartEntity(id).", TypeName);
}
}

@@ -613,11 +621,11 @@ private void DeliverMessage(object message, IActorRef sender)
if (!ShardBuffers.TryGetValue(shardId, out var buffer))
{
buffer = ImmutableList<KeyValuePair<object, IActorRef>>.Empty;
Log.Debug("Request shard [{0}]", shardId);
Log.Debug("{0}: Request shard [{1}] home. Coordinator [{2}]", TypeName, shardId, _coordinator);
_coordinator?.Tell(new PersistentShardCoordinator.GetShardHome(shardId));
}

Log.Debug("Buffer message for shard [{0}]. Total [{1}] buffered messages.", shardId, buffer.Count + 1);
Log.Debug("{0}: Buffer message for shard [{1}]. Total [{2}] buffered messages.", TypeName, shardId, buffer.Count + 1);
ShardBuffers = ShardBuffers.SetItem(shardId, buffer.Add(new KeyValuePair<object, IActorRef>(message, sender)));
}
}
@@ -645,22 +653,22 @@ private void DeliverMessage(object message, IActorRef sender)
}
else
{
Log.Debug("Forwarding request for shard [{0}] to [{1}]", shardId, region);
Log.Debug("{0}: Forwarding request for shard [{1}] to [{2}]", TypeName, shardId, region);
region.Tell(message, sender);
}
}
else
{
if (string.IsNullOrEmpty(shardId))
{
Log.Warning("Shard must not be empty, dropping message [{0}]", message.GetType());
Log.Warning("{0}: Shard must not be empty, dropping message [{1}]", TypeName, message.GetType());
Context.System.DeadLetters.Tell(message);
}
else
{
if (!ShardBuffers.ContainsKey(shardId))
{
Log.Debug("Request shard [{0}]", shardId);
Log.Debug("{0}: Request shard [{1}] home. Coordinator [{2}]", TypeName, shardId, _coordinator);
_coordinator?.Tell(new PersistentShardCoordinator.GetShardHome(shardId));
}

@@ -676,10 +684,10 @@ private void BufferMessage(ShardId shardId, Msg message, IActorRef sender)
if (totalBufferSize >= Settings.TunningParameters.BufferSize)
{
if (_loggedFullBufferWarning)
Log.Debug("Buffer is full, dropping message for shard [{0}]", shardId);
Log.Debug("{0}: Buffer is full, dropping message for shard [{1}]", TypeName, shardId);
else
{
Log.Warning("Buffer is full, dropping message for shard [{0}]", shardId);
Log.Warning("{0}: Buffer is full, dropping message for shard [{1}]", TypeName, shardId);
_loggedFullBufferWarning = true;
}

@@ -696,7 +704,7 @@ private void BufferMessage(ShardId shardId, Msg message, IActorRef sender)
var bufferSize = Settings.TunningParameters.BufferSize;
if (total % (bufferSize / 10) == 0)
{
const string logMsg = "ShardRegion for [{0}] is using [{1} %] of its buffer capacity.";
const string logMsg = "{0}: ShardRegion is using [{1} %] of its buffer capacity.";
if (total > bufferSize / 2)
Log.Warning(logMsg + " The coordinator might not be available. You might want to check cluster membership status.", TypeName, 100 * total / bufferSize);
else
@@ -725,7 +733,7 @@ private void HandleShardRegionCommand(IShardRegionCommand command)
break;

case GracefulShutdown _:
Log.Debug("Starting graceful shutdown of region and all its shards");
Log.Debug("{0}: Starting graceful shutdown of region and all its shards", TypeName);
GracefulShutdownInProgress = true;
SendGracefulShutdownToCoordinator();
TryCompleteGracefulShutdown();
@@ -820,7 +828,7 @@ private void HandleCoordinatorMessage(PersistentShardCoordinator.ICoordinatorMes
case PersistentShardCoordinator.HostShard hs:
{
var shard = hs.Shard;
Log.Debug("Host shard [{0}]", shard);
Log.Debug("{0}: Host shard [{1}]", TypeName, shard);
RegionByShard = RegionByShard.SetItem(shard, Self);
UpdateRegionShards(Self, shard);

@@ -831,14 +839,14 @@ private void HandleCoordinatorMessage(PersistentShardCoordinator.ICoordinatorMes
}
break;
case PersistentShardCoordinator.ShardHome home:
Log.Debug("Shard [{0}] located at [{1}]", home.Shard, home.Ref);
Log.Debug("{0}: Shard [{1}] located at [{2}]", TypeName, home.Shard, home.Ref);

if (RegionByShard.TryGetValue(home.Shard, out var region))
{
if (region.Equals(Self) && !home.Ref.Equals(Self))
{
// should not happen, inconsistency between ShardRegion and PersistentShardCoordinator
throw new IllegalStateException($"Unexpected change of shard [{home.Shard}] from self to [{home.Ref}]");
throw new IllegalStateException($"{TypeName}: Unexpected change of shard [{home.Shard}] from self to [{home.Ref}]");
}
}

@@ -865,7 +873,7 @@ private void HandleCoordinatorMessage(PersistentShardCoordinator.ICoordinatorMes
case PersistentShardCoordinator.BeginHandOff bho:
{
var shard = bho.Shard;
Log.Debug("Begin hand off shard [{0}]", shard);
Log.Debug("{0}: BeginHandOff shard [{1}]", TypeName, shard);
if (RegionByShard.TryGetValue(shard, out var regionRef))
{
if (!Regions.TryGetValue(regionRef, out var updatedShards))
@@ -886,7 +894,7 @@ private void HandleCoordinatorMessage(PersistentShardCoordinator.ICoordinatorMes
case PersistentShardCoordinator.HandOff ho:
{
var shard = ho.Shard;
Log.Debug("Hand off shard [{0}]", shard);
Log.Debug("{0}: HandOff shard [{1}]", TypeName, shard);

// must drop requests that came in between the BeginHandOff and now,
// because they might be forwarded from other regions and there
@@ -923,11 +931,11 @@ private void RequestShardBufferHomes()
{
foreach (var buffer in ShardBuffers)
{
var logMsg = "Retry request for shard [{0}] homes from coordinator at [{1}]. [{2}] buffered messages.";
const string logMsg = "{0}: Retry request for shard [{1}] homes from coordinator at [{2}]. [{3}] buffered messages.";
if (_retryCount >= RetryCountThreshold)
Log.Warning(logMsg, buffer.Key, _coordinator, buffer.Value.Count);
Log.Warning(logMsg, TypeName, buffer.Key, _coordinator, buffer.Value.Count);
else
Log.Debug(logMsg, buffer.Key, _coordinator, buffer.Value.Count);
Log.Debug(logMsg, TypeName, buffer.Key, _coordinator, buffer.Value.Count);

_coordinator.Tell(new PersistentShardCoordinator.GetShardHome(buffer.Key));
}
@@ -937,7 +945,7 @@ private void DeliverBufferedMessage(ShardId shardId, IActorRef receiver)
{
if (ShardBuffers.TryGetValue(shardId, out var buffer))
{
Log.Debug("Deliver [{0}] buffered messages for shard [{1}]", buffer.Count, shardId);
Log.Debug("{0}: Deliver [{1}] buffered messages for shard [{2}]", TypeName, buffer.Count, shardId);

foreach (var m in buffer)
receiver.Tell(m.Key, m.Value);
@@ -962,7 +970,7 @@ private IActorRef GetShard(ShardId id)

if (ShardsByRef.Values.All(shardId => shardId != id))
{
Log.Debug("Starting shard [{0}] in region", id);
Log.Debug("{0}: Starting shard [{1}] in region", TypeName, id);

var name = Uri.EscapeDataString(id);
var shardRef = Context.Watch(Context.ActorOf(Sharding.Shards.Props(
@@ -1019,9 +1027,9 @@ private void HandleClusterEvent(ClusterEvent.IClusterDomainEvent e)
case ClusterEvent.MemberDowned md:
if (md.Member.UniqueAddress == Cluster.SelfUniqueAddress)
{
Log.Info("{0}: Self downed, stopping ShardRegion [{1}]", TypeName, Self.Path);
Context.Stop(Self);
}
Log.Info("Self downed, stopping ShardRegion [{0}]", Self.Path);
break;
case ClusterEvent.IMemberEvent _:
// these are expected, no need to warn about them
@@ -1051,7 +1059,7 @@ private void HandleTerminated(Terminated terminated)
Regions = Regions.Remove(terminated.ActorRef);

if (Log.IsDebugEnabled)
Log.Debug("Region [{0}] with shards [{1}] terminated", terminated.ActorRef, string.Join(", ", shards));
Log.Debug("{0}: Region [{1}] with shards [{2}] terminated", TypeName, terminated.ActorRef, string.Join(", ", shards));
}
else if (ShardsByRef.TryGetValue(terminated.ActorRef, out var shard))
{
@@ -1061,12 +1069,12 @@ private void HandleTerminated(Terminated terminated)
if (HandingOff.Contains(terminated.ActorRef))
{
HandingOff = HandingOff.Remove(terminated.ActorRef);
Log.Debug("Shard [{0}] handoff complete", shard);
Log.Debug("{0}: Shard [{1}] handoff complete", TypeName, shard);
}
else
{
// if persist fails it will stop
Log.Debug("Shard [{0}] terminated while not being handed off", shard);
Log.Debug("{0}: Shard [{1}] terminated while not being handed off", TypeName, shard);
if (Settings.RememberEntities)
Context.System.Scheduler.ScheduleTellOnce(Settings.TunningParameters.ShardFailureBackoff, Self, new RestartShard(shard), Self);
}