diff --git a/src/documentation/articles/serialization.md b/src/documentation/articles/serialization.md index 863b5a20..e0aa1fb5 100644 --- a/src/documentation/articles/serialization.md +++ b/src/documentation/articles/serialization.md @@ -83,13 +83,16 @@ The code is based on three elements shall be available to [Entity Framework Core - **Key SerDes**: the serializer of the Primary Key - **ValueContainer SerDes**: the serializer of the ValueContainer - ### Default types [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) comes with some default values: - **ValueContainer** class: KEFCore uses `DefaultValueContainer` (i.e. `DefaultKEFCoreSerDes.DefaultValueContainer`) which stores the CLR type of Entity, the properties ordered by their index with associated CLT type, name and JSON serializaed value; the class is marked for JSON serialization and it is used from the **ValueContainer SerDes**; -- **Key SerDes** class: KEFCore uses `DefaultKEFCoreSerDes.Key.Json` (i.e. `DefaultKEFCoreSerDes.DefaultKeySerialization`), the type automatically manages simple or complex Primary Key -- **ValueContainer SerDes** class: KEFCore uses `DefaultKEFCoreSerDes.ValueContainer.Json<>` (i.e. `DefaultKEFCoreSerDes.DefaultValueContainerSerialization`) +- **Key SerDes** class: KEFCore uses `DefaultKEFCoreSerDes.Key.JsonRaw` (i.e. `DefaultKEFCoreSerDes.DefaultKeySerialization`), the type automatically manages simple or complex Primary Key +- **ValueContainer SerDes** class: KEFCore uses `DefaultKEFCoreSerDes.ValueContainer.JsonRaw<>` (i.e. `DefaultKEFCoreSerDes.DefaultValueContainerSerialization`) + +Both Key and ValueContainer SerDes come with two kind of data transfer mechanisms: +- Raw: it uses `byte` arrays for data transfer +- Buffered: they use `ByteBuffer` for data transfer ### User override @@ -138,7 +141,7 @@ public class CustomValueContainer : IValueContainer where TKey : not #### **Key SerDes** and **ValueContainer SerDes** class A custom **Key SerDes** class shall follow the following rules: -- must implements the `IKNetSerDes` interface or extend `KNetSerDes` +- must implements the `ISerDes` interface or extend `SerDes` - must be a generic type - must have a parameterless constructor - can store serialization information using Headers of Apache Kafka record (this information will be used from `EntityExtractor`) @@ -146,17 +149,17 @@ A custom **Key SerDes** class shall follow the following rules: An example snippet is the follow based on JSON serializer: ```C# -public class CustomKeySerDes : KNetSerDes +public class CustomKeySerDes : SerDesRaw { readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!); readonly byte[] customSerDesName = Encoding.UTF8.GetBytes(typeof(CustomKeySerDes<>).FullName!); - /// + /// public override byte[] Serialize(string topic, T data) { return SerializeWithHeaders(topic, null, data); } - /// + /// public override byte[] SerializeWithHeaders(string topic, Headers headers, T data) { headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName); @@ -165,12 +168,12 @@ public class CustomKeySerDes : KNetSerDes var jsonStr = System.Text.Json.JsonSerializer.Serialize(data); return Encoding.UTF8.GetBytes(jsonStr); } - /// + /// public override T Deserialize(string topic, byte[] data) { return DeserializeWithHeaders(topic, null, data); } - /// + /// public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data) { if (data == null) return default; @@ -180,7 +183,7 @@ public class CustomKeySerDes : KNetSerDes ``` ```C# -public class CustomValueContainerSerDes : KNetSerDes +public class CustomValueContainerSerDes : SerDesRaw { readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(CustomValueContainerSerDes<>).FullName!); readonly byte[] valueContainerName = null!; @@ -205,12 +208,12 @@ public class CustomValueContainerSerDes : KNetSerDes throw new ArgumentException($"{typeof(T).Name} is not a generic type and cannot be used as a valid ValueContainer type"); } - /// + /// public override byte[] Serialize(string topic, T data) { return SerializeWithHeaders(topic, null, data); } - /// + /// public override byte[] SerializeWithHeaders(string topic, Headers headers, T data) { headers?.Add(KEFCoreSerDesNames.ValueContainerSerializerIdentifier, valueContainerSerDesName); @@ -219,12 +222,12 @@ public class CustomValueContainerSerDes : KNetSerDes var jsonStr = System.Text.Json.JsonSerializer.Serialize(data); return Encoding.UTF8.GetBytes(jsonStr); } - /// + /// public override T Deserialize(string topic, byte[] data) { return DeserializeWithHeaders(topic, null, data); } - /// + /// public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data) { if (data == null) return default; @@ -249,6 +252,10 @@ The engine comes with two different encoders - Binary - Json +Both Key and ValueContainer SerDes, Binary or Json, come with two kind of data transfer mechanisms: +- Raw: it uses `byte` arrays for data transfer +- Buffered: they use `ByteBuffer` for data transfer + ### Avro schema The following schema is the default used from the engine and can be registered in Apache Schema registry so other applications can use it to extract the data stored in the topics: @@ -343,9 +350,9 @@ The extension converted this schema into code to speedup the exection of seriali ### How to use Avro `KafkaDbContext` contains three properties can be used to override the default types: -- **KeySerializationType**: set this value to `AvroKEFCoreSerDes.Key.Binary<>` or `AvroKEFCoreSerDes.Key.Json<>` or use `AvroKEFCoreSerDes.DefaultKeySerialization` (defaults to `AvroKEFCoreSerDes.Key.Binary<>`), both types automatically manages simple or complex Primary Key -- **ValueSerializationType**: set this value to `AvroKEFCoreSerDes.ValueContainer.Binary<>` or `AvroKEFCoreSerDes.ValueContainer.Json<>` or use `AvroKEFCoreSerDes.DefaultValueContainerSerialization` (defaults to `AvroKEFCoreSerDes.ValueContainer.Binary<>`) -- **ValueContainerType**: set this value to `AvroValueContainer<>` or use `AvroKEFCoreSerDes.DefaultValueContainer` +- **KeySerializationType**: set this value to `AvroKEFCoreSerDes.Key.BinaryRaw<>` or `AvroKEFCoreSerDes.Key.JsonRaw<>` or use `AvroKEFCoreSerDes.DefaultKeySerialization` (defaults to `AvroKEFCoreSerDes.Key.BinaryRaw<>`), both types automatically manages simple or complex Primary Key +- **ValueSerializationType**: set this value to `AvroKEFCoreSerDes.ValueContainer.BinaryRaw<>` or `AvroKEFCoreSerDes.ValueContainer.JsonRaw<>` or use `AvroKEFCoreSerDes.DefaultValueContainerSerialization` (defaults to `AvroKEFCoreSerDes.ValueContainer.BinaryRaw<>`) +- **ValueContainerType**: set this value to `AvroValueContainerRaw<>` or use `AvroKEFCoreSerDes.DefaultValueContainer` An example is: @@ -355,9 +362,9 @@ using (context = new BloggingContext() BootstrapServers = "KAFKA-SERVER:9092", ApplicationId = "MyAppid", DbName = "MyDBName", - KeySerializationType = UseAvroBinary ? typeof(AvroKEFCoreSerDes.Key.Binary<>) : typeof(AvroKEFCoreSerDes.Key.Json<>), + KeySerializationType = UseAvroBinary ? typeof(AvroKEFCoreSerDes.Key.BinaryRaw<>) : typeof(AvroKEFCoreSerDes.Key.JsonRaw<>), ValueContainerType = typeof(AvroValueContainer<>), - ValueSerializationType = UseAvroBinary ? typeof(AvroKEFCoreSerDes.ValueContainer.Binary<>) : typeof(AvroKEFCoreSerDes.ValueContainer.Json<>), + ValueSerializationType = UseAvroBinary ? typeof(AvroKEFCoreSerDes.ValueContainer.BinaryRaw<>) : typeof(AvroKEFCoreSerDes.ValueContainer.JsonRaw<>), }) { // execute stuff here @@ -368,6 +375,10 @@ using (context = new BloggingContext() With package [MASES.EntityFrameworkCore.KNet.Serialization.Protobuf](https://www.nuget.org/packages/MASES.EntityFrameworkCore.KNet.Serialization.Protobuf/) an user can choose the Protobuf serializer. +Both Key and ValueContainer SerDes come with two kind of data transfer mechanisms: +- Raw: it uses `byte` arrays for data transfer +- Buffered: they use `ByteBuffer` for data transfer + ### Protobuf schema The following schema is the default used from the engine and can be registered in Apache Schema registry so other applications can use it to extract the data stored in the topics: @@ -499,9 +510,9 @@ The extension converted this schema into code to speedup the exection of seriali ### How to use Protobuf `KafkaDbContext` contains three properties can be used to override the default types: -- **KeySerializationType**: set this value to `ProtobufKEFCoreSerDes..Key.Binary<>` or use `ProtobufKEFCoreSerDes.DefaultKeySerialization`, the type automatically manages simple or complex Primary Key -- **ValueSerializationType**: set this value to `ProtobufKEFCoreSerDes.ValueContainer.Binary<>` or use `ProtobufKEFCoreSerDes.DefaultValueContainerSerialization` -- **ValueContainerType**: set this value to `ProtobufValueContainer<>` or use `ProtobufKEFCoreSerDes.DefaultValueContainer` +- **KeySerializationType**: set this value to `ProtobufKEFCoreSerDes..Key.BinaryRaw<>` or use `ProtobufKEFCoreSerDes.DefaultKeySerialization`, the type automatically manages simple or complex Primary Key +- **ValueSerializationType**: set this value to `ProtobufKEFCoreSerDes.ValueContainer.BinaryRaw<>` or use `ProtobufKEFCoreSerDes.DefaultValueContainerSerialization` +- **ValueContainerType**: set this value to `ProtobufValueContainerRaw<>` or use `ProtobufKEFCoreSerDes.DefaultValueContainer` An example is: diff --git a/src/net/Common/Common.props b/src/net/Common/Common.props index eff38c0d..7740c32d 100644 --- a/src/net/Common/Common.props +++ b/src/net/Common/Common.props @@ -4,7 +4,7 @@ MASES s.r.l. MASES s.r.l. MASES s.r.l. - 2.1.1.0 + 2.2.0.0 net6.0;net7.0;net8.0 latest true diff --git a/src/net/KEFCore.SerDes.Avro.Compiler/KEFCore.SerDes.Avro.Compiler.csproj b/src/net/KEFCore.SerDes.Avro.Compiler/KEFCore.SerDes.Avro.Compiler.csproj index 9c667100..f0549c95 100644 --- a/src/net/KEFCore.SerDes.Avro.Compiler/KEFCore.SerDes.Avro.Compiler.csproj +++ b/src/net/KEFCore.SerDes.Avro.Compiler/KEFCore.SerDes.Avro.Compiler.csproj @@ -23,7 +23,7 @@ - + diff --git a/src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs b/src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs index 0385e769..67393692 100644 --- a/src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs +++ b/src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs @@ -20,6 +20,7 @@ using Avro.IO; using Avro.Specific; +using Java.Nio; using MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage; using MASES.KNet.Serialization; using Org.Apache.Kafka.Common.Header; @@ -28,52 +29,52 @@ namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro; /// -/// Avro base class to define extensions of , for example +/// Avro base class to define extensions of , for example /// public static class AvroKEFCoreSerDes { /// /// Returns the default serializer for keys /// - public static readonly Type DefaultKeySerialization = typeof(Key.Binary<>); + public static readonly Type DefaultKeySerialization = typeof(Key.BinaryRaw<>); /// /// Returns the default serializer for value containers /// - public static readonly Type DefaultValueContainerSerialization = typeof(ValueContainer.Binary<>); + public static readonly Type DefaultValueContainerSerialization = typeof(ValueContainer.BinaryRaw<>); /// /// Returns the default for value containers /// public static readonly Type DefaultValueContainer = typeof(AvroValueContainer<>); /// - /// Base class to define key extensions of , for example + /// Base class to define key extensions of , for example /// public static class Key { /// - /// Avro Key Binary encoder extension of , for example + /// Avro Key Binary encoder extension of , for example based on array /// /// - public class Binary : SerDes + public class BinaryRaw : SerDesRaw { readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!); - readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Binary<>).ToAssemblyQualified()); + readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(BinaryRaw<>).ToAssemblyQualified()); readonly SpecificDefaultWriter SpecificWriter = new(AvroKeyContainer._SCHEMA); readonly SpecificDefaultReader SpecificReader = new(AvroKeyContainer._SCHEMA, AvroKeyContainer._SCHEMA); - readonly ISerDes _defaultSerDes = default!; + readonly ISerDesRaw _defaultSerDes = default!; /// public override bool UseHeaders => true; /// /// Default initializer /// - public Binary() + public BinaryRaw() { if (KNetSerialization.IsInternalManaged()) { - _defaultSerDes = new SerDes(); + _defaultSerDes = new SerDesRaw(); } else if (!typeof(T).IsArray) { - throw new InvalidOperationException($"{typeof(Binary<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer"); + throw new InvalidOperationException($"{typeof(BinaryRaw<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer"); } } @@ -101,12 +102,12 @@ public override byte[] SerializeWithHeaders(string topic, Headers headers, T dat SpecificWriter.Write(container, encoder); return memStream.ToArray(); } - /// + /// public override T Deserialize(string topic, byte[] data) { return DeserializeWithHeaders(topic, null!, data); } - /// + /// public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data) { if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data); @@ -120,30 +121,99 @@ public override T DeserializeWithHeaders(string topic, Headers headers, byte[] d } /// - /// Avro Key Json encoder extension of , for example + /// Avro Key Binary encoder extension of , for example based on /// /// - public class Json : SerDes + public class BinaryBuffered : SerDesBuffered { readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!); - readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).ToAssemblyQualified()); + readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(BinaryBuffered<>).ToAssemblyQualified()); readonly SpecificDefaultWriter SpecificWriter = new(AvroKeyContainer._SCHEMA); readonly SpecificDefaultReader SpecificReader = new(AvroKeyContainer._SCHEMA, AvroKeyContainer._SCHEMA); - readonly ISerDes _defaultSerDes = default!; + readonly ISerDesBuffered _defaultSerDes = default!; /// public override bool UseHeaders => true; /// /// Default initializer /// - public Json() + public BinaryBuffered() { if (KNetSerialization.IsInternalManaged()) { - _defaultSerDes = new SerDes(); + _defaultSerDes = new SerDesBuffered(); } else if (!typeof(T).IsArray) { - throw new InvalidOperationException($"{typeof(Json<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer"); + throw new InvalidOperationException($"{typeof(BinaryBuffered<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer"); + } + } + + /// + public override ByteBuffer Serialize(string topic, T data) + { + return SerializeWithHeaders(topic, null!, data); + } + /// + public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, T data) + { + headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName); + headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName); + + if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data); + + MemoryStream memStream = new(); + BinaryEncoder encoder = new(memStream); + var container = new AvroKeyContainer(); + if (data is object[] dataArray) + { + container.PrimaryKey = new List(dataArray); + } + else throw new InvalidDataException($"Cannot manage inputs different from object[], input is {data?.GetType()}"); + SpecificWriter.Write(container, encoder); + return ByteBuffer.From(memStream); + } + /// + public override T Deserialize(string topic, ByteBuffer data) + { + return DeserializeWithHeaders(topic, null!, data); + } + /// + public override T DeserializeWithHeaders(string topic, Headers headers, ByteBuffer data) + { + if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data); + + BinaryDecoder decoder = new(data); + AvroKeyContainer t = new AvroKeyContainer(); + t = SpecificReader.Read(t!, decoder); + return (T)(object)(t.PrimaryKey.ToArray()); + } + } + + /// + /// Avro Key Json encoder extension of , for example based on array + /// + /// + public class JsonRaw : SerDesRaw + { + readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!); + readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(JsonRaw<>).ToAssemblyQualified()); + readonly SpecificDefaultWriter SpecificWriter = new(AvroKeyContainer._SCHEMA); + readonly SpecificDefaultReader SpecificReader = new(AvroKeyContainer._SCHEMA, AvroKeyContainer._SCHEMA); + readonly ISerDesRaw _defaultSerDes = default!; + /// + public override bool UseHeaders => true; + /// + /// Default initializer + /// + public JsonRaw() + { + if (KNetSerialization.IsInternalManaged()) + { + _defaultSerDes = new SerDesRaw(); + } + else if (!typeof(T).IsArray) + { + throw new InvalidOperationException($"{typeof(JsonRaw<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer"); } } @@ -166,12 +236,12 @@ public override byte[] SerializeWithHeaders(string topic, Headers headers, T dat encoder.Flush(); return memStream.ToArray(); } - /// + /// public override T Deserialize(string topic, byte[] data) { return DeserializeWithHeaders(topic, null!, data); } - /// + /// public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data) { if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data); @@ -183,20 +253,84 @@ public override T DeserializeWithHeaders(string topic, Headers headers, byte[] d return t; } } + + /// + /// Avro Key Json encoder extension of , for example based on + /// + /// + public class JsonBuffered : SerDesBuffered + { + readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!); + readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(JsonBuffered<>).ToAssemblyQualified()); + readonly SpecificDefaultWriter SpecificWriter = new(AvroKeyContainer._SCHEMA); + readonly SpecificDefaultReader SpecificReader = new(AvroKeyContainer._SCHEMA, AvroKeyContainer._SCHEMA); + readonly ISerDesBuffered _defaultSerDes = default!; + /// + public override bool UseHeaders => true; + /// + /// Default initializer + /// + public JsonBuffered() + { + if (KNetSerialization.IsInternalManaged()) + { + _defaultSerDes = new SerDesBuffered(); + } + else if (!typeof(T).IsArray) + { + throw new InvalidOperationException($"{typeof(JsonBuffered<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer"); + } + } + + /// + public override ByteBuffer Serialize(string topic, T data) + { + return SerializeWithHeaders(topic, null!, data); + } + /// + public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, T data) + { + headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName); + headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName); + + if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data); + + MemoryStream memStream = new(); + JsonEncoder encoder = new(AvroKeyContainer._SCHEMA, memStream); + SpecificWriter.Write(data, encoder); + encoder.Flush(); + return ByteBuffer.From(memStream); + } + /// + public override T Deserialize(string topic, ByteBuffer data) + { + return DeserializeWithHeaders(topic, null!, data); + } + /// + public override T DeserializeWithHeaders(string topic, Headers headers, ByteBuffer data) + { + if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data); + + JsonDecoder decoder = new(AvroKeyContainer._SCHEMA, data); + T t = (T)Activator.CreateInstance(typeof(T))!; + t = SpecificReader.Read(t!, decoder); + return t; + } + } } /// - /// Base class to define ValueContainer extensions of , for example + /// Base class to define ValueContainer extensions of , for example /// public static class ValueContainer { /// - /// Avro ValueContainer Binary encoder extension of , for example + /// Avro ValueContainer Binary encoder extension of , for example based on array /// /// - public class Binary : SerDes + public class BinaryRaw : SerDesRaw { - readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(Binary<>).ToAssemblyQualified()); + readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(BinaryRaw<>).ToAssemblyQualified()); readonly byte[] valueContainerName = null!; readonly SpecificDefaultWriter SpecificWriter = new(AvroValueContainer._SCHEMA); readonly SpecificDefaultReader SpecificReader = new(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA); @@ -205,7 +339,7 @@ public class Binary : SerDes /// /// Default initializer /// - public Binary() + public BinaryRaw() { var tt = typeof(T); if (tt.IsGenericType) @@ -239,12 +373,12 @@ public override byte[] SerializeWithHeaders(string topic, Headers headers, T dat SpecificWriter.Write(data, encoder); return memStream.ToArray(); } - /// + /// public override T Deserialize(string topic, byte[] data) { return DeserializeWithHeaders(topic, null!, data); } - /// + /// public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data) { using MemoryStream memStream = new(data); @@ -256,12 +390,76 @@ public override T DeserializeWithHeaders(string topic, Headers headers, byte[] d } /// - /// Avro ValueContainer Json encoder extension of , for example + /// Avro ValueContainer Binary encoder extension of , for example based on + /// + /// + public class BinaryBuffered : SerDesBuffered + { + readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(BinaryBuffered<>).ToAssemblyQualified()); + readonly byte[] valueContainerName = null!; + readonly SpecificDefaultWriter SpecificWriter = new(AvroValueContainer._SCHEMA); + readonly SpecificDefaultReader SpecificReader = new(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA); + /// + public override bool UseHeaders => true; + /// + /// Default initializer + /// + public BinaryBuffered() + { + var tt = typeof(T); + if (tt.IsGenericType) + { + var keyT = tt.GetGenericArguments(); + if (keyT.Length != 1) { throw new ArgumentException($"{typeof(T).Name} does not contains a single generic argument and cannot be used because it is not a valid ValueContainer type"); } + var t = tt.GetGenericTypeDefinition(); + if (t.GetInterface(typeof(IValueContainer<>).Name) != null) + { + valueContainerName = Encoding.UTF8.GetBytes(t.ToAssemblyQualified()); + return; + } + else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type"); + } + throw new ArgumentException($"{typeof(T).Name} is not a generic type and cannot be used as a valid ValueContainer type"); + } + + /// + public override ByteBuffer Serialize(string topic, T data) + { + return SerializeWithHeaders(topic, null!, data); + } + /// + public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, T data) + { + headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName); + headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName); + + MemoryStream memStream = new(); + BinaryEncoder encoder = new(memStream); + SpecificWriter.Write(data, encoder); + return ByteBuffer.From(memStream); + } + /// + public override T Deserialize(string topic, ByteBuffer data) + { + return DeserializeWithHeaders(topic, null!, data); + } + /// + public override T DeserializeWithHeaders(string topic, Headers headers, ByteBuffer data) + { + BinaryDecoder decoder = new(data); + T t = (T)Activator.CreateInstance(typeof(T))!; + t = SpecificReader.Read(t!, decoder); + return t; + } + } + + /// + /// Avro ValueContainer Json encoder extension of , for example based on array /// /// - public class Json : SerDes + public class JsonRaw : SerDesRaw { - readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).ToAssemblyQualified()); + readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(JsonRaw<>).ToAssemblyQualified()); readonly byte[] valueContainerName = null!; readonly SpecificDefaultWriter SpecificWriter = new(AvroValueContainer._SCHEMA); readonly SpecificDefaultReader SpecificReader = new(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA); @@ -270,7 +468,7 @@ public class Json : SerDes /// /// Default initializer /// - public Json() + public JsonRaw() { var tt = typeof(T); if (tt.IsGenericType) @@ -305,12 +503,12 @@ public override byte[] SerializeWithHeaders(string topic, Headers headers, T dat encoder.Flush(); return memStream.ToArray(); } - /// + /// public override T Deserialize(string topic, byte[] data) { return DeserializeWithHeaders(topic, null!, data); } - /// + /// public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data) { using MemoryStream memStream = new(data); @@ -320,5 +518,70 @@ public override T DeserializeWithHeaders(string topic, Headers headers, byte[] d return t; } } + + /// + /// Avro ValueContainer Json encoder extension of , for example based on + /// + /// + public class JsonBuffered : SerDesBuffered + { + readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(JsonBuffered<>).ToAssemblyQualified()); + readonly byte[] valueContainerName = null!; + readonly SpecificDefaultWriter SpecificWriter = new(AvroValueContainer._SCHEMA); + readonly SpecificDefaultReader SpecificReader = new(AvroValueContainer._SCHEMA, AvroValueContainer._SCHEMA); + /// + public override bool UseHeaders => true; + /// + /// Default initializer + /// + public JsonBuffered() + { + var tt = typeof(T); + if (tt.IsGenericType) + { + var keyT = tt.GetGenericArguments(); + if (keyT.Length != 1) { throw new ArgumentException($"{typeof(T).Name} does not contains a single generic argument and cannot be used because it is not a valid ValueContainer type"); } + var t = tt.GetGenericTypeDefinition(); + if (t.GetInterface(typeof(IValueContainer<>).Name) != null) + { + valueContainerName = Encoding.UTF8.GetBytes(t.ToAssemblyQualified()); + return; + } + else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type"); + } + throw new ArgumentException($"{typeof(T).Name} is not a generic type and cannot be used as a valid ValueContainer type"); + } + + /// + public override ByteBuffer Serialize(string topic, T data) + { + return SerializeWithHeaders(topic, null!, data); + } + /// + public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, T data) + { + headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName); + headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName); + + MemoryStream memStream = new(); + JsonEncoder encoder = new(AvroValueContainer._SCHEMA, memStream); + SpecificWriter.Write(data, encoder); + encoder.Flush(); + return ByteBuffer.From(memStream); + } + /// + public override T Deserialize(string topic, ByteBuffer data) + { + return DeserializeWithHeaders(topic, null!, data); + } + /// + public override T DeserializeWithHeaders(string topic, Headers headers, ByteBuffer data) + { + JsonDecoder decoder = new(AvroValueContainer._SCHEMA, data); + T t = (T)Activator.CreateInstance(typeof(T))!; + t = SpecificReader.Read(t!, decoder); + return t; + } + } } } \ No newline at end of file diff --git a/src/net/KEFCore.SerDes.Protobuf/ProtobufKEFCoreSerDes.cs b/src/net/KEFCore.SerDes.Protobuf/ProtobufKEFCoreSerDes.cs index ddea8186..414c41fa 100644 --- a/src/net/KEFCore.SerDes.Protobuf/ProtobufKEFCoreSerDes.cs +++ b/src/net/KEFCore.SerDes.Protobuf/ProtobufKEFCoreSerDes.cs @@ -19,6 +19,7 @@ #nullable enable using Google.Protobuf; +using Java.Nio; using MASES.EntityFrameworkCore.KNet.Serialization.Protobuf.Storage; using MASES.KNet.Serialization; using Org.Apache.Kafka.Common.Header; @@ -27,50 +28,50 @@ namespace MASES.EntityFrameworkCore.KNet.Serialization.Protobuf; /// -/// Protobuf base class to define extensions of , for example +/// Protobuf base class to define extensions of , for example /// public static class ProtobufKEFCoreSerDes { /// /// Returns the default serializer for keys /// - public static readonly Type DefaultKeySerialization = typeof(Key.Binary<>); + public static readonly Type DefaultKeySerialization = typeof(Key.BinaryRaw<>); /// /// Returns the default serializer for value containers /// - public static readonly Type DefaultValueContainerSerialization = typeof(ValueContainer.Binary<>); + public static readonly Type DefaultValueContainerSerialization = typeof(ValueContainer.BinaryRaw<>); /// /// Returns the default for value containers /// public static readonly Type DefaultValueContainer = typeof(ProtobufValueContainer<>); /// - /// Base class to define key extensions of , for example + /// Base class to define key extensions of , for example based on array /// public static class Key { /// - /// Protobuf Key Binary encoder extension of , for example + /// Protobuf Key Binary encoder extension of , for example /// /// - public class Binary : SerDes + public class BinaryRaw : SerDesRaw { readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!); - readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Binary<>).ToAssemblyQualified()); - readonly ISerDes _defaultSerDes = default!; + readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(BinaryRaw<>).ToAssemblyQualified()); + readonly ISerDesRaw _defaultSerDes = default!; /// public override bool UseHeaders => true; /// /// Default initializer /// - public Binary() + public BinaryRaw() { if (KNetSerialization.IsInternalManaged()) { - _defaultSerDes = new SerDes(); + _defaultSerDes = new SerDesRaw(); } else if (!typeof(T).IsArray) { - throw new InvalidOperationException($"{typeof(Binary<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer"); + throw new InvalidOperationException($"{typeof(BinaryRaw<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer"); } } @@ -96,12 +97,12 @@ public override byte[] SerializeWithHeaders(string topic, Headers headers, T dat keyContainer.WriteTo(stream); return stream.ToArray(); } - /// + /// public override T Deserialize(string topic, byte[] data) { return DeserializeWithHeaders(topic, null!, data); } - /// + /// public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data) { if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data); @@ -113,27 +114,93 @@ public override T DeserializeWithHeaders(string topic, Headers headers, byte[] d return (T)container.GetContent(); } } + + /// + /// Protobuf Key Binary encoder extension of , for example based on + /// + /// + public class BinaryBuffered : SerDesBuffered + { + readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!); + readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(BinaryBuffered<>).ToAssemblyQualified()); + readonly ISerDesBuffered _defaultSerDes = default!; + /// + public override bool UseHeaders => true; + /// + /// Default initializer + /// + public BinaryBuffered() + { + if (KNetSerialization.IsInternalManaged()) + { + _defaultSerDes = new SerDesBuffered(); + } + else if (!typeof(T).IsArray) + { + throw new InvalidOperationException($"{typeof(BinaryBuffered<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer"); + } + } + + /// + public override ByteBuffer Serialize(string topic, T data) + { + return SerializeWithHeaders(topic, null!, data); + } + /// + public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, T data) + { + headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName); + headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName); + + if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data); + KeyContainer keyContainer = null!; + if (data is object[] dataArray) + { + keyContainer = new KeyContainer(dataArray); + } + + using MemoryStream stream = new(); + keyContainer.WriteTo(stream); + return stream.ToArray(); + } + /// + public override T Deserialize(string topic, ByteBuffer data) + { + return DeserializeWithHeaders(topic, null!, data); + } + /// + public override T DeserializeWithHeaders(string topic, Headers headers, ByteBuffer data) + { + if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data); + + if (data == null) return default!; + + KeyContainer container = KeyContainer.Parser.ParseFrom(data.ToStream()); + + return (T)container.GetContent(); + } + } } /// - /// Base class to define ValueContainer extensions of , for example + /// Base class to define ValueContainer extensions of , for example /// public static class ValueContainer { /// - /// Protobuf ValueContainer Binary encoder extension of , for example + /// Protobuf ValueContainer Binary encoder extension of , for example based on array /// /// - public class Binary : SerDes where T : class, IMessage + public class BinaryRaw : SerDesRaw where T : class, IMessage { - readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(Binary<>).ToAssemblyQualified()); + readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(BinaryRaw<>).ToAssemblyQualified()); readonly byte[] valueContainerName = null!; /// public override bool UseHeaders => true; /// /// Default initializer /// - public Binary() + public BinaryRaw() { var tt = typeof(T); if (tt.IsGenericType) @@ -168,12 +235,12 @@ public override byte[] SerializeWithHeaders(string topic, Headers headers, T dat return stream.ToArray(); } } - /// + /// public override T Deserialize(string topic, byte[] data) { return DeserializeWithHeaders(topic, null!, data); } - /// + /// public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data) { if (data == null) return default!; @@ -181,5 +248,65 @@ public override T DeserializeWithHeaders(string topic, Headers headers, byte[] d return (Activator.CreateInstance(typeof(T), container) as T)!; } } + + /// + /// Protobuf ValueContainer Binary encoder extension of , for example based on + /// + /// + public class BinaryBuffered : SerDesBuffered where T : class, IMessage + { + readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(BinaryBuffered<>).ToAssemblyQualified()); + readonly byte[] valueContainerName = null!; + /// + public override bool UseHeaders => true; + /// + /// Default initializer + /// + public BinaryBuffered() + { + var tt = typeof(T); + if (tt.IsGenericType) + { + var keyT = tt.GetGenericArguments(); + if (keyT.Length != 1) { throw new ArgumentException($"{typeof(T).Name} does not contains a single generic argument and cannot be used because it is not a valid ValueContainer type"); } + var t = tt.GetGenericTypeDefinition(); + if (t.GetInterface(typeof(IValueContainer<>).Name) != null) + { + valueContainerName = Encoding.UTF8.GetBytes(t.ToAssemblyQualified()); + return; + } + else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type"); + } + throw new ArgumentException($"{typeof(T).Name} is not a generic type and cannot be used as a valid ValueContainer type"); + } + + /// + public override ByteBuffer Serialize(string topic, T data) + { + return SerializeWithHeaders(topic, null!, data); + } + /// + public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, T data) + { + headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName); + headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName); + + MemoryStream stream = new(); + data.WriteTo(stream); + return ByteBuffer.From(stream); + } + /// + public override T Deserialize(string topic, ByteBuffer data) + { + return DeserializeWithHeaders(topic, null!, data); + } + /// + public override T DeserializeWithHeaders(string topic, Headers headers, ByteBuffer data) + { + if (data == null) return default!; + var container = Storage.ValueContainer.Parser.ParseFrom(data.ToStream()); + return (Activator.CreateInstance(typeof(T), container) as T)!; + } + } } } \ No newline at end of file diff --git a/src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs b/src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs index da42624e..c70e2746 100644 --- a/src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs +++ b/src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs @@ -18,6 +18,7 @@ #nullable enable +using Java.Nio; using MASES.EntityFrameworkCore.KNet.Serialization.Json.Storage; using MASES.KNet.Serialization; using Org.Apache.Kafka.Common.Header; @@ -26,51 +27,51 @@ namespace MASES.EntityFrameworkCore.KNet.Serialization.Json; /// -/// Default base class to define extensions of , for example +/// Default base class to define extensions of , for example /// public static class DefaultKEFCoreSerDes { /// /// Returns the default serializer for keys /// - public static readonly Type DefaultKeySerialization = typeof(Key.Json<>); + public static readonly Type DefaultKeySerialization = typeof(Key.JsonRaw<>); /// /// Returns the default serializer for value containers /// - public static readonly Type DefaultValueContainerSerialization = typeof(ValueContainer.Json<>); + public static readonly Type DefaultValueContainerSerialization = typeof(ValueContainer.JsonRaw<>); /// /// Returns the default for value containers /// public static readonly Type DefaultValueContainer = typeof(DefaultValueContainer<>); /// - /// Base class to define key extensions of , for example + /// Base class to define key extensions of , for example /// public static class Key { /// - /// Json extension of , for example + /// Json extension of , for example based on array /// /// The type to be serialized or deserialized. It can be a Primary Key or a ValueContainer like - public class Json : SerDes + public class JsonRaw : SerDesRaw { - readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).ToAssemblyQualified()); + readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(JsonRaw<>).ToAssemblyQualified()); readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!); - readonly ISerDes _defaultSerDes = default!; + readonly ISerDesRaw _defaultSerDes = default!; readonly JsonSerializerOptions? _options = null; /// public override bool UseHeaders => true; /// /// Default initializer /// - public Json() + public JsonRaw() { if (KNetSerialization.IsInternalManaged()) { - _defaultSerDes = new SerDes(); + _defaultSerDes = new SerDesRaw(); } else if (!typeof(T).IsArray) { - throw new InvalidOperationException($"{typeof(Json<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer"); + throw new InvalidOperationException($"{typeof(JsonRaw<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer"); } else { @@ -96,12 +97,12 @@ public override byte[] SerializeWithHeaders(string topic, Headers headers, T dat var jsonStr = System.Text.Json.JsonSerializer.Serialize(data); return Encoding.UTF8.GetBytes(jsonStr); } - /// + /// public override T Deserialize(string topic, byte[] data) { return DeserializeWithHeaders(topic, null!, data); } - /// + /// public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data) { if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data); @@ -110,20 +111,86 @@ public override T DeserializeWithHeaders(string topic, Headers headers, byte[] d return System.Text.Json.JsonSerializer.Deserialize(data, _options)!; } } + + /// + /// Json extension of , for example based on + /// + /// The type to be serialized or deserialized. It can be a Primary Key or a ValueContainer like + public class JsonBuffered : SerDesBuffered + { + readonly byte[] keySerDesName = Encoding.UTF8.GetBytes(typeof(JsonBuffered<>).ToAssemblyQualified()); + readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!); + readonly ISerDesBuffered _defaultSerDes = default!; + readonly JsonSerializerOptions? _options = null; + /// + public override bool UseHeaders => true; + /// + /// Default initializer + /// + public JsonBuffered() + { + if (KNetSerialization.IsInternalManaged()) + { + _defaultSerDes = new SerDesBuffered(); + } + else if (!typeof(T).IsArray) + { + throw new InvalidOperationException($"{typeof(JsonBuffered<>).ToAssemblyQualified()} cannot manage {typeof(T).Name}, override or build a new serializaer"); + } + else + { + _options = new JsonSerializerOptions() + { + WriteIndented = false, + }; + } + } + + /// + public override ByteBuffer Serialize(string topic, T data) + { + return SerializeWithHeaders(topic, null!, data); + } + /// + public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, T data) + { + headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName); + headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName); + + if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data); + + var ms = new MemoryStream(); + System.Text.Json.JsonSerializer.Serialize(ms, data, _options); + return ByteBuffer.From(ms); + } + /// + public override T Deserialize(string topic, ByteBuffer data) + { + return DeserializeWithHeaders(topic, null!, data); + } + /// + public override T DeserializeWithHeaders(string topic, Headers headers, ByteBuffer data) + { + if (_defaultSerDes != null) return _defaultSerDes.DeserializeWithHeaders(topic, headers, data); + + if (data == null) return default!; + return System.Text.Json.JsonSerializer.Deserialize(data, _options)!; + } + } } /// - /// Base class to define ValueContainer extensions of , for example + /// Base class to define ValueContainer extensions of , for example /// public static class ValueContainer { /// - /// Json extension of , for example + /// Json extension of , for example based on array /// /// The type to be serialized or deserialized. It can be a Primary Key or a ValueContainer like - public class Json : SerDes + public class JsonRaw : SerDesRaw { - readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(Json<>).ToAssemblyQualified()); + readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(JsonRaw<>).ToAssemblyQualified()); readonly byte[] valueContainerName = null!; readonly System.Text.Json.JsonSerializerOptions _options; /// @@ -131,7 +198,7 @@ public class Json : SerDes /// /// Default initializer /// - public Json() + public JsonRaw() { var tt = typeof(T); if (tt.IsGenericType) @@ -167,17 +234,81 @@ public override byte[] SerializeWithHeaders(string topic, Headers headers, T dat var jsonStr = System.Text.Json.JsonSerializer.Serialize(data, _options); return Encoding.UTF8.GetBytes(jsonStr); } - /// + /// public override T Deserialize(string topic, byte[] data) { return DeserializeWithHeaders(topic, null!, data); } - /// + /// public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data) { if (data == null) return default!; return System.Text.Json.JsonSerializer.Deserialize(data, _options)!; } } + + /// + /// Json extension of , for example based on + /// + /// The type to be serialized or deserialized. It can be a Primary Key or a ValueContainer like + public class JsonBuffered : SerDesBuffered + { + readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(JsonBuffered<>).ToAssemblyQualified()); + readonly byte[] valueContainerName = null!; + readonly System.Text.Json.JsonSerializerOptions _options; + /// + public override bool UseHeaders => true; + /// + /// Default initializer + /// + public JsonBuffered() + { + var tt = typeof(T); + if (tt.IsGenericType) + { + var keyT = tt.GetGenericArguments(); + if (keyT.Length != 1) { throw new ArgumentException($"{typeof(T).Name} does not contains a single generic argument and cannot be used because it is not a valid ValueContainer type"); } + var t = tt.GetGenericTypeDefinition(); + if (t.GetInterface(typeof(IValueContainer<>).Name) != null) + { + valueContainerName = Encoding.UTF8.GetBytes(t.ToAssemblyQualified()); + _options = new System.Text.Json.JsonSerializerOptions(System.Text.Json.JsonSerializerDefaults.General) + { + WriteIndented = false, + }; + return; + } + else throw new ArgumentException($"{typeof(T).Name} does not implement IValueContainer<> and cannot be used because it is not a valid ValueContainer type"); + } + throw new ArgumentException($"{typeof(T).Name} is not a generic type and cannot be used as a valid ValueContainer type"); + } + + /// + public override ByteBuffer Serialize(string topic, T data) + { + return SerializeWithHeaders(topic, null!, data); + } + /// + public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, T data) + { + headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName); + headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName); + + var ms = new MemoryStream(); + System.Text.Json.JsonSerializer.Serialize(ms, data, _options); + return ByteBuffer.From(ms); + } + /// + public override T Deserialize(string topic, ByteBuffer data) + { + return DeserializeWithHeaders(topic, null!, data); + } + /// + public override T DeserializeWithHeaders(string topic, Headers headers, ByteBuffer data) + { + if (data == null) return default!; + return System.Text.Json.JsonSerializer.Deserialize(data, _options)!; + } + } } } \ No newline at end of file diff --git a/src/net/KEFCore.SerDes/KEFCore.SerDes.csproj b/src/net/KEFCore.SerDes/KEFCore.SerDes.csproj index 8b870867..2d3666a4 100644 --- a/src/net/KEFCore.SerDes/KEFCore.SerDes.csproj +++ b/src/net/KEFCore.SerDes/KEFCore.SerDes.csproj @@ -41,10 +41,10 @@ - - - - + + + + All None diff --git a/src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs b/src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs index d25dd36e..69a48d96 100644 --- a/src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs +++ b/src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs @@ -197,13 +197,33 @@ public static object FromRecord(Org.Apache.Kafka.Clients.Consumer.ConsumerRecord public static object FromRawValueData(Type keyType, Type valueContainer, Type keySerializer, Type valueContainerSerializer, string topic, byte[] recordValue, byte[] recordKey, bool throwUnmatch = false) { var fullKeySerializer = keySerializer.MakeGenericType(keyType); + Type jvmKeyType = null!; + foreach (var interfaceType in fullKeySerializer.GetInterfaces()) + { + if (interfaceType.IsGenericType && interfaceType.Name.StartsWith(typeof(ISerDes<,>).Name)) + { + jvmKeyType = interfaceType.GetGenericArguments()[1]; + } + } + if (jvmKeyType == null) throw new InvalidOperationException($"Cannot identity JVM type from {keySerializer}"); + var fullValueContainer = valueContainer.MakeGenericType(keyType); var fullValueContainerSerializer = valueContainerSerializer.MakeGenericType(fullValueContainer); + Type jvmKValueContainerType = null!; + foreach (var interfaceType in fullValueContainerSerializer.GetInterfaces()) + { + if (interfaceType.IsGenericType && interfaceType.Name.StartsWith(typeof(ISerDes<,>).Name)) + { + jvmKValueContainerType = interfaceType.GetGenericArguments()[1]; + } + } - var ccType = typeof(LocalEntityExtractor<,,,>); - var extractorType = ccType.MakeGenericType(keyType, fullValueContainer, fullKeySerializer, fullValueContainerSerializer); - var extractor = Activator.CreateInstance(extractorType) as ILocalEntityExtractor; + if (jvmKValueContainerType == null) throw new InvalidOperationException($"Cannot identity JVM type from {fullValueContainerSerializer}"); - return extractor!.GetEntity(topic, recordValue, recordKey, throwUnmatch); + var ccType = typeof(LocalEntityExtractor<,,,,,>); + var extractorType = ccType.MakeGenericType(keyType, fullValueContainer, jvmKeyType, jvmKValueContainerType, fullKeySerializer, fullValueContainerSerializer); + var methodInfo = extractorType.GetMethod("GetEntity"); + var extractor = Activator.CreateInstance(extractorType); + return methodInfo?.Invoke(extractor, new object[] { topic, recordValue, recordKey, throwUnmatch })!; } } diff --git a/src/net/KEFCore.SerDes/LocalEntityExtractor.cs b/src/net/KEFCore.SerDes/LocalEntityExtractor.cs index de8e1dc3..1da78e02 100644 --- a/src/net/KEFCore.SerDes/LocalEntityExtractor.cs +++ b/src/net/KEFCore.SerDes/LocalEntityExtractor.cs @@ -22,27 +22,27 @@ namespace MASES.EntityFrameworkCore.KNet.Serialization; -interface ILocalEntityExtractor +interface ILocalEntityExtractor { - object GetEntity(string topic, byte[] recordValue, byte[] recordKey, bool throwUnmatch); + object GetEntity(string topic, TJVMKey recordKey, TJVMValueContainer recordValue, bool throwUnmatch); } -class LocalEntityExtractor : ILocalEntityExtractor +class LocalEntityExtractor : ILocalEntityExtractor where TKey : notnull where TValueContainer : class, IValueContainer where TKeySerializer : class, new() where TValueSerializer : class, new() { - private readonly ISerDes? _keySerdes; - private readonly ISerDes? _valueSerdes; + private readonly ISerDes? _keySerdes; + private readonly ISerDes? _valueSerdes; public LocalEntityExtractor() { - _keySerdes = new TKeySerializer() as ISerDes; - _valueSerdes = new TValueSerializer() as ISerDes; + _keySerdes = new TKeySerializer() as ISerDes; + _valueSerdes = new TValueSerializer() as ISerDes; } - public object GetEntity(string topic, byte[] recordValue, byte[] recordKey, bool throwUnmatch) + public object GetEntity(string topic, TJVMKey recordKey, TJVMValueContainer recordValue, bool throwUnmatch) { if (recordValue == null) throw new ArgumentNullException(nameof(recordValue), "Record value shall be available"); diff --git a/src/net/KEFCore/Extensions/KafkaDbContextOptionsExtensions.cs b/src/net/KEFCore/Extensions/KafkaDbContextOptionsExtensions.cs index 191cc658..0a4a4953 100644 --- a/src/net/KEFCore/Extensions/KafkaDbContextOptionsExtensions.cs +++ b/src/net/KEFCore/Extensions/KafkaDbContextOptionsExtensions.cs @@ -146,4 +146,18 @@ public static Type ValueContainerType(this IKafkaSingletonOptions options, IEnti var primaryKey = entityType.FindPrimaryKey()!.GetKeyType(); return options.ValueContainerType?.MakeGenericType(primaryKey)!; } + /// + /// Create the ValueContainer + /// + public static Type JVMKeyType(this IKafkaSingletonOptions options, IEntityType entityType) + { + return typeof(byte[]); + } + /// + /// Create the ValueContainer + /// + public static Type JVMValueContainerType(this IKafkaSingletonOptions options, IEntityType entityType) + { + return typeof(byte[]); + } } diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs index 674dee46..41b9b3c5 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs @@ -21,13 +21,12 @@ using Java.Lang; using Java.Util; using MASES.EntityFrameworkCore.KNet.Serialization.Json; -using MASES.EntityFrameworkCore.KNet.Serialization.Json.Storage; using MASES.EntityFrameworkCore.KNet.Storage; using MASES.KNet.Common; using MASES.KNet.Consumer; using MASES.KNet.Producer; using MASES.KNet.Streams; -using Org.Apache.Kafka.Streams; +using Org.Apache.Kafka.Streams.State; using System.Globalization; namespace MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; @@ -351,9 +350,11 @@ public virtual StreamsConfigBuilder StreamsOptions() builder.ApplicationId = ApplicationId; builder.BootstrapServers = BootstrapServers; - builder.DefaultKeySerdeClass = Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader); - builder.DefaultValueSerdeClass = Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader); - builder.DefaultDSLStore = UsePersistentStorage ? Org.Apache.Kafka.Streams.StreamsConfig.ROCKS_DB : Org.Apache.Kafka.Streams.StreamsConfig.IN_MEMORY; + string baSerdesName = Class.ClassNameOf(); + builder.DefaultKeySerdeClass = Class.ForName(baSerdesName, true, SystemClassLoader); + builder.DefaultValueSerdeClass = Class.ForName(baSerdesName, true, SystemClassLoader); + builder.DSLStoreSuppliersClass = UsePersistentStorage ? Class.ForName(Class.ClassNameOf(), true, SystemClassLoader) + : Class.ForName(Class.ClassNameOf(), true, SystemClassLoader); //if (props.ContainsKey(Org.Apache.Kafka.Streams.StreamsConfig.APPLICATION_ID_CONFIG)) //{ diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs index b345ed86..7d3c9817 100644 --- a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs +++ b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs @@ -156,14 +156,14 @@ public KafkaDbContext(DbContextOptions options) : base(options) /// /// The optional to use for key serialization /// - /// Default value is , any custom shall implement + /// Default value is , any custom shall implement /// /// public virtual Type? KeySerializationType { get; set; } = null; /// /// The optional to use for value serialization /// - /// Default value is , any custom shall implement + /// Default value is , any custom shall implement /// /// public virtual Type? ValueSerializationType { get; set; } = null; diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs index ce287cf7..5d76c6a8 100644 --- a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs +++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs @@ -35,7 +35,7 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// -public class EntityTypeProducer : IEntityTypeProducer +public class EntityTypeProducer : IEntityTypeProducer where TKey : notnull where TValueContainer : class, IValueContainer where TKeySerializer : class, new() @@ -45,18 +45,18 @@ public class EntityTypeProducer? _kafkaCompactedReplicator; - private readonly MASES.KNet.Producer.IProducer? _kafkaProducer; + private readonly IKNetCompactedReplicator? _kafkaCompactedReplicator; + private readonly MASES.KNet.Producer.IProducer? _kafkaProducer; private readonly IKafkaStreamsRetriever? _streamData; - private readonly ISerDes? _keySerdes; - private readonly ISerDes? _valueSerdes; + private readonly ISerDes? _keySerdes; + private readonly ISerDes? _valueSerdes; private readonly Action? _onChangeEvent; #region KNetCompactedReplicatorEnumerable - class KNetCompactedReplicatorEnumerable(IEntityType entityType, IKNetCompactedReplicator? kafkaCompactedReplicator) : IEnumerable + class KNetCompactedReplicatorEnumerable(IEntityType entityType, IKNetCompactedReplicator? kafkaCompactedReplicator) : IEnumerable { readonly IEntityType _entityType = entityType; - readonly IKNetCompactedReplicator? _kafkaCompactedReplicator = kafkaCompactedReplicator; + readonly IKNetCompactedReplicator? _kafkaCompactedReplicator = kafkaCompactedReplicator; #region KNetCompactedReplicatorEnumerator class KNetCompactedReplicatorEnumerator : IEnumerator @@ -67,9 +67,9 @@ class KNetCompactedReplicatorEnumerator : IEnumerator Stopwatch _valueBufferSw = new Stopwatch(); #endif readonly IEntityType _entityType; - readonly IKNetCompactedReplicator? _kafkaCompactedReplicator; + readonly IKNetCompactedReplicator? _kafkaCompactedReplicator; readonly IEnumerator>? _enumerator; - public KNetCompactedReplicatorEnumerator(IEntityType entityType, IKNetCompactedReplicator? kafkaCompactedReplicator) + public KNetCompactedReplicatorEnumerator(IEntityType entityType, IKNetCompactedReplicator? kafkaCompactedReplicator) { _entityType = entityType; _kafkaCompactedReplicator = kafkaCompactedReplicator; @@ -191,15 +191,15 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster) var tTValueContainer = typeof(TValueContainer); TValueContainerConstructor = tTValueContainer.GetConstructors().Single(ci => ci.GetParameters().Length == 2); - _keySerdes = new TKeySerializer() as ISerDes; - _valueSerdes = new TValueSerializer() as ISerDes; + _keySerdes = new TKeySerializer() as ISerDes; + _valueSerdes = new TValueSerializer() as ISerDes; - if (_keySerdes == null) throw new InvalidOperationException($"{typeof(TKeySerializer)} is not a {typeof(ISerDes)}"); - if (_valueSerdes == null) throw new InvalidOperationException($"{typeof(TValueSerializer)} is not a {typeof(ISerDes)}"); + if (_keySerdes == null) throw new InvalidOperationException($"{typeof(TKeySerializer)} is not a {typeof(ISerDes)}"); + if (_valueSerdes == null) throw new InvalidOperationException($"{typeof(TValueSerializer)} is not a {typeof(ISerDes)}"); if (_useCompactedReplicator) { - _kafkaCompactedReplicator = new KNetCompactedReplicator() + _kafkaCompactedReplicator = new KNetCompactedReplicator() { UpdateMode = UpdateModeTypes.OnConsume, BootstrapServers = _cluster.Options.BootstrapServers, @@ -230,9 +230,9 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster) } else { - _kafkaProducer = new KNetProducer(_cluster.Options.ProducerOptionsBuilder(), _keySerdes, _valueSerdes); - _streamData = _cluster.Options.UseKNetStreams ? new KNetStreamsRetriever(cluster, entityType) - : new KafkaStreamsTableRetriever(cluster, entityType, _keySerdes!, _valueSerdes!); + _kafkaProducer = new KNetProducer(_cluster.Options.ProducerOptionsBuilder(), _keySerdes, _valueSerdes); + _streamData = _cluster.Options.UseKNetStreams ? new KNetStreamsRetriever(cluster, entityType) + : new KafkaStreamsTableRetriever(cluster, entityType, _keySerdes!, _valueSerdes!); } } @@ -256,7 +256,8 @@ public IEnumerable> Commit(IEnumerable reco List> futures = new(); foreach (KafkaRowBag record in records.Cast>()) { - var future = _kafkaProducer?.Send(new MASES.KNet.Producer.ProducerRecord(record.AssociatedTopicName, 0, record.Key, record.Value(TValueContainerConstructor)!)); + var newRecord = _kafkaProducer?.NewRecord(record.AssociatedTopicName, 0, record.Key, record.Value(TValueContainerConstructor)!); + var future = _kafkaProducer?.Send(newRecord); futures.Add(future!); } @@ -297,7 +298,7 @@ public IEnumerable ValueBuffers } } - private void KafkaCompactedReplicator_OnRemoteAdd(IKNetCompactedReplicator arg1, KeyValuePair arg2) + private void KafkaCompactedReplicator_OnRemoteAdd(IKNetCompactedReplicator arg1, KeyValuePair arg2) { Task.Factory.StartNew(() => { @@ -305,7 +306,7 @@ private void KafkaCompactedReplicator_OnRemoteAdd(IKNetCompactedReplicator arg1, KeyValuePair arg2) + private void KafkaCompactedReplicator_OnRemoteUpdate(IKNetCompactedReplicator arg1, KeyValuePair arg2) { Task.Factory.StartNew(() => { @@ -313,7 +314,7 @@ private void KafkaCompactedReplicator_OnRemoteUpdate(IKNetCompactedReplicator arg1, KeyValuePair arg2) + private void KafkaCompactedReplicator_OnRemoteRemove(IKNetCompactedReplicator arg1, KeyValuePair arg2) { Task.Factory.StartNew(() => { diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs index 5cf9ae91..d4e66da7 100644 --- a/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs +++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs @@ -34,13 +34,13 @@ public class EntityTypeProducers /// /// Allocates a new /// - public static IEntityTypeProducer Create(IEntityType entityType, IKafkaCluster cluster) + public static IEntityTypeProducer Create(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull where TValueContainer : class, IValueContainer where TKeySerializer : class, new() where TValueSerializer : class, new() { - return _producers.GetOrAdd(entityType, _ => CreateProducerLocal(entityType, cluster)); + return _producers.GetOrAdd(entityType, _ => CreateProducerLocal(entityType, cluster)); } /// /// Dispose a previously allocated @@ -54,10 +54,10 @@ public static void Dispose(IEntityTypeProducer producer) producer.Dispose(); } - static IEntityTypeProducer CreateProducerLocal(IEntityType entityType, IKafkaCluster cluster) + static IEntityTypeProducer CreateProducerLocal(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull where TValueContainer : class, IValueContainer where TKeySerializer : class, new() where TValueSerializer : class, new() - => new EntityTypeProducer(entityType, cluster); + => new EntityTypeProducer(entityType, cluster); } diff --git a/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs b/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs index c9608a47..1d4a1c5b 100644 --- a/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs +++ b/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs @@ -34,15 +34,15 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// -public class KNetStreamsRetriever : IKafkaStreamsRetriever +public class KNetStreamsRetriever : IKafkaStreamsRetriever where TKey : notnull where TValue : IValueContainer { - struct StreamsAssociatedData(Org.Apache.Kafka.Streams.State.KeyValueBytesStoreSupplier storeSupplier, Materialized materialized, GlobalKTable globalTable) + struct StreamsAssociatedData(Org.Apache.Kafka.Streams.State.KeyValueBytesStoreSupplier storeSupplier, Materialized materialized, GlobalKTable globalTable) { public Org.Apache.Kafka.Streams.State.KeyValueBytesStoreSupplier StoreSupplier = storeSupplier; - public Materialized Materialized = materialized; - public GlobalKTable GlobalTable = globalTable; + public Materialized Materialized = materialized; + public GlobalKTable GlobalTable = globalTable; } private static bool _preserveStreamsAcrossContext = KEFCore.PreserveInformationAcrossContexts; @@ -113,7 +113,7 @@ public KNetStreamsRetriever(IKafkaCluster kafkaCluster, IEntityType entityType) { var storeSupplier = _usePersistentStorage ? Org.Apache.Kafka.Streams.State.Stores.PersistentKeyValueStore(_storageId) : Org.Apache.Kafka.Streams.State.Stores.InMemoryKeyValueStore(_storageId); - var materialized = Materialized.As(storeSupplier); + var materialized = Materialized.As(storeSupplier); var globalTable = _builder.GlobalTable(_topicName, materialized); _managedEntities.Add(_entityType, _entityType); _storagesForEntities.Add(_entityType, new StreamsAssociatedData(storeSupplier, materialized, globalTable)); @@ -271,13 +271,13 @@ class KafkaEnumberable : IEnumerable private readonly bool _useEnumeratorWithPrefetch; private readonly IKafkaCluster _kafkaCluster; private readonly IEntityType _entityType; - private readonly ReadOnlyKeyValueStore? _keyValueStore = null; + private readonly ReadOnlyKeyValueStore? _keyValueStore = null; public KafkaEnumberable(IKafkaCluster kafkaCluster, IEntityType entityType, string storageId, bool useEnumerator) { _kafkaCluster = kafkaCluster; _entityType = entityType; - _keyValueStore = _streams?.Store(storageId, QueryableStoreTypes.KeyValueStore()); + _keyValueStore = _streams?.Store(storageId, QueryableStoreTypes.KeyValueStore()); _useEnumeratorWithPrefetch = useEnumerator; #if DEBUG_PERFORMANCE Infrastructure.KafkaDbContext.ReportString($"KafkaEnumerator for {_entityType.Name} - ApproximateNumEntries {_keyValueStore?.ApproximateNumEntries}"); @@ -319,10 +319,10 @@ class KafkaEnumerator : IEnumerator private readonly bool _useEnumeratorWithPrefetch; private readonly IKafkaCluster _kafkaCluster; private readonly IEntityType _entityType; - private readonly KeyValueIterator? _keyValueIterator = null; - private readonly IEnumerator>? _enumerator = null; + private readonly KeyValueIterator? _keyValueIterator = null; + private readonly IEnumerator>? _enumerator = null; #if NET8_0_OR_GREATER - private readonly IAsyncEnumerator>? _asyncEnumerator = null; + private readonly IAsyncEnumerator>? _asyncEnumerator = null; #endif #if DEBUG_PERFORMANCE Stopwatch _moveNextSw = new Stopwatch(); @@ -332,7 +332,7 @@ class KafkaEnumerator : IEnumerator Stopwatch _valueBufferSw = new Stopwatch(); #endif - public KafkaEnumerator(IKafkaCluster kafkaCluster, IEntityType entityType, KeyValueIterator? keyValueIterator, bool useEnumerator, bool isAsync) + public KafkaEnumerator(IKafkaCluster kafkaCluster, IEntityType entityType, KeyValueIterator? keyValueIterator, bool useEnumerator, bool isAsync) { _kafkaCluster = kafkaCluster ?? throw new ArgumentNullException(nameof(kafkaCluster)); _entityType = entityType; @@ -402,7 +402,7 @@ public bool MoveNext() _cycles++; _valueGetSw.Start(); #endif - KeyValue kv = _useEnumeratorWithPrefetch ? _enumerator.Current : _keyValueIterator.Next; + KeyValue kv = _useEnumeratorWithPrefetch ? _enumerator.Current : _keyValueIterator.Next; #if DEBUG_PERFORMANCE _valueGetSw.Stop(); _valueGet2Sw.Start(); @@ -452,7 +452,7 @@ public ValueTask MoveNextAsync() _cycles++; _valueGetSw.Start(); #endif - KeyValue kv = _asyncEnumerator.Current; + KeyValue kv = _asyncEnumerator.Current; #if DEBUG_PERFORMANCE _valueGetSw.Stop(); _valueGet2Sw.Start(); diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs index 614e9c47..816a68d8 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs @@ -71,8 +71,8 @@ struct StreamsAssociatedData(KeyValueBytesStoreSupplier storeSupplier, Materiali private readonly IKafkaCluster _kafkaCluster; private readonly IEntityType _entityType; - private readonly ISerDes _keySerdes; - private readonly ISerDes _valueSerdes; + private readonly ISerDes _keySerdes; + private readonly ISerDes _valueSerdes; private readonly bool _usePersistentStorage; private readonly string _topicName; @@ -81,7 +81,7 @@ struct StreamsAssociatedData(KeyValueBytesStoreSupplier storeSupplier, Materiali /// /// Default initializer /// - public KafkaStreamsBaseRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, ISerDes keySerdes, ISerDes valueSerdes, StreamsBuilder builder) + public KafkaStreamsBaseRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, ISerDes keySerdes, ISerDes valueSerdes, StreamsBuilder builder) { _kafkaCluster = kafkaCluster; _entityType = entityType; @@ -246,11 +246,11 @@ class KafkaEnumberable : IEnumerable { private readonly IKafkaCluster _kafkaCluster; private readonly IEntityType _entityType; - private readonly ISerDes _keySerdes; - private readonly ISerDes _valueSerdes; + private readonly ISerDes _keySerdes; + private readonly ISerDes _valueSerdes; private readonly Org.Apache.Kafka.Streams.State.ReadOnlyKeyValueStore? _keyValueStore = null; - public KafkaEnumberable(IKafkaCluster kafkaCluster, IEntityType entityType, ISerDes keySerdes, ISerDes valueSerdes, string storageId) + public KafkaEnumberable(IKafkaCluster kafkaCluster, IEntityType entityType, ISerDes keySerdes, ISerDes valueSerdes, string storageId) { _kafkaCluster = kafkaCluster; _entityType = entityType; @@ -282,8 +282,8 @@ class KafkaEnumerator : IEnumerator { private readonly IKafkaCluster _kafkaCluster; private readonly IEntityType _entityType; - private readonly ISerDes _keySerdes; - private readonly ISerDes _valueSerdes; + private readonly ISerDes _keySerdes; + private readonly ISerDes _valueSerdes; private readonly Org.Apache.Kafka.Streams.State.KeyValueIterator? _keyValueIterator = null; Stopwatch _valueGet = new Stopwatch(); @@ -296,7 +296,7 @@ class KafkaEnumerator : IEnumerator Stopwatch _valueBufferSw = new Stopwatch(); #endif - public KafkaEnumerator(IKafkaCluster kafkaCluster, IEntityType entityType, ISerDes keySerdes, ISerDes valueSerdes, Org.Apache.Kafka.Streams.State.KeyValueIterator? keyValueIterator) + public KafkaEnumerator(IKafkaCluster kafkaCluster, IEntityType entityType, ISerDes keySerdes, ISerDes valueSerdes, Org.Apache.Kafka.Streams.State.KeyValueIterator? keyValueIterator) { _kafkaCluster = kafkaCluster ?? throw new ArgumentNullException(nameof(kafkaCluster)); _entityType = entityType; @@ -357,16 +357,16 @@ public bool MoveNext() _cycles++; _valueGetSw.Start(); #endif - byte[]? data; + V? data; using (KeyValue kv = _keyValueIterator.Next) { - data = kv.value as byte[]; + data = kv.value != null ? (V)(object)kv.value! : default; } #if DEBUG_PERFORMANCE _valueGetSw.Stop(); _valueSerdesSw.Start(); #endif - TValue entityTypeData = _valueSerdes.DeserializeWithHeaders(null, null, data); + TValue entityTypeData = _valueSerdes.DeserializeWithHeaders(null, null, data!); #if DEBUG_PERFORMANCE _valueSerdesSw.Stop(); _valueBufferSw.Start(); diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs index fd4a9d9e..3ac9a18e 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs @@ -33,8 +33,8 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// /// Default initializer /// -public sealed class KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, ISerDes keySerdes, ISerDes valueSerdes, StreamsBuilder builder) - : KafkaStreamsBaseRetriever(kafkaCluster, entityType, keySerdes, valueSerdes, builder) +public sealed class KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, ISerDes keySerdes, ISerDes valueSerdes, StreamsBuilder builder) + : KafkaStreamsBaseRetriever(kafkaCluster, entityType, keySerdes, valueSerdes, builder) where TKey :notnull where TValueContainer : IValueContainer { @@ -43,7 +43,7 @@ public sealed class KafkaStreamsTableRetriever(IKafkaClus /// /// Initializer /// - public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, ISerDes keySerdes, ISerDes valueSerdes) + public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, ISerDes keySerdes, ISerDes valueSerdes) : this(kafkaCluster, entityType, keySerdes, valueSerdes, new StreamsBuilder()) { } diff --git a/src/net/KEFCore/Storage/Internal/KafkaTable.cs b/src/net/KEFCore/Storage/Internal/KafkaTable.cs index 7880f0a7..698a173d 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaTable.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaTable.cs @@ -33,7 +33,7 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// -public class KafkaTable : IKafkaTable +public class KafkaTable : IKafkaTable where TKey : notnull where TValueContainer : class, IValueContainer where TKeySerializer : class, new() @@ -62,7 +62,7 @@ public KafkaTable( Cluster = cluster; EntityType = entityType; _tableAssociatedTopicName = Cluster.CreateTable(entityType); - _producer = EntityTypeProducers.Create(entityType, Cluster); + _producer = EntityTypeProducers.Create(entityType, Cluster); _keyValueFactory = entityType.FindPrimaryKey()!.GetPrincipalKeyValueFactory(); _sensitiveLoggingEnabled = sensitiveLoggingEnabled; _rows = new Dictionary(_keyValueFactory.EqualityComparer); diff --git a/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs b/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs index b1d12281..9b9eb347 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs @@ -56,12 +56,14 @@ private Func CreateTable(IKafkaCluster cluster, IEntityType entityT => (Func)typeof(KafkaTableFactory).GetTypeInfo() .GetDeclaredMethod(nameof(CreateFactory))! .MakeGenericMethod(entityType.FindPrimaryKey()!.GetKeyType(), - _options.ValueContainerType(entityType), + _options.ValueContainerType(entityType), + _options.JVMKeyType(entityType), + _options.JVMValueContainerType(entityType), _options.SerializerTypeForKey(entityType), _options.SerializerTypeForValue(entityType)) .Invoke(null, new object?[] { cluster, entityType, _sensitiveLoggingEnabled })!; - private static Func CreateFactory( + private static Func CreateFactory( IKafkaCluster cluster, IEntityType entityType, bool sensitiveLoggingEnabled) @@ -69,5 +71,5 @@ private static Func CreateFactory where TKeySerializer : class, new() where TValueSerializer : class, new() - => () => new KafkaTable(cluster, entityType, sensitiveLoggingEnabled); + => () => new KafkaTable(cluster, entityType, sensitiveLoggingEnabled); } diff --git a/test/Common/Common.props b/test/Common/Common.props index 3e0335ef..0ad1838e 100644 --- a/test/Common/Common.props +++ b/test/Common/Common.props @@ -16,8 +16,8 @@ - - - + + + \ No newline at end of file diff --git a/test/KEFCore.Benchmark.Test/Program.cs b/test/KEFCore.Benchmark.Test/Program.cs index aa70bff8..f9b77f47 100644 --- a/test/KEFCore.Benchmark.Test/Program.cs +++ b/test/KEFCore.Benchmark.Test/Program.cs @@ -22,18 +22,15 @@ * SOFTWARE. */ -using Java.Sql; using MASES.EntityFrameworkCore.KNet.Infrastructure; using MASES.EntityFrameworkCore.KNet.Serialization.Avro; using MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage; -using MASES.EntityFrameworkCore.KNet.Serialization.Json.Storage; using MASES.EntityFrameworkCore.KNet.Serialization.Protobuf; using MASES.EntityFrameworkCore.KNet.Serialization.Protobuf.Storage; using MASES.EntityFrameworkCore.KNet.Test.Common; using MASES.EntityFrameworkCore.KNet.Test.Model; using MASES.KNet.Streams; using Microsoft.EntityFrameworkCore; -using Org.Apache.Kafka.Common.Metrics.Stats; using System; using System.Collections.Generic; using System.Diagnostics; @@ -114,15 +111,15 @@ static void Main(string[] args) { if (config.UseProtobuf) { - context.KeySerializationType = typeof(ProtobufKEFCoreSerDes.Key.Binary<>); + context.KeySerializationType = typeof(ProtobufKEFCoreSerDes.Key.BinaryRaw<>); context.ValueContainerType = typeof(ProtobufValueContainer<>); - context.ValueSerializationType = typeof(ProtobufKEFCoreSerDes.ValueContainer.Binary<>); + context.ValueSerializationType = typeof(ProtobufKEFCoreSerDes.ValueContainer.BinaryRaw<>); } else if (config.UseAvro) { - context.KeySerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.Key.Binary<>) : typeof(AvroKEFCoreSerDes.Key.Json<>); + context.KeySerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.Key.BinaryRaw<>) : typeof(AvroKEFCoreSerDes.Key.JsonRaw<>); context.ValueContainerType = typeof(AvroValueContainer<>); - context.ValueSerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.ValueContainer.Binary<>) : typeof(AvroKEFCoreSerDes.ValueContainer.Json<>); + context.ValueSerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.ValueContainer.BinaryRaw<>) : typeof(AvroKEFCoreSerDes.ValueContainer.JsonRaw<>); } if (config.DeleteApplicationData) @@ -183,15 +180,15 @@ static void Main(string[] args) { if (config.UseProtobuf) { - context.KeySerializationType = typeof(ProtobufKEFCoreSerDes.Key.Binary<>); + context.KeySerializationType = typeof(ProtobufKEFCoreSerDes.Key.BinaryRaw<>); context.ValueContainerType = typeof(ProtobufValueContainer<>); - context.ValueSerializationType = typeof(ProtobufKEFCoreSerDes.ValueContainer.Binary<>); + context.ValueSerializationType = typeof(ProtobufKEFCoreSerDes.ValueContainer.BinaryRaw<>); } else if (config.UseAvro) { - context.KeySerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.Key.Binary<>) : typeof(AvroKEFCoreSerDes.Key.Json<>); + context.KeySerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.Key.BinaryRaw<>) : typeof(AvroKEFCoreSerDes.Key.JsonRaw<>); context.ValueContainerType = typeof(AvroValueContainer<>); - context.ValueSerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.ValueContainer.Binary<>) : typeof(AvroKEFCoreSerDes.ValueContainer.Json<>); + context.ValueSerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.ValueContainer.BinaryRaw<>) : typeof(AvroKEFCoreSerDes.ValueContainer.JsonRaw<>); } Stopwatch watch = new(); diff --git a/test/KEFCore.Complex.Test/Program.cs b/test/KEFCore.Complex.Test/Program.cs index 0a2f05b4..dcfa57de 100644 --- a/test/KEFCore.Complex.Test/Program.cs +++ b/test/KEFCore.Complex.Test/Program.cs @@ -95,15 +95,15 @@ static void Main(string[] args) if (config.UseProtobuf) { - context.KeySerializationType = typeof(ProtobufKEFCoreSerDes.Key.Binary<>); + context.KeySerializationType = typeof(ProtobufKEFCoreSerDes.Key.BinaryRaw<>); context.ValueContainerType = typeof(ProtobufValueContainer<>); - context.ValueSerializationType = typeof(ProtobufKEFCoreSerDes.ValueContainer.Binary<>); + context.ValueSerializationType = typeof(ProtobufKEFCoreSerDes.ValueContainer.BinaryRaw<>); } else if (config.UseAvro) { - context.KeySerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.Key.Binary<>) : typeof(AvroKEFCoreSerDes.Key.Json<>); + context.KeySerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.Key.BinaryRaw<>) : typeof(AvroKEFCoreSerDes.Key.JsonRaw<>); context.ValueContainerType = typeof(AvroValueContainer<>); - context.ValueSerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.ValueContainer.Binary<>) : typeof(AvroKEFCoreSerDes.ValueContainer.Json<>); + context.ValueSerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.ValueContainer.BinaryRaw<>) : typeof(AvroKEFCoreSerDes.ValueContainer.JsonRaw<>); } if (config.DeleteApplicationData) diff --git a/test/KEFCore.StreamTest/Program.cs b/test/KEFCore.StreamTest/Program.cs index 814ebe9f..9f8cda2d 100644 --- a/test/KEFCore.StreamTest/Program.cs +++ b/test/KEFCore.StreamTest/Program.cs @@ -27,7 +27,6 @@ using MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage; using MASES.EntityFrameworkCore.KNet.Serialization.Protobuf; using MASES.EntityFrameworkCore.KNet.Serialization.Protobuf.Storage; -using MASES.EntityFrameworkCore.KNet.Storage; using MASES.EntityFrameworkCore.KNet.Test.Common; using MASES.EntityFrameworkCore.KNet.Test.Model; using MASES.KNet.Streams; @@ -95,15 +94,15 @@ static void Main(string[] args) if (config.UseProtobuf) { - context.KeySerializationType = typeof(ProtobufKEFCoreSerDes.Key.Binary<>); + context.KeySerializationType = typeof(ProtobufKEFCoreSerDes.Key.BinaryRaw<>); context.ValueContainerType = typeof(ProtobufValueContainer<>); - context.ValueSerializationType = typeof(ProtobufKEFCoreSerDes.ValueContainer.Binary<>); + context.ValueSerializationType = typeof(ProtobufKEFCoreSerDes.ValueContainer.BinaryRaw<>); } else if (config.UseAvro) { - context.KeySerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.Key.Binary<>) : typeof(AvroKEFCoreSerDes.Key.Json<>); + context.KeySerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.Key.BinaryRaw<>) : typeof(AvroKEFCoreSerDes.Key.JsonRaw<>); context.ValueContainerType = typeof(AvroValueContainer<>); - context.ValueSerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.ValueContainer.Binary<>) : typeof(AvroKEFCoreSerDes.ValueContainer.Json<>); + context.ValueSerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.ValueContainer.BinaryRaw<>) : typeof(AvroKEFCoreSerDes.ValueContainer.JsonRaw<>); } if (config.DeleteApplicationData) diff --git a/test/KEFCore.Test/Program.cs b/test/KEFCore.Test/Program.cs index 3531e36d..cf59c7e0 100644 --- a/test/KEFCore.Test/Program.cs +++ b/test/KEFCore.Test/Program.cs @@ -97,15 +97,15 @@ static void Main(string[] args) if (config.UseProtobuf) { - context.KeySerializationType = typeof(ProtobufKEFCoreSerDes.Key.Binary<>); + context.KeySerializationType = typeof(ProtobufKEFCoreSerDes.Key.BinaryRaw<>); context.ValueContainerType = typeof(ProtobufValueContainer<>); - context.ValueSerializationType = typeof(ProtobufKEFCoreSerDes.ValueContainer.Binary<>); + context.ValueSerializationType = typeof(ProtobufKEFCoreSerDes.ValueContainer.BinaryRaw<>); } else if (config.UseAvro) { - context.KeySerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.Key.Binary<>) : typeof(AvroKEFCoreSerDes.Key.Json<>); + context.KeySerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.Key.BinaryRaw<>) : typeof(AvroKEFCoreSerDes.Key.JsonRaw<>); context.ValueContainerType = typeof(AvroValueContainer<>); - context.ValueSerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.ValueContainer.Binary<>) : typeof(AvroKEFCoreSerDes.ValueContainer.Json<>); + context.ValueSerializationType = config.UseAvroBinary ? typeof(AvroKEFCoreSerDes.ValueContainer.BinaryRaw<>) : typeof(AvroKEFCoreSerDes.ValueContainer.JsonRaw<>); } if (config.DeleteApplicationData)