Skip to content

Commit

Permalink
.NET serialization is compliant with Apache Kafka (#464)
Browse files Browse the repository at this point in the history
* If .NET serialization is in use (UseKafkaClassForSupportedTypes is false), it shall be byte reversed to compliant with Apache Kafka serializers

* Distinguish between types to decide about revert array, update test code
  • Loading branch information
masesdevelopers authored May 21, 2024
1 parent 0e25e82 commit 30c958f
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ on:
branches: '**'

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

# This workflow contains two jobs called "check_changes", "build_windows"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ on:
# - cron: '26 23 * * 3'

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/documentation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ on:
type: boolean

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

# This workflow contains one job called "build_documentation"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/generateclasses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ on:
type: boolean

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

# This workflow contains one job called "build_documentation"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pullrequest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
branches: [ master ]

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
Expand Down
4 changes: 2 additions & 2 deletions src/documentation/articles/usageSerDes.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ _description: Describes how to use serialization of .NET suite for Apache Kafka

# KNet: Serializer/Deserializer

KNet comes with a base set of serializer and deserializer. Most of them are usable with primitives types (bool, int, etc) and array of bytes.
KNet comes with a base set of serializer and deserializer. Most of them are usable with primitives types (`bool`, `int`, etc) and array of `byte`s.

If the user wants to use structures types there are two ways:
1. Creates types in Java and reflects them in C#
Expand All @@ -17,7 +17,7 @@ The current available packages are:
- [MASES.KNet.Serialization.Avro](https://www.nuget.org/packages/MASES.KNet.Serialization.Avro/): it is a serdes which uses [AVRO](https://en.wikipedia.org/wiki/Apache_Avro); till now is not ready.
- [MASES.KNet.Serialization.Json](https://www.nuget.org/packages/MASES.KNet.Serialization.Json/): it is a serdes which uses [Json](https://en.wikipedia.org/wiki/JSON); till now it is at its first stage and it is based on general purpose API from:
- .NET Framework: it uses [Newtonsoft.Json](https://www.nuget.org/packages/Newtonsoft.Json) package
- .NET 6/7: it uses the Json which comes with the frameworks
- .NET 6/8: it uses the Json which comes with the frameworks
- [MASES.KNet.Serialization.MessagePack](https://www.nuget.org/packages/MASES.KNet.Serialization.MessagePack/): it is a serdes which uses [MessagePack](https://en.wikipedia.org/wiki/MessagePack); till now it is at its first stage and it is based on general purpose API from [MessagePack](https://www.nuget.org/packages/MessagePack) package
- [MASES.KNet.Serialization.Protobuf](https://www.nuget.org/packages/MASES.KNet.Serialization.Protobuf/): it is a serdes which uses [Google.Protobuf](https://en.wikipedia.org/wiki/Protocol_Buffers); till now it is at its first stage and it is based on general purpose API from [Google.Protobuf](https://www.nuget.org/packages/Google.Protobuf) package

Expand Down
34 changes: 29 additions & 5 deletions src/net/KNet/Specific/Serialization/KNetSerialization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
using Org.Apache.Kafka.Common.Serialization;
using Org.Apache.Kafka.Common.Utils;
using System;
using System.Linq;
using System.Reflection;
using System.Text;

Expand Down Expand Up @@ -277,7 +278,9 @@ public static byte[] SerializeBytes(bool fallbackToKafka, string topic, Bytes da
public static byte[] SerializeDouble(bool fallbackToKafka, string topic, double data)
{
if (fallbackToKafka) return _DoubleSerializer.Serialize(topic, data);
return BitConverter.GetBytes(data);
var array = BitConverter.GetBytes(data);
if (ShallRevertByteOrderDouble) Array.Reverse(array);
return array;
}

static readonly Org.Apache.Kafka.Common.Serialization.FloatSerializer _FloatSerializer = new Org.Apache.Kafka.Common.Serialization. FloatSerializer();
Expand All @@ -287,7 +290,9 @@ public static byte[] SerializeDouble(bool fallbackToKafka, string topic, double
public static byte[] SerializeFloat(bool fallbackToKafka, string topic, float data)
{
if (fallbackToKafka) return _FloatSerializer.Serialize(topic, data);
return BitConverter.GetBytes(data);
var array = BitConverter.GetBytes(data);
if (ShallRevertByteOrderFloat) Array.Reverse(array);
return array;
}

static readonly Org.Apache.Kafka.Common.Serialization.IntegerSerializer _IntSerializer = new Org.Apache.Kafka.Common.Serialization.IntegerSerializer();
Expand All @@ -297,7 +302,9 @@ public static byte[] SerializeFloat(bool fallbackToKafka, string topic, float da
public static byte[] SerializeInt(bool fallbackToKafka, string topic, int data)
{
if (fallbackToKafka) return _IntSerializer.Serialize(topic, data);
return BitConverter.GetBytes(data);
var array = BitConverter.GetBytes(data);
if (ShallRevertByteOrderInt) Array.Reverse(array);
return array;

// the following generates an error in container
//return new byte[] { (byte)(data >>> 24), (byte)(data >>> 16), (byte)(data >>> 8), ((byte)data) };
Expand All @@ -310,7 +317,10 @@ public static byte[] SerializeInt(bool fallbackToKafka, string topic, int data)
public static byte[] SerializeLong(bool fallbackToKafka, string topic, long data)
{
if (fallbackToKafka) return _LongSerializer.Serialize(topic, data);
return BitConverter.GetBytes(data);
var array = BitConverter.GetBytes(data);
if (ShallRevertByteOrderLong) Array.Reverse(array);
return array;

// the following generates an error in container
//return new byte[] { (byte)((int)(data >>> 56)), (byte)((int)(data >>> 48)), (byte)((int)(data >>> 40)), (byte)((int)(data >>> 32)), (byte)((int)(data >>> 24)), (byte)((int)(data >>> 16)), (byte)((int)(data >>> 8)), ((byte)data) };
}
Expand All @@ -322,7 +332,10 @@ public static byte[] SerializeLong(bool fallbackToKafka, string topic, long data
public static byte[] SerializeShort(bool fallbackToKafka, string topic, short data)
{
if (fallbackToKafka) return _ShortSerializer.Serialize(topic, data);
return BitConverter.GetBytes(data);
var array = BitConverter.GetBytes(data);
if (ShallRevertByteOrderShort) Array.Reverse(array);
return array;

// the following generates an error in container
//return new byte[] { (byte)(data >>> 8), ((byte)data) };
}
Expand Down Expand Up @@ -411,6 +424,7 @@ public static double DeserializeDouble(bool fallbackToKafka, string topic, byte[
}
return (double)result;
}
if (ShallRevertByteOrderDouble) Array.Reverse(data);
return BitConverter.ToDouble(data, 0);
}

Expand All @@ -430,6 +444,7 @@ public static float DeserializeFloat(bool fallbackToKafka, string topic, byte[]
}
return (float)result;
}
if (ShallRevertByteOrderFloat) Array.Reverse(data);
return BitConverter.ToSingle(data, 0);
}

Expand All @@ -449,6 +464,7 @@ public static int DeserializeInt(bool fallbackToKafka, string topic, byte[] data
}
return (int)result;
}
if (ShallRevertByteOrderInt) Array.Reverse(data);
return BitConverter.ToInt32(data, 0);

//if (data == null)
Expand Down Expand Up @@ -493,6 +509,7 @@ public static long DeserializeLong(bool fallbackToKafka, string topic, byte[] da
}
return (long)result;
}
if (ShallRevertByteOrderLong) Array.Reverse(data);
return BitConverter.ToInt64(data, 0);

//if (data == null)
Expand Down Expand Up @@ -537,6 +554,7 @@ public static short DeserializeShort(bool fallbackToKafka, string topic, byte[]
}
return (short)result;
}
if (ShallRevertByteOrderShort) Array.Reverse(data);
return BitConverter.ToInt16(data, 0);

//if (data == null)
Expand Down Expand Up @@ -595,5 +613,11 @@ public static Java.Lang.Void DeserializeVoid(bool fallbackToKafka, string topic,
return null;
}
}

static readonly bool ShallRevertByteOrderShort = !SerializeShort(false, "", 1).SequenceEqual(SerializeShort(true, "", 1));
static readonly bool ShallRevertByteOrderInt = !SerializeInt(false, "", 1).SequenceEqual(SerializeInt(true, "", 1));
static readonly bool ShallRevertByteOrderLong = !SerializeLong(false, "", 1).SequenceEqual(SerializeLong(true, "", 1));
static readonly bool ShallRevertByteOrderFloat = !SerializeFloat(false, "", 1.1F).SequenceEqual(SerializeFloat(true, "", 1.1F));
static readonly bool ShallRevertByteOrderDouble = !SerializeDouble(false, "", 1.1).SequenceEqual(SerializeDouble(true, "", 1.1));
}
}
2 changes: 1 addition & 1 deletion src/net/KNet/Specific/Serialization/SerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public interface ISerDes : IDisposable
/// <summary>
/// Set to <see langword="true"/> in implementing class if the implementation uses the support of direct buffer data exchange
/// </summary>
/// <remarks>If set to <see langword="true"/>, the KNet classes will use <see cref="KNetByteBufferSerializer"/> and <see cref="KNetByteBufferDeserializer"/> as backing JVM classes</remarks>
/// <remarks>If set to <see langword="true"/>, the KNet classes will use <see cref="ByteBufferSerializer"/> and <see cref="ByteBufferDeserializer"/> as backing JVM classes</remarks>
bool IsDirectBuffered { get; }
}

Expand Down
7 changes: 7 additions & 0 deletions tests/net/KNetTest.sln
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Test Streams", "Test Stream
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KNetTestKNetStreams", "KNetTestKNetStreams\KNetTestKNetStreams.csproj", "{69DF3123-32D8-4583-9D76-A61A78DFF370}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KNetTestSerDes", "KNetTestSerDes\KNetTestSerDes.csproj", "{CED323B6-506B-470C-9F26-0B77667C8598}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -150,6 +152,10 @@ Global
{69DF3123-32D8-4583-9D76-A61A78DFF370}.Debug|Any CPU.Build.0 = Debug|Any CPU
{69DF3123-32D8-4583-9D76-A61A78DFF370}.Release|Any CPU.ActiveCfg = Release|Any CPU
{69DF3123-32D8-4583-9D76-A61A78DFF370}.Release|Any CPU.Build.0 = Release|Any CPU
{CED323B6-506B-470C-9F26-0B77667C8598}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CED323B6-506B-470C-9F26-0B77667C8598}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CED323B6-506B-470C-9F26-0B77667C8598}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CED323B6-506B-470C-9F26-0B77667C8598}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -176,6 +182,7 @@ Global
{363CC373-CF04-4204-A44A-C96AB1B1085E} = {68128D24-3F72-4A25-A876-509F12F38E73}
{8E8EB519-70A3-487A-BB71-096988CCA6A9} = {68128D24-3F72-4A25-A876-509F12F38E73}
{69DF3123-32D8-4583-9D76-A61A78DFF370} = {35AE6AB6-C68A-4605-9C66-EE652977C27A}
{CED323B6-506B-470C-9F26-0B77667C8598} = {A77F1B9E-5576-46F0-BE58-C85FA9FA3A58}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {0A7C16DC-1BAA-44BC-AA1C-D40B7B61878E}
Expand Down
13 changes: 13 additions & 0 deletions tests/net/KNetTestSerDes/KNetTestSerDes.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\Common\Common.props" />
<PropertyGroup>
<AssemblyName>KNetTestSerDes</AssemblyName>
<OutputType>Exe</OutputType>
<RootNamespace>MASES.KNetTestSerDes</RootNamespace>
<Title>KNetTestSerDes - a test tool for KNet</Title>
<Description>KNetTestSerDes - a test tool for KNet</Description>
</PropertyGroup>
<ItemGroup>
<Compile Include="..\Common\SharedKNetCore.cs" Link="SharedKNetCore.cs" />
</ItemGroup>
</Project>
120 changes: 120 additions & 0 deletions tests/net/KNetTestSerDes/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2024 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using MASES.KNet.Serialization;
using MASES.KNet.TestCommon;
using System.Linq;

