Skip to content

Commit

Permalink
Akka.Cluster.Sharding AK2001 cleanup (#7049)
Browse files Browse the repository at this point in the history
* upgraded to Akka.Analyzers v0.2.1

* AK2001 implementation

Working on ensuring that AK2001's rules are followed inside Akka.NET's own test suites

* fixed typo in AK2001 explainer

* cleaned up more `AK2001` usages

* added API approvals

* marked all `ClusterSharding` methods accepting old delegates as obsolete
  • Loading branch information
Aaronontheweb authored Jan 9, 2024
1 parent 3285197 commit 38ad362
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 112 deletions.
2 changes: 1 addition & 1 deletion docs/articles/debugging/rules/AK2001.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ uid: AK2001
title: Akka.Analyzers Rule AK2001 - "Do not use automatically handled messages in inside `Akka.Cluster.Sharding.IMessageExtractor`s."
---

# AK2000 - Warning
# AK2001 - Warning

Do not use automatically handled messages in inside [`Akka.Cluster.Sharding.IMessageExtractor`](xref:Akka.Cluster.Sharding.IMessageExtractor)s.

Expand Down
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ If you want to see the [full set of changes made in Akka.NET v1.5.14, click here
| 1 | 1 | 1 | szaliszali |</PackageReleaseNotes>
</PropertyGroup>
<ItemGroup Label="Analyzers">
<PackageReference Include="Akka.Analyzers" Version="0.2.0" PrivateAssets="all" />
<PackageReference Include="Akka.Analyzers" Version="0.2.1" PrivateAssets="all" />
</ItemGroup>
<!-- SourceLink support for all Akka.NET projects -->
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ public void RouteShardEnvelope()
{
_extractor.EntityId(_m1);
_extractor.EntityMessage(_m1);
_extractor.ShardId(_m1);
_extractor.ShardId(_m1.EntityId);
}

[Benchmark]
public void RouteTypedMessage()
{
_extractor.EntityId(_m2);
_extractor.EntityMessage(_m2);
_extractor.ShardId(_m2);
_extractor.ShardId(_m2.EntityId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Sharding;
Expand Down Expand Up @@ -41,7 +40,7 @@ public ShardedEntityActor()
Sender.Tell(new ResolveResp(e.EntityId, Cluster.Get(Context.System).SelfAddress));
});

ReceiveAny(_ => Sender.Tell(_));
ReceiveAny(o => Sender.Tell(o));
}
}

Expand All @@ -52,10 +51,6 @@ public sealed class ShardedProxyEntityActor : ReceiveActor, IWithUnboundedStash
private IActorRef _shardRegion;
private IActorRef _sender;





public ShardedProxyEntityActor(IActorRef shardRegion)
{
_shardRegion = shardRegion;
Expand Down Expand Up @@ -200,11 +195,6 @@ public override string EntityId(object message)
return sharded.EntityId;
}

if (message is ShardingEnvelope e)
{
return e.EntityId;
}

return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,12 @@ public async Task ReliableDelivery_with_sharding_and_durable_queue_must_load_ini
HashCodeMessageExtractor.Create(10,
o =>
{
if (o is ShardingEnvelope se)
return se.EntityId;
// ShardingEnvelope is what is used by Akka.Cluster.Sharding.Delivery, and that msg is
// automatically handled by the ShardRegion, so we don't need to explicitly handle it here
if(o is string str)
return str;
return string.Empty;
}, o =>
{
if (o is ShardingEnvelope se)
return se.Message;
return o;
}));
}, o => o));
// </SpawnDurableConsumer>

// <SpawnDurableProducer>
Expand Down Expand Up @@ -174,17 +171,7 @@ public async Task ReliableDelivery_with_sharding_and_durable_queue_must_reply_to
Props.Create(() => new Consumer(c, consumerProbe)),
ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys),
HashCodeMessageExtractor.Create(10,
o =>
{
if (o is ShardingEnvelope se)
return se.EntityId;
return string.Empty;
}, o =>
{
if (o is ShardingEnvelope se)
return se.Message;
return o;
}));
o => string.Empty, o => o));

var durableQueueProps = EventSourcedProducerQueue.Create<Job>(ProducerId, Sys);
var shardingProducerController =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage(
PropsFor(DefaultConsumerDelay, 42, consumerEndProbe.Ref, c),
ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys),
HashCodeMessageExtractor.Create(10,
o =>
{
if (o is ShardingEnvelope se)
return se.EntityId;
return string.Empty;
}, o =>
{
if (o is ShardingEnvelope se)
return se.Message;
return o;
}));
o => string.Empty, o => o));

