Skip to content

Commit

Permalink
Add IShardingBufferMessageAdapter.UnApply
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus authored and Aaronontheweb committed Jan 13, 2025
1 parent 4b85133 commit 1cdf9da
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public object Apply(object message, IActorContext context)
_counter.IncrementAndGet();
return message;
}

public object UnApply(object message, IActorContext context)
{
throw new NotImplementedException();
}
}

private const string ShardTypeName = "Caat";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ private sealed class BufferMessageAdapter: IShardingBufferMessageAdapter
{
public object Apply(object message, IActorContext context)
=> new MyEnvelope(message);

public object UnApply(object message, IActorContext context)
{
return message is MyEnvelope envelope ? envelope.Message : message;
}
}

private class EchoActor: UntypedActor
Expand Down Expand Up @@ -234,7 +239,7 @@ public async Task WrappedMessageDelivery()
var continueMessage = await ExpectShardStartup();

// this message should be buffered
_shard.Tell(new ShardingEnvelope(Msg, new MyEnvelope(Msg)));
_shard.Tell(new ShardingEnvelope(Msg, Msg));
await Task.Yield();

// Tell shard to continue processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace Akka.Cluster.Sharding;
public interface IShardingBufferMessageAdapter
{
public object Apply(object message, IActorContext context);
public object UnApply(object message, IActorContext context);
}

[InternalApi]
Expand All @@ -24,6 +25,8 @@ internal class EmptyBufferMessageAdapter: IShardingBufferMessageAdapter
private EmptyBufferMessageAdapter()
{
}

public object Apply(object message, IActorContext context) => message;

public object UnApply(object message, IActorContext context) => message;
}
2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2004,7 +2004,7 @@ private void SendMsgBuffer(EntityId entityId)
if (WrappedMessage.Unwrap(message) is ShardRegion.StartEntity se)
StartEntity(se.EntityId, @ref);
else
DeliverMessage(entityId, message, @ref);
DeliverMessage(entityId, _bufferMessageAdapter.UnApply(message, Context), @ref);
}

TouchLastMessageTimestamp(entityId);
Expand Down

0 comments on commit 1cdf9da

Please sign in to comment.