namespace MASES.KNetTestAdmin
{
class Program
{
const string theServer = "localhost:9092";
const string theTopic = "myTopicAdmin";

static string serverToUse = theServer;
static string topicToUse = theTopic;

static void Main(string[] args)
{
SharedKNetCore.CreateGlobalInstance();
var appArgs = SharedKNetCore.FilteredArgs;

byte[] bb, bb1;

bb = KNetSerialization.SerializeBoolean(false, "test", false);
bb1 = KNetSerialization.SerializeBoolean(true, "test", false);
if (!bb.SequenceEqual(bb1)) throw new System.Exception();

if (KNetSerialization.DeserializeBoolean(true, "test", bb) != false) throw new System.Exception();

bb = KNetSerialization.SerializeBoolean(false, "test", true);
bb1 = KNetSerialization.SerializeBoolean(true, "test", true);
if (!bb.SequenceEqual(bb1)) throw new System.Exception();

if (KNetSerialization.DeserializeBoolean(true, "test", bb) != true) throw new System.Exception();

const int cycles = 100;

long cycleDelta = short.MaxValue / cycles;

for (short i = 0; i < cycles; i++)
{
short val = (short)(short.MinValue + i * (short)cycleDelta);

bb = KNetSerialization.SerializeShort(false, "test", val);
bb1 = KNetSerialization.SerializeShort(true, "test", val);
if (!bb.SequenceEqual(bb1)) throw new System.Exception();

if (KNetSerialization.DeserializeShort(true, "test", bb) != val) throw new System.Exception();
}

cycleDelta = int.MaxValue / cycles;

for (int i = 0; i < cycles; i++)
{
int val = int.MinValue + i * (int)cycleDelta;

bb = KNetSerialization.SerializeInt(false, "test", val);
bb1 = KNetSerialization.SerializeInt(true, "test", val);
if (!bb.SequenceEqual(bb1)) throw new System.Exception();

if (KNetSerialization.DeserializeInt(true, "test", bb) != val) throw new System.Exception();
}

cycleDelta = long.MaxValue / cycles;

for (long i = 0; i < cycles; i++)
{
long val = long.MinValue + i * (long)cycleDelta;

bb = KNetSerialization.SerializeLong(false, "test", val);
bb1 = KNetSerialization.SerializeLong(true, "test", val);
if (!bb.SequenceEqual(bb1)) throw new System.Exception();

if (KNetSerialization.DeserializeLong(true, "test", bb) != val) throw new System.Exception();
}

float cycleDeltaF = float.MaxValue / cycles;

for (long i = 0; i < cycles; i++)
{
float val = float.MinValue + i * cycleDeltaF;

bb = KNetSerialization.SerializeFloat(false, "test", val);
bb1 = KNetSerialization.SerializeFloat(true, "test", val);
if (!bb.SequenceEqual(bb1)) throw new System.Exception();

if (KNetSerialization.DeserializeFloat(true, "test", bb) != val) throw new System.Exception();
}

double cycleDeltaD = double.MaxValue / cycles;

for (long i = 0; i < cycles; i++)
{
double val = double.MinValue + i * cycleDeltaD;

bb = KNetSerialization.SerializeDouble(false, "test", val);
bb1 = KNetSerialization.SerializeDouble(true, "test", val);
if (!bb.SequenceEqual(bb1)) throw new System.Exception();

if (KNetSerialization.DeserializeDouble(true, "test", bb) != val) throw new System.Exception();
}
}
}
}

0 comments on commit 30c958f

Please sign in to comment.