var producerController =
Sys.ActorOf(
Expand All @@ -101,17 +91,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage_
PropsFor(DefaultConsumerDelay, 42, consumerEndProbe.Ref, c),
ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys),
HashCodeMessageExtractor.Create(10,
o =>
{
if (o is ShardingEnvelope se)
return se.EntityId;
return string.Empty;
}, o =>
{
if (o is ShardingEnvelope se)
return se.Message;
return o;
}));
o => string.Empty, o => o));

var shardingController1 =
Sys.ActorOf(
Expand Down Expand Up @@ -149,17 +129,7 @@ public async Task ReliableDelivery_with_Sharding_must_reply_to_MessageWithConfir
PropsFor(DefaultConsumerDelay, 3, consumerEndProbe.Ref, c),
ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys),
HashCodeMessageExtractor.Create(10,
o =>
{
if (o is ShardingEnvelope se)
return se.EntityId;
return string.Empty;
}, o =>
{
if (o is ShardingEnvelope se)
return se.Message;
return o;
}));
o => string.Empty, o => o));

var producerController =
Sys.ActorOf(
Expand Down Expand Up @@ -342,17 +312,7 @@ public async Task
Props.Create(() => new ProbeWrapper(consumerProbes[consumerIncarnation.GetAndIncrement()], c)),
ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys),
HashCodeMessageExtractor.Create(10,
o =>
{
if (o is ShardingEnvelope se)
return se.EntityId;
return string.Empty;
}, o =>
{
if (o is ShardingEnvelope se)
return se.Message;
return o;
}));
o => string.Empty, o => o));

var shardingProducerSettings = ShardingProducerController.Settings.Create(Sys) with
{
Expand Down Expand Up @@ -435,17 +395,7 @@ public async Task
Props.Create(() => new ProbeWrapper(consumerEndProbe, c)),
ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys),
HashCodeMessageExtractor.Create(10,
o =>
{
if (o is ShardingEnvelope se)
return se.EntityId;
return string.Empty;
}, o =>
{
if (o is ShardingEnvelope se)
return se.Message;
return o;
}));
o => string.Empty, o => o));

var shardingProducerSettings = ShardingProducerController.Settings.Create(Sys) with
{
Expand Down Expand Up @@ -512,17 +462,7 @@ public async Task
Props.Create(() => new ProbeWrapper(consumerEndProbe, c)),
ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys),
HashCodeMessageExtractor.Create(10,
o =>
{
if (o is ShardingEnvelope se)
return se.EntityId;
return string.Empty;
}, o =>
{
if (o is ShardingEnvelope se)
return se.Message;
return o;
}));
o => string.Empty, o => o));

var shardingProducerController1 =
Sys.ActorOf(
Expand Down
26 changes: 18 additions & 8 deletions src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ public Implementation(int maxNumberOfShards, Func<Msg, string> entityIdExtractor
_messageExtractor = messageExtractor;
}

public override string EntityId(Msg message)
public override string? EntityId(Msg message)
=> _entityIdExtractor.Invoke(message);

public override Msg EntityMessage(Msg message)
public override Msg? EntityMessage(Msg message)
=> _messageExtractor?.Invoke(message) ?? base.EntityMessage(message);
}

Expand Down Expand Up @@ -155,15 +155,15 @@ protected HashCodeMessageExtractor(int maxNumberOfShards)
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public abstract string EntityId(Msg message);
public abstract string? EntityId(Msg message);

