Skip to content

Commit

Permalink
Updates test execution to avoid failures (#258)
Browse files Browse the repository at this point in the history
* Use `SingleOrDefault` instead of `Single` to manage conditions where it is raised `System.InvalidOperationException: Sequence contains no elements`

* Split windows jobs since Kafka seems to crash on that OS if heavily loaded

* In case of timeout, especially in Windows, exit without report error because the error comes from external source

* Configuration in workflow folder and remove prefetch

* Upload dumps

* Added KafkaStreams test

* Update management of exceptions

* Added Raw and Buffered tests for KafkaStreams

* Use latest JDK for tests

* Code harmonization

* Adds tests with prefetch

* Fix #22 aligning key of headers to the JVM type

* Update tests moving out execution from Main to verify if some tests fails for the same reason reported in masesgroup/KNet#509
  • Loading branch information
masesdevelopers authored Jul 2, 2024
1 parent f976afb commit 3aba45f
Show file tree
Hide file tree
Showing 26 changed files with 361 additions and 131 deletions.
206 changes: 172 additions & 34 deletions .github/workflows/build.yaml

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions .github/workflows/configuration/Benchmark.KNetReplicator.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"DatabaseName": "TestDBBenchmark",
"BootstrapServers": "localhost:9092",
"NumberOfExecutions": 10,
"UseEnumeratorWithPrefetch": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"DatabaseName": "TestDBBenchmark",
"UseCompactedReplicator": false,
"BootstrapServers": "localhost:9092",
"NumberOfExecutions": 10
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"DatabaseName": "TestDBBenchmark",
"UseCompactedReplicator": false,
"BootstrapServers": "localhost:9092",
"NumberOfExecutions": 10,
"UseEnumeratorWithPrefetch": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"DatabaseName": "TestDBBenchmark",
"UseCompactedReplicator": false,
"UseByteBufferDataTransfer": false,
"BootstrapServers": "localhost:9092",
"NumberOfExecutions": 10
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"DatabaseName": "TestDBBenchmark",
"UseCompactedReplicator": false,
"UseByteBufferDataTransfer": false,
"BootstrapServers": "localhost:9092",
"NumberOfExecutions": 10,
"UseEnumeratorWithPrefetch": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"DatabaseName": "TestDBBenchmark",
"UseCompactedReplicator": false,
"UseKNetStreams": false,
"BootstrapServers": "localhost:9092",
"NumberOfExecutions": 10
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"DatabaseName": "TestDBBenchmark",
"UseCompactedReplicator": false,
"UseKNetStreams": false,
"UseByteBufferDataTransfer": false,
"BootstrapServers": "localhost:9092",
"NumberOfExecutions": 10
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
32 changes: 16 additions & 16 deletions src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

Expand Down Expand Up @@ -199,8 +199,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

Expand Down Expand Up @@ -310,8 +310,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

Expand Down Expand Up @@ -377,8 +377,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

Expand Down Expand Up @@ -494,8 +494,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

using MemoryStream memStream = new();
BinaryEncoder encoder = new(memStream);
Expand Down Expand Up @@ -562,8 +562,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

MemoryStream memStream = new();
BinaryEncoder encoder = new(memStream);
Expand Down Expand Up @@ -668,8 +668,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

using MemoryStream memStream = new();
JsonEncoder encoder = new(AvroValueContainer._SCHEMA, memStream);
Expand Down Expand Up @@ -737,8 +737,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

MemoryStream memStream = new();
JsonEncoder encoder = new(AvroValueContainer._SCHEMA, memStream);
Expand Down
16 changes: 8 additions & 8 deletions src/net/KEFCore.SerDes.Protobuf/ProtobufKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
KeyContainer keyContainer = null!;
Expand Down Expand Up @@ -185,8 +185,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
KeyContainer keyContainer = null!;
Expand Down Expand Up @@ -298,8 +298,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

using (MemoryStream stream = new())
{
Expand Down Expand Up @@ -361,8 +361,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

MemoryStream stream = new();
data.WriteTo(stream);
Expand Down
16 changes: 8 additions & 8 deletions src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
var jsonStr = System.Text.Json.JsonSerializer.Serialize<TData>(data);
Expand Down Expand Up @@ -191,8 +191,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

Expand Down Expand Up @@ -301,8 +301,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

var jsonStr = System.Text.Json.JsonSerializer.Serialize<TData>(data, _options);
return Encoding.UTF8.GetBytes(jsonStr);
Expand Down Expand Up @@ -365,8 +365,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

var ms = new MemoryStream();
System.Text.Json.JsonSerializer.Serialize<TData>(ms, data, _options);
Expand Down
8 changes: 4 additions & 4 deletions src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,22 +151,22 @@ public static object FromRecord(Org.Apache.Kafka.Clients.Consumer.ConsumerRecord
foreach (var header in headers.ToArray())
{
var key = header.Key();
if (key == KNetSerialization.KeyTypeIdentifier)
if (key == KNetSerialization.KeyTypeIdentifierJVM)
{
var strType = Encoding.UTF8.GetString(header.Value());
keyType = Type.GetType(strType, true)!;
}
if (key == KNetSerialization.KeySerializerIdentifier)
if (key == KNetSerialization.KeySerializerIdentifierJVM)
{
var strType = Encoding.UTF8.GetString(header.Value());
keySerializerSelectorType = Type.GetType(strType, true)!;
}
if (key == KNetSerialization.ValueTypeIdentifier)
if (key == KNetSerialization.ValueTypeIdentifierJVM)
{
var strType = Encoding.UTF8.GetString(header.Value());
valueType = Type.GetType(strType, true)!;
}
if (key == KNetSerialization.ValueSerializerIdentifier)
if (key == KNetSerialization.ValueSerializerIdentifierJVM)
{
var strType = Encoding.UTF8.GetString(header.Value());
valueSerializerSelectorType = Type.GetType(strType, true)!;
Expand Down
6 changes: 3 additions & 3 deletions src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs
Original file line number Diff line number Diff line change
Expand Up @@ -444,20 +444,20 @@ public ValueTask<bool> MoveNextAsync()
{
_moveNextSw.Start();
#endif
ValueTask<bool> hasNext = _asyncEnumerator.MoveNextAsync();
ValueTask<bool> hasNext = _asyncEnumerator == null ? new ValueTask<bool>(false) : _asyncEnumerator.MoveNextAsync();
hasNext.AsTask().Wait();
if (hasNext.Result)
{
#if DEBUG_PERFORMANCE
_cycles++;
_valueGetSw.Start();
#endif
KeyValue<TKey, TValue, TJVMKey, TJVMValue> kv = _asyncEnumerator.Current;
KeyValue<TKey, TValue, TJVMKey, TJVMValue>? kv = _asyncEnumerator?.Current;
#if DEBUG_PERFORMANCE
_valueGetSw.Stop();
_valueGet2Sw.Start();
#endif
TValue value = kv.Value;
TValue value = kv != null ? kv.Value : default;
#if DEBUG_PERFORMANCE
_valueGet2Sw.Stop();
_valueBufferSw.Start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public KafkaStreamsBaseRetriever(IKafkaCluster kafkaCluster, IEntityType entityT
_builder ??= builder;
_topicName = _entityType.TopicName(kafkaCluster.Options);
_usePersistentStorage = _kafkaCluster.Options.UsePersistentStorage;
_properties ??= _kafkaCluster.Options.StreamsOptions(_kafkaCluster.Options.ApplicationId);
_properties ??= _kafkaCluster.Options.StreamsOptions(_entityType);

string storageId = _entityType.StorageIdForTable(_kafkaCluster.Options);
_storageId = _usePersistentStorage ? storageId : Process.GetCurrentProcess().ProcessName + "-" + storageId;
Expand Down
Loading

0 comments on commit 3aba45f

Please sign in to comment.