From e8beec4f18cf25e68d394ec35143b3e9267bb406 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 4 Sep 2024 21:35:04 +0700 Subject: [PATCH] Fix StreamRefSerializer NRE bug (#7333) * Fix StreamRefSerializer NRE bug * Update API Approval list --- ...APISpec.ApproveStreams.DotNet.verified.txt | 1 + ...oreAPISpec.ApproveStreams.Net.verified.txt | 1 + .../Serialization/StreamRefSerializer.cs | 42 ++++++++++++ .../Serialization/StreamRefSerializer.cs | 68 +++++++++---------- 4 files changed, 77 insertions(+), 35 deletions(-) create mode 100644 src/core/Akka.Streams.Tests/Serialization/StreamRefSerializer.cs diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt index 3167a0d0445..931d9918ccb 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt @@ -4753,6 +4753,7 @@ namespace Akka.Streams.Implementation.Stages } namespace Akka.Streams.Serialization { + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class StreamRefSerializer : Akka.Serialization.SerializerWithStringManifest { public StreamRefSerializer(Akka.Actor.ExtendedActorSystem system) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt index ce092ba63d1..3205d29804b 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt @@ -4727,6 +4727,7 @@ namespace Akka.Streams.Implementation.Stages } namespace Akka.Streams.Serialization { + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class StreamRefSerializer : Akka.Serialization.SerializerWithStringManifest { public StreamRefSerializer(Akka.Actor.ExtendedActorSystem system) { } diff --git a/src/core/Akka.Streams.Tests/Serialization/StreamRefSerializer.cs b/src/core/Akka.Streams.Tests/Serialization/StreamRefSerializer.cs new file mode 100644 index 00000000000..6beacf93f92 --- /dev/null +++ b/src/core/Akka.Streams.Tests/Serialization/StreamRefSerializer.cs @@ -0,0 +1,42 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using Akka.Serialization; +using Akka.Streams.Implementation.StreamRef; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; +using static FluentAssertions.FluentActions; + +namespace Akka.Streams.Tests.Serialization; + +public class StreamRefSerializer: Akka.TestKit.Xunit2.TestKit +{ + public StreamRefSerializer(ITestOutputHelper output) + : base(ActorMaterializer.DefaultConfig(), nameof(StreamRefSerializer), output) + { + } + + [Fact(DisplayName = "StreamRefSerializer should not throw NRE when configuration were set before ActorSystem started")] + public void StreamsConfigBugTest() + { + var message = new SequencedOnNext(10, "test"); + var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerFor(message); + var manifest = serializer.Manifest(message); + + byte[] bytes = null; + Invoking(() => + { + bytes = serializer.ToBinary(message); // This throws an NRE in the bug + }).Should().NotThrow(); + + var deserialized = (SequencedOnNext) serializer.FromBinary(bytes, manifest); + deserialized.SeqNr.Should().Be(message.SeqNr); + deserialized.Payload.Should().Be(message.Payload); + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Serialization/StreamRefSerializer.cs b/src/core/Akka.Streams/Serialization/StreamRefSerializer.cs index b86c1cc53a5..f9ac94f43a5 100644 --- a/src/core/Akka.Streams/Serialization/StreamRefSerializer.cs +++ b/src/core/Akka.Streams/Serialization/StreamRefSerializer.cs @@ -10,7 +10,6 @@ using Akka.Actor; using Akka.Serialization; using Akka.Streams.Serialization.Proto.Msg; -using Akka.Util; using Google.Protobuf; using Akka.Streams.Implementation.StreamRef; using CumulativeDemand = Akka.Streams.Implementation.StreamRef.CumulativeDemand; @@ -19,12 +18,12 @@ using RemoteStreamFailure = Akka.Streams.Implementation.StreamRef.RemoteStreamFailure; using SequencedOnNext = Akka.Streams.Implementation.StreamRef.SequencedOnNext; +#nullable enable namespace Akka.Streams.Serialization { public sealed class StreamRefSerializer : SerializerWithStringManifest { private readonly ExtendedActorSystem _system; - private readonly Akka.Serialization.Serialization _serialization; private const string SequencedOnNextManifest = "A"; private const string CumulativeDemandManifest = "B"; @@ -37,52 +36,51 @@ public sealed class StreamRefSerializer : SerializerWithStringManifest public StreamRefSerializer(ExtendedActorSystem system) : base(system) { _system = system; - _serialization = system.Serialization; } public override string Manifest(object o) { - switch (o) + return o switch { - case SequencedOnNext _: return SequencedOnNextManifest; - case CumulativeDemand _: return CumulativeDemandManifest; - case OnSubscribeHandshake _: return OnSubscribeHandshakeManifest; - case RemoteStreamFailure _: return RemoteSinkFailureManifest; - case RemoteStreamCompleted _: return RemoteSinkCompletedManifest; - case SourceRefImpl _: return SourceRefManifest; - case SinkRefImpl _: return SinkRefManifest; - default: throw new ArgumentException($"Unsupported object of type {o.GetType()}", nameof(o)); - } + SequencedOnNext => SequencedOnNextManifest, + CumulativeDemand => CumulativeDemandManifest, + OnSubscribeHandshake => OnSubscribeHandshakeManifest, + RemoteStreamFailure => RemoteSinkFailureManifest, + RemoteStreamCompleted => RemoteSinkCompletedManifest, + SourceRefImpl => SourceRefManifest, + SinkRefImpl => SinkRefManifest, + _ => throw new ArgumentException($"Unsupported object of type {o.GetType()}", nameof(o)) + }; } public override byte[] ToBinary(object o) { - switch (o) + return o switch { - case SequencedOnNext onNext: return SerializeSequencedOnNext(onNext).ToByteArray(); - case CumulativeDemand demand: return SerializeCumulativeDemand(demand).ToByteArray(); - case OnSubscribeHandshake handshake: return SerializeOnSubscribeHandshake(handshake).ToByteArray(); - case RemoteStreamFailure failure: return SerializeRemoteStreamFailure(failure).ToByteArray(); - case RemoteStreamCompleted completed: return SerializeRemoteStreamCompleted(completed).ToByteArray(); - case SourceRefImpl sourceRef: return SerializeSourceRef(sourceRef).ToByteArray(); - case SinkRefImpl sinkRef: return SerializeSinkRef(sinkRef).ToByteArray(); - default: throw new ArgumentException($"Unsupported object of type {o.GetType()}", nameof(o)); - } + SequencedOnNext onNext => SerializeSequencedOnNext(onNext).ToByteArray(), + CumulativeDemand demand => SerializeCumulativeDemand(demand).ToByteArray(), + OnSubscribeHandshake handshake => SerializeOnSubscribeHandshake(handshake).ToByteArray(), + RemoteStreamFailure failure => SerializeRemoteStreamFailure(failure).ToByteArray(), + RemoteStreamCompleted completed => SerializeRemoteStreamCompleted(completed).ToByteArray(), + SourceRefImpl sourceRef => SerializeSourceRef(sourceRef).ToByteArray(), + SinkRefImpl sinkRef => SerializeSinkRef(sinkRef).ToByteArray(), + _ => throw new ArgumentException($"Unsupported object of type {o.GetType()}", nameof(o)) + }; } public override object FromBinary(byte[] bytes, string manifest) { - switch (manifest) + return manifest switch { - case SequencedOnNextManifest: return DeserializeSequenceOnNext(bytes); - case CumulativeDemandManifest: return DeserializeCumulativeDemand(bytes); - case OnSubscribeHandshakeManifest: return DeserializeOnSubscribeHandshake(bytes); - case RemoteSinkFailureManifest: return DeserializeRemoteSinkFailure(bytes); - case RemoteSinkCompletedManifest: return DeserializeRemoteSinkCompleted(bytes); - case SourceRefManifest: return DeserializeSourceRef(bytes); - case SinkRefManifest: return DeserializeSinkRef(bytes); - default: throw new ArgumentException($"Unsupported manifest '{manifest}'", nameof(manifest)); - } + SequencedOnNextManifest => DeserializeSequenceOnNext(bytes), + CumulativeDemandManifest => DeserializeCumulativeDemand(bytes), + OnSubscribeHandshakeManifest => DeserializeOnSubscribeHandshake(bytes), + RemoteSinkFailureManifest => DeserializeRemoteSinkFailure(bytes), + RemoteSinkCompletedManifest => DeserializeRemoteSinkCompleted(bytes), + SourceRefManifest => DeserializeSourceRef(bytes), + SinkRefManifest => DeserializeSinkRef(bytes), + _ => throw new ArgumentException($"Unsupported manifest '{manifest}'", nameof(manifest)) + }; } private SinkRefImpl DeserializeSinkRef(byte[] bytes) @@ -129,7 +127,7 @@ private SequencedOnNext DeserializeSequenceOnNext(byte[] bytes) { var onNext = Proto.Msg.SequencedOnNext.Parser.ParseFrom(bytes); var p = onNext.Payload; - var payload = _serialization.Deserialize( + var payload = system.Serialization.Deserialize( p.EnclosedMessage.ToByteArray(), p.SerializerId, p.MessageManifest?.ToStringUtf8()); @@ -169,7 +167,7 @@ private ByteString SerializeCumulativeDemand(CumulativeDemand demand) => private ByteString SerializeSequencedOnNext(SequencedOnNext onNext) { var payload = onNext.Payload; - var serializer = _serialization.FindSerializerFor(payload); + var serializer = system.Serialization.FindSerializerFor(payload); var manifest = Akka.Serialization.Serialization.ManifestFor(serializer, payload); var p = new Payload