/// <summary>
/// Default implementation pass on the message as is.
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public virtual Msg EntityMessage(Msg message)
public virtual Msg? EntityMessage(Msg message)
{
return message;
}
Expand All @@ -175,15 +175,15 @@ public virtual Msg EntityMessage(Msg message)
/// <returns>TBD</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
[Obsolete("Use ShardId(string, object?) instead")]
public virtual string ShardId(Msg message)
public virtual string? ShardId(Msg message)
{
EntityId id;
EntityId? id;
if (message is ShardRegion.StartEntity se)
id = se.EntityId;
else
id = EntityId(message);

return _cachedIds[(Math.Abs(MurmurHash.StringHash(id)) % MaxNumberOfShards)];
return id is null ? null : _cachedIds[(Math.Abs(MurmurHash.StringHash(id)) % MaxNumberOfShards)];
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -413,6 +413,7 @@ public static Config DefaultConfig()
/// This exception is thrown when the cluster member doesn't have the role specified in <paramref name="settings"/>.
/// </exception>
/// <returns>The actor ref of the <see cref="Sharding.ShardRegion"/> that is to be responsible for the shard.</returns>
[Obsolete("Use one of the overloads that accepts an IMessageExtractor instead")]
public IActorRef Start(
string typeName,
Props entityProps,
Expand Down Expand Up @@ -467,6 +468,7 @@ public IActorRef Start(
/// This exception is thrown when the cluster member doesn't have the role specified in <paramref name="settings"/>.
/// </exception>
/// <returns>The actor ref of the <see cref="Sharding.ShardRegion"/> that is to be responsible for the shard.</returns>
[Obsolete("Use one of the overloads that accepts an IMessageExtractor instead")]
public Task<IActorRef> StartAsync(
string typeName,
Props entityProps,
Expand Down Expand Up @@ -515,6 +517,7 @@ public Task<IActorRef> StartAsync(
/// This exception is thrown when the cluster member doesn't have the role specified in <paramref name="settings"/>.
/// </exception>
/// <returns>The actor ref of the <see cref="Sharding.ShardRegion"/> that is to be responsible for the shard.</returns>
[Obsolete("Use one of the overloads that accepts an IMessageExtractor instead")]
public IActorRef Start(
string typeName,
Props entityProps,
Expand Down Expand Up @@ -562,6 +565,7 @@ public IActorRef Start(
/// This exception is thrown when the cluster member doesn't have the role specified in <paramref name="settings"/>.
/// </exception>
/// <returns>The actor ref of the <see cref="Sharding.ShardRegion"/> that is to be responsible for the shard.</returns>
[Obsolete("Use one of the overloads that accepts an IMessageExtractor instead")]
public Task<IActorRef> StartAsync(
string typeName,
Props entityProps,
Expand Down Expand Up @@ -788,6 +792,7 @@ public Task<IActorRef> StartAsync(
/// This exception is thrown when the cluster member doesn't have the role specified in <paramref name="settings"/>.
/// </exception>
/// <returns>The actor ref of the <see cref="Sharding.ShardRegion"/> that is to be responsible for the shard.</returns>
[Obsolete("Use one of the overloads that accepts an IMessageExtractor instead")]
public IActorRef Start(
string typeName,
Func<string, Props> entityPropsFactory,
Expand Down Expand Up @@ -842,6 +847,7 @@ public IActorRef Start(
/// This exception is thrown when the cluster member doesn't have the role specified in <paramref name="settings"/>.
/// </exception>
/// <returns>The actor ref of the <see cref="Sharding.ShardRegion"/> that is to be responsible for the shard.</returns>
[Obsolete("Use one of the overloads that accepts an IMessageExtractor instead")]
public Task<IActorRef> StartAsync(
string typeName,
Func<string, Props> entityPropsFactory,
Expand Down Expand Up @@ -994,6 +1000,7 @@ private async Task<IActorRef> InternalStartAsync(
/// This exception is thrown when the cluster member doesn't have the role specified in <paramref name="settings"/>.
/// </exception>
/// <returns>The actor ref of the <see cref="Sharding.ShardRegion"/> that is to be responsible for the shard.</returns>
[Obsolete("Use one of the overloads that accepts an IMessageExtractor instead")]
public IActorRef Start(
string typeName,
Func<string, Props> entityPropsFactory,
Expand Down Expand Up @@ -1041,6 +1048,7 @@ public IActorRef Start(
/// This exception is thrown when the cluster member doesn't have the role specified in <paramref name="settings"/>.
/// </exception>
/// <returns>The actor ref of the <see cref="Sharding.ShardRegion"/> that is to be responsible for the shard.</returns>
[Obsolete("Use one of the overloads that accepts an IMessageExtractor instead")]
public Task<IActorRef> StartAsync(
string typeName,
Func<string, Props> entityPropsFactory,
Expand Down Expand Up @@ -1254,6 +1262,7 @@ public Task<IActorRef> StartAsync(
/// that passed the `extractEntityId` will be used
/// </param>
/// <returns>The actor ref of the <see cref="Sharding.ShardRegion"/> that is to be responsible for the shard.</returns>
[Obsolete("Use one of the overloads that accepts an IMessageExtractor instead")]
public IActorRef StartProxy(
string typeName,
string role,
Expand Down Expand Up @@ -1290,6 +1299,7 @@ public IActorRef StartProxy(
/// that passed the `extractEntityId` will be used
/// </param>
/// <returns>The actor ref of the <see cref="Sharding.ShardRegion"/> that is to be responsible for the shard.</returns>
[Obsolete("Use one of the overloads that accepts an IMessageExtractor instead")]
public Task<IActorRef> StartProxyAsync(string typeName, string role, ExtractEntityId extractEntityId, ExtractShardId extractShardId)
{
return StartProxyAsync(
Expand Down
2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.DistributedData/Replicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ namespace Akka.DistributedData
///
/// In the <see cref="Update"/> message you can pass an optional request context, which the <see cref="Replicator"/>
/// does not care about, but is included in the reply messages. This is a convenient
/// way to pass contextual information (e.g. original sender) without having to use <see cref="Ask"/>
/// way to pass contextual information (e.g. original sender) without having to use Ask{T}
/// or local correlation data structures.
/// </para>
/// <para>
Expand Down
Loading

0 comments on commit 38ad362

Please sign in to comment.