Skip to content

Commit

Permalink
Added and managed ISerDesSelector<T> (breaking change) (#466)
Browse files Browse the repository at this point in the history
* Added and managed ISerDesSelector<T> to retrieve serialization classes based on the JVM type, all serialization extension where updated to reports the classes extending ISerDesSelector<T> without any reference to the data transfer mechanism

* Doc update

* Added new properties and methods to ISerDes and ISerDesSelector

* Update description
  • Loading branch information
masesdevelopers authored May 25, 2024
1 parent 30c958f commit ad9cb07
Show file tree
Hide file tree
Showing 14 changed files with 1,767 additions and 1,189 deletions.
17 changes: 9 additions & 8 deletions src/documentation/articles/usageSerDes.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,28 +70,29 @@ SerDesRaw<TestType> deserializer = new SerDesRaw<TestType>()
Otherwise the user can use a ready made class like in the following snippet:

```c#
ISerDesRaw<TestType> serdes = new JsonSerDes.ValueRaw<TestType>();
ISerDesRaw<TestType> serdes = JsonSerDes.Value<TestType>.NewByteArraySerDes();
```

A single `JsonSerDes.ValueRaw` can be used in serialization and deserialization, and produce Json serialized data.

## Key and Value versions

The reader noticed that in the example was used `JsonSerDes.ValueRaw`. It is a serializer/deserializer, based on `byte` array, generally used for values because it stores, within the record `Headers` information related to the value itself.
The reader noticed that in the example was used `JsonSerDes.Value<T>().NewByteArraySerDes()`. It is a serializer/deserializer, based on `byte` array, generally used for values because it stores, within the record `Headers` information related to the value itself.

All packages listed above have multiple types based on the scope and data exchange mechanism:
- [Serialization Format].KeyRaw: key serializer/deserializer based on `byte` array
- [Serialization Format].KeyBuffered: key serializer/deserializer based on `ByteBuffer`
- [Serialization Format].ValueRaw: value serializer/deserializer based on `byte` array
- [Serialization Format].ValueBuffered: value serializer/deserializer based on `ByteBuffer`
- [Serialization Format].Key: key serializer/deserializer can manages data transfer using both `byte` array and `ByteBuffer`
- [Serialization Format].Value: value serializer/deserializer can manages data transfer using both `byte` array and `ByteBuffer`

where [Serialization format] depends on the serializatin package in use.
where [Serialization format] depends on the serializatin package in use and the selection of the data transfer can be made from underlying code or can be requested from the user:
- `[Serialization Format].[Key or Value]<TData>.NewByteArraySerDes()`: returns an `ISerDesRaw<TData>`
- `[Serialization Format].[Key or Value]<TData>.NewByteBufferSerDes()`: returns an `ISerDesBuffered<TData>`

> [!TIP]
> As specified above, each serializer stores info within the `Headers` and this behavior is controlled from a property named `UseHeaders`.
> If the user writes a code like:
>
>```c#
> ISerDesRaw<TestType> serdes = new JsonSerDes.ValueRaw<TestType>();
> ISerDesRaw<TestType> serdes = JsonSerDes.Value<TestType>.NewByteArraySerDes();
> serdes.UseHeader = false;
>```
> The `ISerDesRaw<TestType>` instance does not writes the `Headers` and can be used both for key and value.
Expand Down
1,111 changes: 644 additions & 467 deletions src/net/KNet.Serialization.Avro/AvroSerDes.cs

Large diffs are not rendered by default.

614 changes: 351 additions & 263 deletions src/net/KNet.Serialization.Json/JsonSerDes.cs

Large diffs are not rendered by default.

410 changes: 249 additions & 161 deletions src/net/KNet.Serialization.MessagePack/MessagePackSerDes.cs

Large diffs are not rendered by default.

378 changes: 231 additions & 147 deletions src/net/KNet.Serialization.Protobuf/ProtobufSerDes.cs

Large diffs are not rendered by default.

18 changes: 2 additions & 16 deletions src/net/KNet/Specific/Consumer/KNetConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,27 +168,13 @@ static Properties CheckProperties(Properties props, ISerDes keyDeserializer, ISe
{
if (!props.ContainsKey(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
{
if (!keyDeserializer.IsDirectBuffered)
{
props.Put(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ClassNameOf<Org.Apache.Kafka.Common.Serialization.ByteArrayDeserializer>());
}
else
{
props.Put(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ClassNameOf<ByteBufferDeserializer>());
}
props.Put(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.JVMDeserializerClassName);
}
else throw new InvalidOperationException($"KNetConsumer auto manages configuration property {Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}, remove from configuration.");

if (!props.ContainsKey(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
{
if (!valueDeserializer.IsDirectBuffered)
{
props.Put(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClassNameOf<Org.Apache.Kafka.Common.Serialization.ByteArrayDeserializer>());
}
else
{
props.Put(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClassNameOf<ByteBufferDeserializer>());
}
props.Put(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.JVMDeserializerClassName);
}
else throw new InvalidOperationException($"KNetConsumer auto manages configuration property {Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}, remove from configuration.");

Expand Down
75 changes: 44 additions & 31 deletions src/net/KNet/Specific/GenericConfigBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public static T CreateFrom(T origin)
var newT = new T
{
_options = new System.Collections.Generic.Dictionary<string, object>(origin._options),
_KNetKeySerDes = origin._KNetKeySerDes,
_KNetValueSerDes = origin._KNetValueSerDes,
_KeySerDesSelector = origin._KeySerDesSelector,
_ValueSerDesSelector = origin._ValueSerDesSelector,
};
return newT;
}
Expand Down Expand Up @@ -112,8 +112,8 @@ protected virtual T Clone()
var clone = new T
{
_options = new System.Collections.Generic.Dictionary<string, object>(_options),
_KNetKeySerDes = _KNetKeySerDes,
_KNetValueSerDes = _KNetValueSerDes
_KeySerDesSelector = _KeySerDesSelector,
_ValueSerDesSelector = _ValueSerDesSelector
};
return clone;
}
Expand Down Expand Up @@ -150,60 +150,61 @@ public Properties ToProperties()
/// <inheritdoc cref="IGenericSerDesFactory.AutoSelectBuffered"/>
public bool AutoSelectBuffered { get; set; }

Type _KNetKeySerDes = null;
/// <inheritdoc cref="IGenericSerDesFactory.KNetKeySerDes"/>
public Type KNetKeySerDes
Type _KeySerDesSelector = null;
/// <inheritdoc cref="IGenericSerDesFactory.KeySerDesSelector"/>
public Type KeySerDesSelector
{
get { return _KNetKeySerDes; }
get { return _KeySerDesSelector; }
set
{
if (value.GetConstructors().Single(ci => ci.GetParameters().Length == 0) == null)
{
throw new ArgumentException($"{value.Name} does not contains a default constructor and cannot be used because it is not a valid Serializer type");
throw new ArgumentException($"{value.Name} does not contains a default constructor and cannot be used because it is not a valid ISerDesSelector type");
}

if (value.IsGenericType)
{
var keyT = value.GetGenericArguments();
if (keyT.Length != 1) { throw new ArgumentException($"{value.Name} does not contains a single generic argument and cannot be used because it is not a valid Serializer type"); }
if (keyT.Length != 1) { throw new ArgumentException($"{value.Name} does not contains a single generic argument and cannot be used because it is not a valid ISerDesSelector type"); }
var t = value.GetGenericTypeDefinition();
if (t.GetInterface(typeof(ISerDes<,>).Name) == null)
if (t.GetInterface(typeof(ISerDesSelector<>).Name) == null)
{
throw new ArgumentException($"{value.Name} does not implement IKNetSerDes<> and cannot be used because it is not a valid Serializer type");
throw new ArgumentException($"{value.Name} does not implement ISerDesSelector<> and cannot be used because it is not a valid ISerDesSelector type");
}
_KNetKeySerDes = value;
_KeySerDesSelector = value;
}
else throw new ArgumentException($"{value.Name} is not a generic type and cannot be used as a valid ValueContainer type");
else throw new ArgumentException($"{value.Name} is not a generic type and cannot be used as a valid ISerDesSelector type");
}
}

Type _KNetValueSerDes = null;
/// <inheritdoc cref="IGenericSerDesFactory.KNetValueSerDes"/>
public Type KNetValueSerDes
Type _ValueSerDesSelector = null;
/// <inheritdoc cref="IGenericSerDesFactory.ValueSerDesSelector"/>
public Type ValueSerDesSelector
{
get { return _KNetValueSerDes; }
get { return _ValueSerDesSelector; }
set
{
if (value.GetConstructors().Single(ci => ci.GetParameters().Length == 0) == null)
{
throw new ArgumentException($"{value.Name} does not contains a default constructor and cannot be used because it is not a valid Serializer type");
throw new ArgumentException($"{value.Name} does not contains a default constructor and cannot be used because it is not a valid ISerDesSelector type");
}

if (value.IsGenericType)
{
var keyT = value.GetGenericArguments();
if (keyT.Length != 1) { throw new ArgumentException($"{value.Name} does not contains a single generic argument and cannot be used because it is not a valid Serializer type"); }
if (keyT.Length != 1) { throw new ArgumentException($"{value.Name} does not contains a single generic argument and cannot be used because it is not a valid ISerDesSelector type"); }
var t = value.GetGenericTypeDefinition();
if (t.GetInterface(typeof(ISerDes<,>).Name) == null)
if (t.GetInterface(typeof(ISerDesSelector<>).Name) == null)
{
throw new ArgumentException($"{value.Name} does not implement IKNetSerDes<> and cannot be used because it is not a valid Serializer type");
throw new ArgumentException($"{value.Name} does not implement ISerDesSelector<> and cannot be used because it is not a valid ISerDesSelector type");
}
_KNetValueSerDes = value;
_ValueSerDesSelector = value;
}
else throw new ArgumentException($"{value.Name} is not a generic type and cannot be used as a valid Serializer type");
else throw new ArgumentException($"{value.Name} is not a generic type and cannot be used as a valid ISerDesSelector type");
}
}

readonly ConcurrentDictionary<(Type, Type), ISerDesSelector> _keySerDesSelectorComplete = new();
readonly ConcurrentDictionary<(Type, Type), ISerDes> _keySerDesComplete = new();

/// <inheritdoc cref="IGenericSerDesFactory.BuildKeySerDes{TKey, TJVMTKey}"/>
Expand All @@ -226,17 +227,23 @@ public ISerDes<TKey, TJVMTKey> BuildKeySerDes<TKey, TJVMTKey>()
}
else
{
if (KNetKeySerDes == null) throw new InvalidOperationException($"No default serializer available for {typeof(TKey)}, property {nameof(KNetKeySerDes)} shall be set.");
var tmp = KNetKeySerDes.MakeGenericType(typeof(TKey));
serDes = Activator.CreateInstance(tmp) as ISerDes;
if (KeySerDesSelector == null) throw new InvalidOperationException($"No default serializer available for {typeof(TKey)}, property {nameof(KeySerDesSelector)} shall be set.");

var selector = _keySerDesSelectorComplete.GetOrAdd((KeySerDesSelector, typeof(TKey)), (o) =>
{
var selectorForValue = o.Item1.MakeGenericType(o.Item2);
return Activator.CreateInstance(selectorForValue) as ISerDesSelector;
}) as ISerDesSelector<TKey>;

serDes = selector.NewSerDes<TJVMTKey>();
}
_keySerDesComplete[(typeof(TKey), typeof(TJVMTKey))] = serDes;
}
return serDes as ISerDes<TKey, TJVMTKey>;
}
}


readonly ConcurrentDictionary<(Type, Type), ISerDesSelector> _valueSerDesSelectorComplete = new();
readonly ConcurrentDictionary<(Type, Type), ISerDes> _valueSerDesComplete = new();

/// <inheritdoc cref="IGenericSerDesFactory.BuildValueSerDes{TValue, TJVMTValue}"/>
Expand All @@ -259,9 +266,15 @@ public ISerDes<TValue, TJVMTValue> BuildValueSerDes<TValue, TJVMTValue>()
}
else
{
if (KNetValueSerDes == null) throw new InvalidOperationException($"No default serializer available for {typeof(TValue)}, property {nameof(KNetValueSerDes)} shall be set.");
var tmp = KNetValueSerDes.MakeGenericType(typeof(TValue));
serDes = Activator.CreateInstance(tmp) as ISerDes;
if (ValueSerDesSelector == null) throw new InvalidOperationException($"No default serializer available for {typeof(TValue)}, property {nameof(ValueSerDesSelector)} shall be set.");

var selector = _valueSerDesSelectorComplete.GetOrAdd((ValueSerDesSelector, typeof(TValue)), (o) =>
{
var selectorForValue = o.Item1.MakeGenericType(o.Item2);
return Activator.CreateInstance(selectorForValue) as ISerDesSelector;
}) as ISerDesSelector<TValue>;

serDes = selector.NewSerDes<TJVMTValue>();
}
_valueSerDesComplete[(typeof(TValue), typeof(TJVMTValue))] = serDes;
}
Expand Down
18 changes: 2 additions & 16 deletions src/net/KNet/Specific/Producer/KNetProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -243,27 +243,13 @@ static Properties CheckProperties(Properties props, ISerDes keySerializer, ISerD
{
if (!props.ContainsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
{
if (!keySerializer.IsDirectBuffered)
{
props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ClassNameOf<Org.Apache.Kafka.Common.Serialization.ByteArraySerializer>());
}
else
{
props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ClassNameOf<ByteBufferSerializer>());
}
props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.JVMSerializerClassName);
}
else throw new InvalidOperationException($"KNetProducer auto manages configuration property {ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}, remove from configuration.");

if (!props.ContainsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
{
if (!valueSerializer.IsDirectBuffered)
{
props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ClassNameOf<Org.Apache.Kafka.Common.Serialization.ByteArraySerializer>());
}
else
{
props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ClassNameOf<ByteBufferSerializer>());
}
props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.JVMSerializerClassName);
}
else throw new InvalidOperationException($"KNetProducer auto manages configuration property {ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}, remove from configuration.");

Expand Down
Loading

0 comments on commit ad9cb07

Please sign in to comment.