Skip to content

Commit

Permalink
cherry-picked from 231c8c9 (#6107)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb authored Sep 23, 2022
1 parent b0529e9 commit 8fcaaa4
Show file tree
Hide file tree
Showing 8 changed files with 414 additions and 13 deletions.
20 changes: 13 additions & 7 deletions docs/articles/clustering/cluster-sharding.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,6 @@ Possible reasons for disabling remember entity storage are:

For supporting remembered entities in an environment without disk storage but with access to a database, use persistence mode instead.

> [!NOTE]
> Currently, Lightning.NET library, the storage solution used to store DData in disk, is having problem
> deploying native library files in [Linux operating system operating in x64 and ARM platforms]
> (<https://github.com/CoreyKaylor/Lightning.NET/issues/141>).
>
> You will need to install LightningDB in your Linux distribution manually if you wanted to use the durable DData feature.
### Terminating Remembered Entities

One complication that `akka.cluster.sharding.remember-entities = true` introduces is that your sharded entity actors can no longer be terminated through the normal Akka.NET channels, i.e. `Context.Stop(Self)`, `PoisonPill.Instance`, and the like. This is because as part of the `remember-entities` contract - the sharding system is going to insist on keeping all remembered entities alive until explicitly told to stop.
Expand Down Expand Up @@ -217,6 +210,19 @@ You can inspect current sharding stats by using following messages:
* On `GetShardRegionState` shard region will reply with `ShardRegionState` containing data about shards living in the current actor system and what entities are alive on each one of them.
* On `GetClusterShardingStats` shard region will reply with `ClusterShardingStats` having information about shards living in the whole cluster and how many entities alive in each one of them.

### Querying for the Location of Specific Entities

It's possible to query a `ShardRegion` or a `ShardRegionProxy` using a `GetEntityLocation` query:

[!code-csharp[ShardedDaemonProcessSpec.cs](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs?name=GetEntityLocationQuery)]

A `GetEntityLocation` query will always return an `EntityLocation` response - even if the query could not be executed.

> [!IMPORTANT]
> One major caveat is that in order for the `GetEntityLocation` to execute your `IMessageExtractor` or `ShardExtractor` delegate will need to support the `ShardRegion.StartEntity` message - just like you'd have to use in order to support `remember-entities=on`:
[!code-csharp[ShardedDaemonProcessSpec.cs](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs?name=GetEntityLocationExtractor)]

## Integrating Cluster Sharding with Persistent Actors

One of the most common scenarios, where cluster sharding is used, is to combine them with event-sourced persistent actors from [Akka.Persistence](xref:persistence-architecture) module.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
//-----------------------------------------------------------------------
// <copyright file="ShardRegionQueriesSpecs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using Akka.TestKit.TestActors;
using Akka.Util;
using Xunit;
using Xunit.Abstractions;
using FluentAssertions;

namespace Akka.Cluster.Sharding.Tests
{
public class ShardRegionQueriesSpecs : AkkaSpec
{
private readonly Cluster _cluster;
private readonly ClusterSharding _clusterSharding;
private readonly IActorRef _shardRegion;

private readonly ActorSystem _proxySys;

public ShardRegionQueriesSpecs(ITestOutputHelper outputHelper) : base(GetConfig(), outputHelper)
{
_clusterSharding = ClusterSharding.Get(Sys);
_cluster = Cluster.Get(Sys);
_shardRegion = _clusterSharding.Start("entity", s => EchoActor.Props(this, true),
ClusterShardingSettings.Create(Sys).WithRole("shard"), ExtractEntityId, ExtractShardId);

var proxySysConfig = ConfigurationFactory.ParseString("akka.cluster.roles = [proxy]")
.WithFallback(Sys.Settings.Config);
_proxySys = ActorSystem.Create(Sys.Name, proxySysConfig);

_cluster.Join(_cluster.SelfAddress);
AwaitAssert(() => { _cluster.SelfMember.Status.ShouldBe(MemberStatus.Up); });

// form a 2-node cluster
var proxyCluster = Cluster.Get(_proxySys);
proxyCluster.Join(_cluster.SelfAddress);
AwaitAssert(() => { proxyCluster.SelfMember.Status.ShouldBe(MemberStatus.Up); });
}

protected override async Task AfterAllAsync()
{
await ShutdownAsync(_proxySys);
await base.AfterAllAsync();
}

private Option<(string, object)> ExtractEntityId(object message)
{
switch (message)
{
case int i:
return (i.ToString(), message);
}

throw new NotSupportedException();
}

// <GetEntityLocationExtractor>
private string ExtractShardId(object message)
{
switch (message)
{
case int i:
return (i % 10).ToString();
// must support ShardRegion.StartEntity in order for
// GetEntityLocation to work properly
case ShardRegion.StartEntity se:
return se.EntityId;
}

throw new NotSupportedException();
}
// </GetEntityLocationExtractor>

private static Config GetConfig()
{
return ConfigurationFactory.ParseString(@"
akka.loglevel = WARNING
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0
akka.cluster.roles = [shard]")
.WithFallback(Sharding.ClusterSharding.DefaultConfig())
.WithFallback(DistributedData.DistributedData.DefaultConfig())
.WithFallback(ClusterSingletonManager.DefaultConfig());
}

/// <summary>
/// DocFx material for demonstrating how this query type works
/// </summary>
[Fact]
public async Task ShardRegion_GetEntityLocation_DocumentationSpec()
{
// <GetEntityLocationQuery>
// creates an entity with entityId="1"
await _shardRegion.Ask<int>(1, TimeSpan.FromSeconds(3));

// determine where entity with "entityId=1" is located in cluster
var q1 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));

q1.EntityId.Should().Be("1");

// have a valid ShardId
q1.ShardId.Should().NotBeEmpty();

// have valid address for node that will / would host entity
q1.ShardRegion.Should().NotBe(Address.AllSystems); // has real address

// if entity actor is alive, will retrieve a reference to it
q1.EntityRef.HasValue.Should().BeTrue();
// </GetEntityLocationQuery>
}

[Fact(DisplayName = "ShardRegion should support GetEntityLocation queries locally")]
public async Task ShardRegion_should_support_GetEntityLocation_query_locally()
{
// arrange
await _shardRegion.Ask<int>(1, TimeSpan.FromSeconds(3));
await _shardRegion.Ask<int>(2, TimeSpan.FromSeconds(3));

// act
var q1 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));
var q2 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("2", TimeSpan.FromSeconds(1)));
var q3 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("3", TimeSpan.FromSeconds(1)));

