Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix broken object type serializer in QueryExecutor #6528

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public abstract class BatchingSqlJournalSetup
/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
[Obsolete(message: "This property should never be used for writes, use the default `System.Object` serializer instead")]
public string DefaultSerializer { get; }

/// <summary>
Expand Down Expand Up @@ -1228,7 +1229,7 @@ private async Task<WriteMessagesResult> HandleWriteMessages(WriteMessages req, T
protected virtual void WriteEvent(TCommand command, IPersistentRepresentation persistent, string tags = "")
{
var payloadType = persistent.Payload.GetType();
var serializer = _serialization.FindSerializerForType(payloadType, Setup.DefaultSerializer);
var serializer = _serialization.FindSerializerForType(payloadType);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual fix, we do not use the default serializer set in the persistence plugin HOCON settings for writes anymore.


// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
Akka.Serialization.Serialization.WithTransport(_serialization.System, () =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public class QueryConfiguration
/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
[Obsolete(message: "This property should never be used for writes, use the default `System.Object` serializer instead")]
public string DefaultSerializer { get; }

/// <summary>
Expand Down Expand Up @@ -780,7 +781,7 @@ protected DbCommand GetCommand(DbConnection connection, string sql)
protected virtual void WriteEvent(DbCommand command, IPersistentRepresentation e, IImmutableSet<string> tags)
{

var serializer = Serialization.FindSerializerForType(e.Payload.GetType(), Configuration.DefaultSerializer);
var serializer = Serialization.FindSerializerForType(e.Payload.GetType());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual fix, we do not use the default serializer set in the persistence plugin HOCON settings for writes anymore.


// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
var (binary,manifest) = Akka.Serialization.Serialization.WithTransport(Serialization.System,(e.Payload,serializer) ,(state) =>
Expand Down Expand Up @@ -846,7 +847,10 @@ protected virtual IPersistentRepresentation ReadEvent(DbDataReader reader)
{
// Support old writes that did not set the serializer id
var type = Type.GetType(manifest, true);
#pragma warning disable CS0618
// Backward compatibility code, we still need to use the old default serializer on read to support legacy data
var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
Copy link
Contributor Author

@Arkatufus Arkatufus Mar 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only code in the SQL common journal that still uses the old settings. The purpose of this code is to make sure that really old (pre-v1.3.0) data are still readable by the journal. These pre-v1.3.0 event data does not have their serializer_id column populated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another piece of code that still uses this still uses this setting is the snapshot store reader, just like this one, it is there to make sure that legacy data are still readable.

#pragma warning restore CS0618
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
deserialized = Akka.Serialization.Serialization.WithTransport(
Serialization.System, (deserializer, (byte[])payload, type),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public class SnapshotStoreSettings
/// <summary>
/// The default serializer being used if no type match override is specified
/// </summary>
[Obsolete(message: "This property should never be used, use the default `System.Object` serializer instead")]
public string DefaultSerializer { get; private set; }

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public class QueryConfiguration
/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
[Obsolete(message: "This property should never be used for writes, use the default `System.Object` serializer instead")]
public readonly string DefaultSerializer;

/// <summary>
Expand Down Expand Up @@ -336,7 +337,7 @@ DELETE FROM {Configuration.FullSnapshotTableName}
protected virtual void SetPayloadParameter(object snapshot, DbCommand command)
{
var snapshotType = snapshot.GetType();
var serializer = Serialization.FindSerializerForType(snapshotType, Configuration.DefaultSerializer);
var serializer = Serialization.FindSerializerForType(snapshotType);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
var binary = Akka.Serialization.Serialization.WithTransport(Serialization.System, () => serializer.ToBinary(snapshot));
AddParameter(command, "@Payload", DbType.Binary, binary);
Expand All @@ -350,7 +351,7 @@ protected virtual void SetPayloadParameter(object snapshot, DbCommand command)
protected virtual void SetManifestParameters(object snapshot, DbCommand command)
{
var snapshotType = snapshot.GetType();
var serializer = Serialization.FindSerializerForType(snapshotType, Configuration.DefaultSerializer);
var serializer = Serialization.FindSerializerForType(snapshotType);

string manifest = "";
if (serializer is SerializerWithStringManifest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,22 +224,5 @@ protected override async Task DeleteAsync(string persistenceId, SnapshotSelectio
await QueryExecutor.DeleteBatchAsync(connection, nestedCancellationTokenSource.Token, persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp);
}
}

private SnapshotEntry ToSnapshotEntry(SnapshotMetadata metadata, object snapshot)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Byte-rot method, this method is not being used anymore.

{
var snapshotType = snapshot.GetType();
var serializer = Context.System.Serialization.FindSerializerForType(snapshotType, _settings.DefaultSerializer);

var binary = Akka.Serialization.Serialization.WithTransport(_actorSystem,
() => serializer.ToBinary(snapshot));


return new SnapshotEntry(
persistenceId: metadata.PersistenceId,
sequenceNr: metadata.SequenceNr,
timestamp: metadata.Timestamp,
manifest: snapshotType.TypeQualifiedName(),
payload: binary);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
</ItemGroup>

<ItemGroup>
<None Update="data\Sqlite.CustomObject.db">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="data\Sqlite.v1.3.0.db">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<DefineConstants>$(DefineConstants);RELEASE</DefineConstants>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// -----------------------------------------------------------------------
// <copyright file="CustomObjectSerializerSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Serialization;
using FluentAssertions;
using Microsoft.Data.Sqlite;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Sqlite.Tests
{
public class CustomObjectSerializerSpec : Akka.TestKit.Xunit2.TestKit, IAsyncLifetime
{
private static readonly string ConnectionString;
private static readonly Config Config;
static CustomObjectSerializerSpec()
{
var filename = $"AkkaSqlite-{Guid.NewGuid()}.db";
File.Copy("./data/Sqlite.CustomObject.db", $"{filename}.db");

ConnectionString = $"DataSource={filename}.db";
Config = ConfigurationFactory.ParseString($@"
akka.actor {{
serializers {{
mySerializer = ""{typeof(MySerializer).AssemblyQualifiedName}""
}}
serialization-bindings {{
""System.Object"" = mySerializer
}}
}}

akka.persistence {{
journal {{
plugin = ""akka.persistence.journal.sqlite""
sqlite {{
connection-string = ""{ConnectionString}""
auto-initialize = on
}}
}}
snapshot-store {{
plugin = ""akka.persistence.snapshot-store.sqlite""
sqlite {{
connection-string = ""{ConnectionString}""
auto-initialize = on
}}
}}
}}").WithFallback(SqlitePersistence.DefaultConfiguration());
}

public CustomObjectSerializerSpec(ITestOutputHelper helper)
: base(Config, nameof(CustomObjectSerializerSpec), helper)
{
}

[Fact(DisplayName = "Persistence.Sql should use custom serializer for object type")]
public async Task CustomSerializerTest()
{
var probe = CreateTestProbe();

// Sanity check to see that the system should serialize object type using MySerializer
var serializer = Sys.Serialization.FindSerializerForType(typeof(Persisted));
serializer.Should().BeOfType<MySerializer>();

var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("a", probe)));
probe.ExpectMsg("recovered");
actor.Tell(new Persisted("a"), probe);
probe.ExpectMsg(new Persisted("a"));

// Read the database directly, make sure that we're using the correct object type serializer
var conn = new SqliteConnection(ConnectionString);
conn.Open();
const string sql = "SELECT ej.serializer_id FROM event_journal ej WHERE ej.persistence_id = 'a'";
await using var cmd = new SqliteCommand(sql, conn);
var record = await cmd.ExecuteReaderAsync();
await record.ReadAsync();

// In the bug this fails, the serializer id is JSON id instead of MySerializer id
record[0].Should().Be(9999);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unit test makes sure that the old serializer HOCON setting are not being used in journal writes anymore.

}

[Fact(DisplayName = "Persistence.Sql should be able to read legacy data")]
public void LegacyDataTest()
{
var probe = CreateTestProbe();
var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("old", probe)));
probe.ExpectMsg(new Persisted("old"));
probe.ExpectMsg("recovered");
}

public Task InitializeAsync()
{
if(File.Exists("AkkaSqlite.db"))
File.Delete("AkkaSqlite.db");
return Task.CompletedTask;
}

public Task DisposeAsync()
{
return Task.CompletedTask;
}
}

internal sealed class Persisted: IEquatable<Persisted>
{
public Persisted(string payload)
{
Payload = payload;
}

public string Payload { get; }

public bool Equals(Persisted other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Payload == other.Payload;
}

public override bool Equals(object obj)
{
return ReferenceEquals(this, obj) || obj is Persisted other && Equals(other);
}

public override int GetHashCode()
{
return (Payload != null ? Payload.GetHashCode() : 0);
}
}

internal class MySerializer : Serializer
{
public MySerializer(ExtendedActorSystem system) : base(system)
{
}

public override bool IncludeManifest { get { return true; } }
public override int Identifier { get { return 9999; } }

public override byte[] ToBinary(object obj)
{
return Encoding.UTF8.GetBytes(obj.ToString());
}

public override object FromBinary(byte[] bytes, Type type)
{
return Encoding.UTF8.GetString(bytes);
}
}

internal sealed class PersistedActor : UntypedPersistentActor
{
private readonly IActorRef _probe;

public PersistedActor(string persistenceId, IActorRef probe)
{
PersistenceId = persistenceId;
_probe = probe;
}

public override string PersistenceId { get; }

protected override void OnCommand(object message)
{
var sender = Sender;
Persist(message, _ =>
{
sender.Tell(message);
});
}

protected override void OnRecover(object message)
{
switch (message)
{
case Persisted msg:
_probe.Tell(msg);
break;
case RecoveryCompleted _:
_probe.Tell("recovered");
break;
}
}
}
}
Loading