Skip to content

Commit

Permalink
Update to KNet 2.7.1 (#231)
Browse files Browse the repository at this point in the history
* Update Entity Framework version

* Updates based on latest KNet evolution

* Update and alignment to KNet 2.7.1

* Version upgrade

* #231 (comment)
#231 (comment)
#231 (comment)

* #231 (comment)
  • Loading branch information
masesdevelopers authored May 18, 2024
1 parent 19d3eed commit d0f1dd7
Show file tree
Hide file tree
Showing 24 changed files with 773 additions and 207 deletions.
55 changes: 33 additions & 22 deletions src/documentation/articles/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,16 @@ The code is based on three elements shall be available to [Entity Framework Core
- **Key SerDes**: the serializer of the Primary Key
- **ValueContainer SerDes**: the serializer of the ValueContainer


### Default types

[Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/) comes with some default values:
- **ValueContainer** class: KEFCore uses `DefaultValueContainer<T>` (i.e. `DefaultKEFCoreSerDes.DefaultValueContainer`) which stores the CLR type of Entity, the properties ordered by their index with associated CLT type, name and JSON serializaed value; the class is marked for JSON serialization and it is used from the **ValueContainer SerDes**;
- **Key SerDes** class: KEFCore uses `DefaultKEFCoreSerDes.Key.Json<T>` (i.e. `DefaultKEFCoreSerDes.DefaultKeySerialization`), the type automatically manages simple or complex Primary Key
- **ValueContainer SerDes** class: KEFCore uses `DefaultKEFCoreSerDes.ValueContainer.Json<>` (i.e. `DefaultKEFCoreSerDes.DefaultValueContainerSerialization`)
- **Key SerDes** class: KEFCore uses `DefaultKEFCoreSerDes.Key.JsonRaw<T>` (i.e. `DefaultKEFCoreSerDes.DefaultKeySerialization`), the type automatically manages simple or complex Primary Key
- **ValueContainer SerDes** class: KEFCore uses `DefaultKEFCoreSerDes.ValueContainer.JsonRaw<>` (i.e. `DefaultKEFCoreSerDes.DefaultValueContainerSerialization`)

Both Key and ValueContainer SerDes come with two kind of data transfer mechanisms:
- Raw: it uses `byte` arrays for data transfer
- Buffered: they use `ByteBuffer` for data transfer

### User override

Expand Down Expand Up @@ -138,25 +141,25 @@ public class CustomValueContainer<TKey> : IValueContainer<TKey> where TKey : not
#### **Key SerDes** and **ValueContainer SerDes** class

A custom **Key SerDes** class shall follow the following rules:
- must implements the `IKNetSerDes<T>` interface or extend `KNetSerDes<T>`
- must implements the `ISerDes<T>` interface or extend `SerDes<T>`
- must be a generic type
- must have a parameterless constructor
- can store serialization information using Headers of Apache Kafka record (this information will be used from `EntityExtractor`)

An example snippet is the follow based on JSON serializer:

```C#
public class CustomKeySerDes<T> : KNetSerDes<T>
public class CustomKeySerDes<T> : SerDesRaw<T>
{
readonly byte[] keyTypeName = Encoding.UTF8.GetBytes(typeof(T).FullName!);
readonly byte[] customSerDesName = Encoding.UTF8.GetBytes(typeof(CustomKeySerDes<>).FullName!);

/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
/// <inheritdoc cref="SerDes{T, TJVM}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
return SerializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
/// <inheritdoc cref="SerDes{T, TJVM}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.KeyTypeIdentifier, keyTypeName);
Expand All @@ -165,12 +168,12 @@ public class CustomKeySerDes<T> : KNetSerDes<T>
var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data);
return Encoding.UTF8.GetBytes(jsonStr);
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
/// <inheritdoc cref="SerDes{T, TJVM}.Deserialize(string, TJVM)"/>
public override T Deserialize(string topic, byte[] data)
{
return DeserializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
/// <inheritdoc cref="SerDes{T, TJVM}.DeserializeWithHeaders(string, Headers, TJVM)"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
if (data == null) return default;
Expand All @@ -180,7 +183,7 @@ public class CustomKeySerDes<T> : KNetSerDes<T>
```

```C#
public class CustomValueContainerSerDes<T> : KNetSerDes<T>
public class CustomValueContainerSerDes<T> : SerDesRaw<T>
{
readonly byte[] valueContainerSerDesName = Encoding.UTF8.GetBytes(typeof(CustomValueContainerSerDes<>).FullName!);
readonly byte[] valueContainerName = null!;
Expand All @@ -205,12 +208,12 @@ public class CustomValueContainerSerDes<T> : KNetSerDes<T>
throw new ArgumentException($"{typeof(T).Name} is not a generic type and cannot be used as a valid ValueContainer type");
}

/// <inheritdoc cref="KNetSerDes{T}.Serialize(string, T)"/>
/// <inheritdoc cref="SerDes{T, TJVM}.Serialize(string, T)"/>
public override byte[] Serialize(string topic, T data)
{
return SerializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.SerializeWithHeaders(string, Headers, T)"/>
/// <inheritdoc cref="SerDes{T, TJVM}.SerializeWithHeaders(string, Headers, T)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, T data)
{
headers?.Add(KEFCoreSerDesNames.ValueContainerSerializerIdentifier, valueContainerSerDesName);
Expand All @@ -219,12 +222,12 @@ public class CustomValueContainerSerDes<T> : KNetSerDes<T>
var jsonStr = System.Text.Json.JsonSerializer.Serialize<T>(data);
return Encoding.UTF8.GetBytes(jsonStr);
}
/// <inheritdoc cref="KNetSerDes{T}.Deserialize(string, byte[])"/>
/// <inheritdoc cref="SerDes{T, TJVM}.Deserialize(string, TJVM)"/>
public override T Deserialize(string topic, byte[] data)
{
return DeserializeWithHeaders(topic, null, data);
}
/// <inheritdoc cref="KNetSerDes{T}.DeserializeWithHeaders(string, Headers, byte[])"/>
/// <inheritdoc cref="SerDes{T, TJVM}.DeserializeWithHeaders(string, Headers, TJVM)"/>
public override T DeserializeWithHeaders(string topic, Headers headers, byte[] data)
{
if (data == null) return default;
Expand All @@ -249,6 +252,10 @@ The engine comes with two different encoders
- Binary
- Json

Both Key and ValueContainer SerDes, Binary or Json, come with two kind of data transfer mechanisms:
- Raw: it uses `byte` arrays for data transfer
- Buffered: they use `ByteBuffer` for data transfer

### Avro schema

The following schema is the default used from the engine and can be registered in Apache Schema registry so other applications can use it to extract the data stored in the topics:
Expand Down Expand Up @@ -343,9 +350,9 @@ The extension converted this schema into code to speedup the exection of seriali
### How to use Avro

`KafkaDbContext` contains three properties can be used to override the default types:
- **KeySerializationType**: set this value to `AvroKEFCoreSerDes.Key.Binary<>` or `AvroKEFCoreSerDes.Key.Json<>` or use `AvroKEFCoreSerDes.DefaultKeySerialization` (defaults to `AvroKEFCoreSerDes.Key.Binary<>`), both types automatically manages simple or complex Primary Key
- **ValueSerializationType**: set this value to `AvroKEFCoreSerDes.ValueContainer.Binary<>` or `AvroKEFCoreSerDes.ValueContainer.Json<>` or use `AvroKEFCoreSerDes.DefaultValueContainerSerialization` (defaults to `AvroKEFCoreSerDes.ValueContainer.Binary<>`)
- **ValueContainerType**: set this value to `AvroValueContainer<>` or use `AvroKEFCoreSerDes.DefaultValueContainer`
- **KeySerializationType**: set this value to `AvroKEFCoreSerDes.Key.BinaryRaw<>` or `AvroKEFCoreSerDes.Key.JsonRaw<>` or use `AvroKEFCoreSerDes.DefaultKeySerialization` (defaults to `AvroKEFCoreSerDes.Key.BinaryRaw<>`), both types automatically manages simple or complex Primary Key
- **ValueSerializationType**: set this value to `AvroKEFCoreSerDes.ValueContainer.BinaryRaw<>` or `AvroKEFCoreSerDes.ValueContainer.JsonRaw<>` or use `AvroKEFCoreSerDes.DefaultValueContainerSerialization` (defaults to `AvroKEFCoreSerDes.ValueContainer.BinaryRaw<>`)
- **ValueContainerType**: set this value to `AvroValueContainerRaw<>` or use `AvroKEFCoreSerDes.DefaultValueContainer`

An example is:

Expand All @@ -355,9 +362,9 @@ using (context = new BloggingContext()
BootstrapServers = "KAFKA-SERVER:9092",
ApplicationId = "MyAppid",
DbName = "MyDBName",
KeySerializationType = UseAvroBinary ? typeof(AvroKEFCoreSerDes.Key.Binary<>) : typeof(AvroKEFCoreSerDes.Key.Json<>),
KeySerializationType = UseAvroBinary ? typeof(AvroKEFCoreSerDes.Key.BinaryRaw<>) : typeof(AvroKEFCoreSerDes.Key.JsonRaw<>),
ValueContainerType = typeof(AvroValueContainer<>),
ValueSerializationType = UseAvroBinary ? typeof(AvroKEFCoreSerDes.ValueContainer.Binary<>) : typeof(AvroKEFCoreSerDes.ValueContainer.Json<>),
ValueSerializationType = UseAvroBinary ? typeof(AvroKEFCoreSerDes.ValueContainer.BinaryRaw<>) : typeof(AvroKEFCoreSerDes.ValueContainer.JsonRaw<>),
})
{
// execute stuff here
Expand All @@ -368,6 +375,10 @@ using (context = new BloggingContext()

With package [MASES.EntityFrameworkCore.KNet.Serialization.Protobuf](https://www.nuget.org/packages/MASES.EntityFrameworkCore.KNet.Serialization.Protobuf/) an user can choose the Protobuf serializer.

Both Key and ValueContainer SerDes come with two kind of data transfer mechanisms:
- Raw: it uses `byte` arrays for data transfer
- Buffered: they use `ByteBuffer` for data transfer

### Protobuf schema

The following schema is the default used from the engine and can be registered in Apache Schema registry so other applications can use it to extract the data stored in the topics:
Expand Down Expand Up @@ -499,9 +510,9 @@ The extension converted this schema into code to speedup the exection of seriali
### How to use Protobuf

`KafkaDbContext` contains three properties can be used to override the default types:
- **KeySerializationType**: set this value to `ProtobufKEFCoreSerDes..Key.Binary<>` or use `ProtobufKEFCoreSerDes.DefaultKeySerialization`, the type automatically manages simple or complex Primary Key
- **ValueSerializationType**: set this value to `ProtobufKEFCoreSerDes.ValueContainer.Binary<>` or use `ProtobufKEFCoreSerDes.DefaultValueContainerSerialization`
- **ValueContainerType**: set this value to `ProtobufValueContainer<>` or use `ProtobufKEFCoreSerDes.DefaultValueContainer`
- **KeySerializationType**: set this value to `ProtobufKEFCoreSerDes..Key.BinaryRaw<>` or use `ProtobufKEFCoreSerDes.DefaultKeySerialization`, the type automatically manages simple or complex Primary Key
- **ValueSerializationType**: set this value to `ProtobufKEFCoreSerDes.ValueContainer.BinaryRaw<>` or use `ProtobufKEFCoreSerDes.DefaultValueContainerSerialization`
- **ValueContainerType**: set this value to `ProtobufValueContainerRaw<>` or use `ProtobufKEFCoreSerDes.DefaultValueContainer`

An example is:

Expand Down
2 changes: 1 addition & 1 deletion src/net/Common/Common.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Owners>MASES s.r.l.</Owners>
<Authors>MASES s.r.l.</Authors>
<Company>MASES s.r.l.</Company>
<Version>2.1.1.0</Version>
<Version>2.2.0.0</Version>
<TargetFrameworks>net6.0;net7.0;net8.0</TargetFrameworks>
<LangVersion>latest</LangVersion>
<GenerateAssemblyInfo>true</GenerateAssemblyInfo>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="MASES.KNet.Serialization.Avro" Version="2.6.1" />
<PackageReference Include="MASES.KNet.Serialization.Avro" Version="2.7.1" />
</ItemGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit d0f1dd7

Please sign in to comment.