// assert
void AssertValidEntityLocation(EntityLocation e, string entityId)
{
e.EntityId.Should().Be(entityId);
e.EntityRef.Should().NotBe(Option<IActorRef>.None);
e.ShardId.Should().NotBeNullOrEmpty();
e.ShardRegion.Should().Be(_cluster.SelfAddress);
}

AssertValidEntityLocation(q1, "1");
AssertValidEntityLocation(q2, "2");

q3.EntityRef.Should().Be(Option<IActorRef>.None);
q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard?
q3.ShardRegion.Should().Be(Address.AllSystems);
}

[Fact(DisplayName = "ShardRegion should support GetEntityLocation queries remotely")]
public async Task ShardRegion_should_support_GetEntityLocation_query_remotely()
{
// arrange
var sharding2 = ClusterSharding.Get(_proxySys);
var shardRegionProxy = await sharding2.StartProxyAsync("entity", "shard", ExtractEntityId, ExtractShardId);

await shardRegionProxy.Ask<int>(1, TimeSpan.FromSeconds(3));
await shardRegionProxy.Ask<int>(2, TimeSpan.FromSeconds(3));

// act
var q1 = await shardRegionProxy.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));
var q2 = await shardRegionProxy.Ask<EntityLocation>(new GetEntityLocation("2", TimeSpan.FromSeconds(1)));
var q3 = await shardRegionProxy.Ask<EntityLocation>(new GetEntityLocation("3", TimeSpan.FromSeconds(1)));

// assert
void AssertValidEntityLocation(EntityLocation e, string entityId)
{
e.EntityId.Should().Be(entityId);
e.EntityRef.Should().NotBe(Option<IActorRef>.None);
e.ShardId.Should().NotBeNullOrEmpty();
e.ShardRegion.Should().Be(_cluster.SelfAddress);
}

AssertValidEntityLocation(q1, "1");
AssertValidEntityLocation(q2, "2");

q3.EntityRef.Should().Be(Option<IActorRef>.None);
q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard?
q3.ShardRegion.Should().Be(Address.AllSystems);
}
}
}
81 changes: 81 additions & 0 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Akka.Pattern;
using Akka.Util;
using Akka.Util.Internal;
using Get = Akka.DistributedData.Get;

namespace Akka.Cluster.Sharding
{
Expand Down Expand Up @@ -925,12 +926,92 @@ private void HandleShardRegionQuery(IShardRegionQuery query)
case GetShardRegionStatus _:
Sender.Tell(new ShardRegionStatus(_typeName, _coordinator != null));
break;
case GetEntityLocation g:
ReplyToGetEntityLocationQuery(g, Sender);
break;
default:
Unhandled(query);
break;
}
}

private void ReplyToGetEntityLocationQuery(GetEntityLocation getEntityLocation, IActorRef sender)
{
// Get the Address of the remote IActorRef, or return our Cluster.SelfAddress is the shard / entity
// is hosted locally.
Address GetNodeAddress(IActorRef shardOrRegionRef)
{
return shardOrRegionRef.Path.Address.HasGlobalScope
? shardOrRegionRef.Path.Address
: _cluster.SelfAddress;
}

try
{
var shardId = _extractShardId(new StartEntity(getEntityLocation.EntityId));
if (string.IsNullOrEmpty(shardId))
{
// unsupported entityId - could only happen in highly customized extractors
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, Address.AllSystems,
Option<IActorRef>.None));
return;
}

async Task ResolveEntityRef(Address destinationAddress, ActorPath entityPath)
{
// now we just need to check to see if an entity ref exists
try
{
var entityRef = await Context.ActorSelection(entityPath).ResolveOne(getEntityLocation.Timeout);
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, destinationAddress,
new Option<IActorRef>(entityRef)));
}
catch (ActorNotFoundException ex)
{
// entity does not exist
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, destinationAddress,
Option<IActorRef>.None));
}
}

if (!_shards.TryGetValue(shardId, out var shardActorRef))
{
// shard is not homed yet, so try looking up the ShardRegion
if (!_regionByShard.TryGetValue(shardId, out var shardRegionRef))
{
// shardRegion isn't allocated either
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, Address.AllSystems,
Option<IActorRef>.None));
}
else
{
// ShardRegion exists, but shard is not homed
// NOTE: in the event that we're querying a shard's location from a ShardRegionProxy
// the shard may not be technically "homed" inside the proxy, but it does exist.
#pragma warning disable CS4014
ResolveEntityRef(GetNodeAddress(shardRegionRef), shardRegionRef.Path / shardId / shardId); // needs to run as a detached task
#pragma warning restore CS4014
}

return;
}

#pragma warning disable CS4014
ResolveEntityRef(GetNodeAddress(shardActorRef), shardActorRef.Path / shardId); // needs to run as a detached task
#pragma warning restore CS4014
}
catch (Exception ex)
{
_log.Error(ex, "Error while trying to resolve GetEntityLocation query for entityId [{0}]. " +
"Does MessageExtractor support `ShardRegion.StartEntity`? " +
"If not, that's why you might be receiving this error.",
getEntityLocation.EntityId);
// unsupported entityId - could only happen in highly customized extractors
sender.Tell(new EntityLocation(getEntityLocation.EntityId, string.Empty, Address.AllSystems,
Option<IActorRef>.None));
}
}

private void ReplyToRegionStateQuery(IActorRef sender)
{
QueryShardsAsync<Shard.CurrentShardState>(Shard.GetCurrentShardState.Instance)
Expand Down
Loading

0 comments on commit 8fcaaa4

Please sign in to comment.