From 3aba45fa6f1ec1983577bbaa39daeb6ec851b715 Mon Sep 17 00:00:00 2001
From: MASES Public Developers Team
<94312179+masesdevelopers@users.noreply.github.com>
Date: Tue, 2 Jul 2024 19:22:43 +0200
Subject: [PATCH] Updates test execution to avoid failures (#258)
* Use `SingleOrDefault` instead of `Single` to handle conditions where `System.InvalidOperationException: Sequence contains no elements` is raised
* Split windows jobs since Kafka seems to crash on that OS if heavily loaded
* In case of timeout, especially on Windows, exit without reporting an error because the error comes from an external source
* Configuration in workflow folder and remove prefetch
* Upload dumps
* Added KafkaStreams test
* Update management of exceptions
* Added Raw and Buffered tests for KafkaStreams
* Use latest JDK for tests
* Code harmonization
* Adds tests with prefetch
* Fix #22 aligning key of headers to the JVM type
* Update tests moving out execution from Main to verify if some tests fail for the same reason reported in https://github.com/masesgroup/KNet/pull/509
---
.github/workflows/build.yaml | 206 +++++++++++++++---
.../Benchmark.KNetReplicator.json | 6 +
...nchmark.KNetStreams.Buffered.Prefetch.json | 6 +
.../Benchmark.KNetStreams.Buffered.json | 7 +
.../Benchmark.KNetStreams.Raw.Prefetch.json | 7 +
.../Benchmark.KNetStreams.Raw.json | 8 +
.../Benchmark.KafkaStreams.Buffered.json | 7 +
.../Benchmark.KafkaStreams.Raw.json | 8 +
.../{ => configuration}/log4j.properties | 0
.../{ => configuration}/server.properties | 0
.../{ => configuration}/zookeeper.properties | 0
.../KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs | 32 +--
.../ProtobufKEFCoreSerDes.cs | 16 +-
.../KEFCore.SerDes/DefaultKEFCoreSerDes.cs | 16 +-
.../KEFCore.SerDes/KEFCoreSerDes.Helpers.cs | 8 +-
.../Storage/Internal/KNetStreamsRetriever.cs | 6 +-
.../Internal/KafkaStreamsBaseRetriever.cs | 2 +-
test/Common/ProgramConfig.cs | 48 +++-
.../Benchmark.KNetStreams.Raw.json | 1 +
...n => Benchmark.KafkaStreams.Buffered.json} | 0
.../Benchmark.KafkaStreams.Raw.json | 8 +
.../KEFCore.Benchmark.Test.csproj | 9 +-
test/KEFCore.Benchmark.Test/Program.cs | 39 ++--
test/KEFCore.Complex.Test/Program.cs | 16 +-
test/KEFCore.StreamTest/Program.cs | 18 +-
test/KEFCore.Test/Program.cs | 18 +-
26 files changed, 361 insertions(+), 131 deletions(-)
create mode 100644 .github/workflows/configuration/Benchmark.KNetReplicator.json
create mode 100644 .github/workflows/configuration/Benchmark.KNetStreams.Buffered.Prefetch.json
create mode 100644 .github/workflows/configuration/Benchmark.KNetStreams.Buffered.json
create mode 100644 .github/workflows/configuration/Benchmark.KNetStreams.Raw.Prefetch.json
create mode 100644 .github/workflows/configuration/Benchmark.KNetStreams.Raw.json
create mode 100644 .github/workflows/configuration/Benchmark.KafkaStreams.Buffered.json
create mode 100644 .github/workflows/configuration/Benchmark.KafkaStreams.Raw.json
rename .github/workflows/{ => configuration}/log4j.properties (100%)
rename .github/workflows/{ => configuration}/server.properties (100%)
rename .github/workflows/{ => configuration}/zookeeper.properties (100%)
rename test/KEFCore.Benchmark.Test/{Benchmark.KafkaStreams.json => Benchmark.KafkaStreams.Buffered.json} (100%)
create mode 100644 test/KEFCore.Benchmark.Test/Benchmark.KafkaStreams.Raw.json
diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 44385c2b..ea7a4355 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -82,12 +82,8 @@ jobs:
- name: Prepare configuration files
run: |
- Copy-Item .github\workflows\zookeeper.properties -Destination bin\net6.0\zookeeper.properties -Force
- Copy-Item .github\workflows\server.properties -Destination bin\net6.0\server.properties -Force
- Copy-Item .github\workflows\log4j.properties -Destination bin\net6.0\log4j.properties -Force
- Copy-Item .github\workflows\zookeeper.properties -Destination bin\net8.0\zookeeper.properties -Force
- Copy-Item .github\workflows\server.properties -Destination bin\net8.0\server.properties -Force
- Copy-Item .github\workflows\log4j.properties -Destination bin\net8.0\log4j.properties -Force
+ Copy-Item .github\workflows\configuration\* -Destination bin\net6.0\ -Force
+ Copy-Item .github\workflows\configuration\* -Destination bin\net8.0\ -Force
- name: Save KEFCore net6.0 bin in cache
uses: actions/cache/save@v4
@@ -116,7 +112,7 @@ jobs:
- 9092:9092
env:
KNET_DOCKER_RUNNING_MODE: server
- JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
strategy:
fail-fast: false
@@ -143,22 +139,50 @@ jobs:
with: # running setup-java again overwrites the settings.xml
distribution: ${{ matrix.jdk_vendor }}
java-version: ${{ matrix.jdk_version }}
+ check-latest: true
- - name: Execute KNetReplicator Benchmark on Ubuntu with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
- run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetReplicator.json localhost:9092
+ - name: Execute Benchmark.KNetReplicator on Ubuntu with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetReplicator.json
env:
- JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
-
- - name: Execute KNetStreams Raw Benchmark on Ubuntu with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
- run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetStreams.Raw.json localhost:9092
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: Execute Benchmark.KafkaStreams.Raw on Ubuntu with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KafkaStreams.Raw.json
env:
- JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
-
- - name: Execute KNetStreams Buffered Benchmark on Ubuntu with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
- run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetStreams.Buffered.json localhost:9092
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: Execute Benchmark.KafkaStreams.Buffered on Ubuntu with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KafkaStreams.Buffered.json
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: Execute Benchmark.KNetStreams.Raw on Ubuntu with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetStreams.Raw.json
env:
- JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+ - name: Execute Benchmark.KNetStreams.Raw.Prefetch on Ubuntu with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetStreams.Raw.Prefetch.json
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: Execute Benchmark.KNetStreams.Buffered on Ubuntu with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetStreams.Buffered.json
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: Execute Benchmark.KNetStreams.Buffered.Prefetch on Ubuntu with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetStreams.Buffered.Prefetch.json
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - uses: actions/upload-artifact@v4
+ if: failure()
+ with:
+ name: KEFCore_Client_Crash_ubuntu-latest_${{ matrix.framework }}_${{ matrix.jdk_vendor }}_${{ matrix.jdk_version }}
+ path: ${{ github.workspace }}/**/hs_err_*
+ retention-days: 7
+
execute_tests_other:
needs: build_windows
strategy:
@@ -191,6 +215,7 @@ jobs:
with: # running setup-java again overwrites the settings.xml
distribution: ${{ matrix.jdk_vendor }}
java-version: ${{ matrix.jdk_version }}
+ check-latest: true
- name: Authenticate to GitHub
run: dotnet nuget add source --username ${{ github.actor }} --password ${{ secrets.GITHUB_TOKEN }} --store-password-in-clear-text --name github "https://nuget.pkg.github.com/masesgroup/index.json"
@@ -206,38 +231,144 @@ jobs:
Start-Process -RSE ${{ github.workspace }}/logfiles/PWSH_zookeeper_err.log -RSO ${{ github.workspace }}/logfiles/PWSH_zookeeper_out.log -FilePath knet -ArgumentList ( 'zookeeperstart', '-LogPath', '${{ github.workspace }}/logfiles/', '-Log4JConfiguration', '${{ github.workspace }}/bin/${{ matrix.framework }}/log4j.properties', '${{ github.workspace }}/bin/${{ matrix.framework }}/zookeeper.properties' )
Start-Process -RSE ${{ github.workspace }}/logfiles/PWSH_kafka_err.log -RSO ${{ github.workspace }}/logfiles/PWSH_kafka_out.log -FilePath knet -ArgumentList ( 'kafkastart', '-LogPath', '${{ github.workspace }}/logfiles/', '-Log4JConfiguration', '${{ github.workspace }}/bin/${{ matrix.framework }}/log4j.properties', '${{ github.workspace }}/bin/${{ matrix.framework }}/server.properties' )
env:
- JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
- - name: Execute KNetReplicator Benchmark on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ - name: Execute Benchmark.KNetReplicator on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
if: ${{ matrix.os != 'windows-latest' }}
- run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetReplicator.json localhost:9092
+ run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetReplicator.json
env:
- JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
- - name: Execute KNetStreams Raw Benchmark on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ - name: Execute Benchmark.KafkaStreams.Raw on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
if: ${{ matrix.os != 'windows-latest' }}
- run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetStreams.Raw.json localhost:9092
+ run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KafkaStreams.Raw.json
env:
- JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
-
- - name: Execute KNetStreams Buffered Benchmark on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: Execute Benchmark.KafkaStreams.Buffered on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ if: ${{ matrix.os != 'windows-latest' }}
+ run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KafkaStreams.Buffered.json
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: Execute Benchmark.KNetStreams.Raw on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ if: ${{ matrix.os != 'windows-latest' }}
+ run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetStreams.Raw.json
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: Execute Benchmark.KNetStreams.Raw.Prefetch on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ if: ${{ matrix.os != 'windows-latest' }}
+ run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetStreams.Raw.Prefetch.json
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: Execute Benchmark.KNetStreams.Buffered on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ if: ${{ matrix.os != 'windows-latest' }}
+ run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetStreams.Buffered.json
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: Execute Benchmark.KNetStreams.Buffered.Prefetch on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
if: ${{ matrix.os != 'windows-latest' }}
- run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetStreams.Buffered.json localhost:9092
+ run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetStreams.Buffered.Prefetch.json
env:
- JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
- - name: WINDOWS ONLY - Start Kafka and execute tests on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ - name: WINDOWS ONLY - Start Kafka and execute Benchmark.KNetReplicator on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
if: ${{ matrix.os == 'windows-latest' }}
shell: pwsh
run: |
New-Item -Path "${{ github.workspace }}/" -Name "logfiles" -ItemType Directory
Start-Process -RSE ${{ github.workspace }}\logfiles\PWSH_zookeeper_err.log -RSO ${{ github.workspace }}\logfiles\PWSH_zookeeper_out.log -FilePath knet -ArgumentList ( 'zookeeperstart', '-LogPath', '${{ github.workspace }}\logfiles\', '-Log4JConfiguration', '${{ github.workspace }}\bin\${{ matrix.framework }}\log4j.properties', '${{ github.workspace }}\bin\${{ matrix.framework }}\zookeeper.properties' )
Start-Process -RSE ${{ github.workspace }}\logfiles\PWSH_kafka_err.log -RSO ${{ github.workspace }}\logfiles\PWSH_kafka_out.log -FilePath knet -ArgumentList ( 'kafkastart', '-LogPath', '${{ github.workspace }}\logfiles\', '-Log4JConfiguration', '${{ github.workspace }}\bin\${{ matrix.framework }}\log4j.properties', '${{ github.workspace }}\bin\${{ matrix.framework }}\server.properties' )
- dotnet ${{ github.workspace }}\bin\${{ matrix.framework }}\MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}\bin\${{ matrix.framework }}\Benchmark.KNetReplicator.json localhost:9092
- dotnet ${{ github.workspace }}\bin\${{ matrix.framework }}\MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}\bin\${{ matrix.framework }}\Benchmark.KNetStreams.Raw.json localhost:9092
- dotnet ${{ github.workspace }}\bin\${{ matrix.framework }}\MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}\bin\${{ matrix.framework }}\Benchmark.KNetStreams.Buffered.json localhost:9092
+ dotnet ${{ github.workspace }}\bin\${{ matrix.framework }}\MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}\bin\${{ matrix.framework }}\Benchmark.KNetReplicator.json
env:
- JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: WINDOWS ONLY - Start Kafka and execute Benchmark.KafkaStreams.Raw on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ if: ${{ matrix.os == 'windows-latest' }}
+ shell: pwsh
+ run: |
+ Start-Sleep -Seconds 20
+ Remove-Item ${{ github.workspace }}\logfiles\* -Recurse -Force
+ Remove-Item D:\tmp\zookeeper\* -Recurse -Force
+ Remove-Item D:\tmp\kafka-logs\* -Recurse -Force
+ Start-Process -RSE ${{ github.workspace }}\logfiles\PWSH_zookeeper_err.log -RSO ${{ github.workspace }}\logfiles\PWSH_zookeeper_out.log -FilePath knet -ArgumentList ( 'zookeeperstart', '-LogPath', '${{ github.workspace }}\logfiles\', '-Log4JConfiguration', '${{ github.workspace }}\bin\${{ matrix.framework }}\log4j.properties', '${{ github.workspace }}\bin\${{ matrix.framework }}\zookeeper.properties' )
+ Start-Process -RSE ${{ github.workspace }}\logfiles\PWSH_kafka_err.log -RSO ${{ github.workspace }}\logfiles\PWSH_kafka_out.log -FilePath knet -ArgumentList ( 'kafkastart', '-LogPath', '${{ github.workspace }}\logfiles\', '-Log4JConfiguration', '${{ github.workspace }}\bin\${{ matrix.framework }}\log4j.properties', '${{ github.workspace }}\bin\${{ matrix.framework }}\server.properties' )
+ dotnet ${{ github.workspace }}\bin\${{ matrix.framework }}\MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}\bin\${{ matrix.framework }}\Benchmark.KafkaStreams.Raw.json
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: WINDOWS ONLY - Start Kafka and execute Benchmark.KafkaStreams.Buffered on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ if: ${{ matrix.os == 'windows-latest' }}
+ shell: pwsh
+ run: |
+ Start-Sleep -Seconds 20
+ Remove-Item ${{ github.workspace }}\logfiles\* -Recurse -Force
+ Remove-Item D:\tmp\zookeeper\* -Recurse -Force
+ Remove-Item D:\tmp\kafka-logs\* -Recurse -Force
+ Start-Process -RSE ${{ github.workspace }}\logfiles\PWSH_zookeeper_err.log -RSO ${{ github.workspace }}\logfiles\PWSH_zookeeper_out.log -FilePath knet -ArgumentList ( 'zookeeperstart', '-LogPath', '${{ github.workspace }}\logfiles\', '-Log4JConfiguration', '${{ github.workspace }}\bin\${{ matrix.framework }}\log4j.properties', '${{ github.workspace }}\bin\${{ matrix.framework }}\zookeeper.properties' )
+ Start-Process -RSE ${{ github.workspace }}\logfiles\PWSH_kafka_err.log -RSO ${{ github.workspace }}\logfiles\PWSH_kafka_out.log -FilePath knet -ArgumentList ( 'kafkastart', '-LogPath', '${{ github.workspace }}\logfiles\', '-Log4JConfiguration', '${{ github.workspace }}\bin\${{ matrix.framework }}\log4j.properties', '${{ github.workspace }}\bin\${{ matrix.framework }}\server.properties' )
+ dotnet ${{ github.workspace }}\bin\${{ matrix.framework }}\MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}\bin\${{ matrix.framework }}\Benchmark.KafkaStreams.Buffered.json
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: WINDOWS ONLY - Start Kafka and execute Benchmark.KNetStreams.Raw on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ if: ${{ matrix.os == 'windows-latest' }}
+ shell: pwsh
+ run: |
+ Start-Sleep -Seconds 20
+ Remove-Item ${{ github.workspace }}\logfiles\* -Recurse -Force
+ Remove-Item D:\tmp\zookeeper\* -Recurse -Force
+ Remove-Item D:\tmp\kafka-logs\* -Recurse -Force
+ Start-Process -RSE ${{ github.workspace }}\logfiles\PWSH_zookeeper_err.log -RSO ${{ github.workspace }}\logfiles\PWSH_zookeeper_out.log -FilePath knet -ArgumentList ( 'zookeeperstart', '-LogPath', '${{ github.workspace }}\logfiles\', '-Log4JConfiguration', '${{ github.workspace }}\bin\${{ matrix.framework }}\log4j.properties', '${{ github.workspace }}\bin\${{ matrix.framework }}\zookeeper.properties' )
+ Start-Process -RSE ${{ github.workspace }}\logfiles\PWSH_kafka_err.log -RSO ${{ github.workspace }}\logfiles\PWSH_kafka_out.log -FilePath knet -ArgumentList ( 'kafkastart', '-LogPath', '${{ github.workspace }}\logfiles\', '-Log4JConfiguration', '${{ github.workspace }}\bin\${{ matrix.framework }}\log4j.properties', '${{ github.workspace }}\bin\${{ matrix.framework }}\server.properties' )
+ dotnet ${{ github.workspace }}\bin\${{ matrix.framework }}\MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}\bin\${{ matrix.framework }}\Benchmark.KNetStreams.Raw.json
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: WINDOWS ONLY - Start Kafka and execute Benchmark.KNetStreams.Raw.Prefetch on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ if: ${{ matrix.os == 'windows-latest' }}
+ shell: pwsh
+ run: |
+ Start-Sleep -Seconds 20
+ Remove-Item ${{ github.workspace }}\logfiles\* -Recurse -Force
+ Remove-Item D:\tmp\zookeeper\* -Recurse -Force
+ Remove-Item D:\tmp\kafka-logs\* -Recurse -Force
+ Start-Process -RSE ${{ github.workspace }}\logfiles\PWSH_zookeeper_err.log -RSO ${{ github.workspace }}\logfiles\PWSH_zookeeper_out.log -FilePath knet -ArgumentList ( 'zookeeperstart', '-LogPath', '${{ github.workspace }}\logfiles\', '-Log4JConfiguration', '${{ github.workspace }}\bin\${{ matrix.framework }}\log4j.properties', '${{ github.workspace }}\bin\${{ matrix.framework }}\zookeeper.properties' )
+ Start-Process -RSE ${{ github.workspace }}\logfiles\PWSH_kafka_err.log -RSO ${{ github.workspace }}\logfiles\PWSH_kafka_out.log -FilePath knet -ArgumentList ( 'kafkastart', '-LogPath', '${{ github.workspace }}\logfiles\', '-Log4JConfiguration', '${{ github.workspace }}\bin\${{ matrix.framework }}\log4j.properties', '${{ github.workspace }}\bin\${{ matrix.framework }}\server.properties' )
+ dotnet ${{ github.workspace }}\bin\${{ matrix.framework }}\MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}\bin\${{ matrix.framework }}\Benchmark.KNetStreams.Raw.Prefetch.json
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: WINDOWS ONLY - Start Kafka and execute Benchmark.KNetStreams.Buffered on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ if: ${{ matrix.os == 'windows-latest' }}
+ shell: pwsh
+ run: |
+ Start-Sleep -Seconds 20
+ Remove-Item ${{ github.workspace }}\logfiles\* -Recurse -Force
+ Remove-Item D:\tmp\zookeeper\* -Recurse -Force
+ Remove-Item D:\tmp\kafka-logs\* -Recurse -Force
+ Start-Process -RSE ${{ github.workspace }}\logfiles\PWSH_zookeeper_err.log -RSO ${{ github.workspace }}\logfiles\PWSH_zookeeper_out.log -FilePath knet -ArgumentList ( 'zookeeperstart', '-LogPath', '${{ github.workspace }}\logfiles\', '-Log4JConfiguration', '${{ github.workspace }}\bin\${{ matrix.framework }}\log4j.properties', '${{ github.workspace }}\bin\${{ matrix.framework }}\zookeeper.properties' )
+ Start-Process -RSE ${{ github.workspace }}\logfiles\PWSH_kafka_err.log -RSO ${{ github.workspace }}\logfiles\PWSH_kafka_out.log -FilePath knet -ArgumentList ( 'kafkastart', '-LogPath', '${{ github.workspace }}\logfiles\', '-Log4JConfiguration', '${{ github.workspace }}\bin\${{ matrix.framework }}\log4j.properties', '${{ github.workspace }}\bin\${{ matrix.framework }}\server.properties' )
+ dotnet ${{ github.workspace }}\bin\${{ matrix.framework }}\MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}\bin\${{ matrix.framework }}\Benchmark.KNetStreams.Buffered.json
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
+
+ - name: WINDOWS ONLY - Start Kafka and execute Benchmark.KNetStreams.Buffered.Prefetch on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ if: ${{ matrix.os == 'windows-latest' }}
+ shell: pwsh
+ run: |
+ Start-Sleep -Seconds 20
+ Remove-Item ${{ github.workspace }}\logfiles\* -Recurse -Force
+ Remove-Item D:\tmp\zookeeper\* -Recurse -Force
+ Remove-Item D:\tmp\kafka-logs\* -Recurse -Force
+ Start-Process -RSE ${{ github.workspace }}\logfiles\PWSH_zookeeper_err.log -RSO ${{ github.workspace }}\logfiles\PWSH_zookeeper_out.log -FilePath knet -ArgumentList ( 'zookeeperstart', '-LogPath', '${{ github.workspace }}\logfiles\', '-Log4JConfiguration', '${{ github.workspace }}\bin\${{ matrix.framework }}\log4j.properties', '${{ github.workspace }}\bin\${{ matrix.framework }}\zookeeper.properties' )
+ Start-Process -RSE ${{ github.workspace }}\logfiles\PWSH_kafka_err.log -RSO ${{ github.workspace }}\logfiles\PWSH_kafka_out.log -FilePath knet -ArgumentList ( 'kafkastart', '-LogPath', '${{ github.workspace }}\logfiles\', '-Log4JConfiguration', '${{ github.workspace }}\bin\${{ matrix.framework }}\log4j.properties', '${{ github.workspace }}\bin\${{ matrix.framework }}\server.properties' )
+ dotnet ${{ github.workspace }}\bin\${{ matrix.framework }}\MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}\bin\${{ matrix.framework }}\Benchmark.KNetStreams.Buffered.Prefetch.json
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}
- uses: actions/upload-artifact@v4
if: ${{ failure() || cancelled() }}
@@ -246,6 +377,13 @@ jobs:
path: ${{ github.workspace }}/logfiles/
retention-days: 7
+ - uses: actions/upload-artifact@v4
+ if: failure()
+ with:
+ name: KEFCore_Client_Crash_${{ matrix.os }}_${{ matrix.framework }}_${{ matrix.jdk_vendor }}_${{ matrix.jdk_version }}
+ path: ${{ github.workspace }}/**/hs_err_*
+ retention-days: 7
+
final_cleanup:
needs: [ execute_tests_linux, execute_tests_other ]
if: "always()"
diff --git a/.github/workflows/configuration/Benchmark.KNetReplicator.json b/.github/workflows/configuration/Benchmark.KNetReplicator.json
new file mode 100644
index 00000000..291f4a24
--- /dev/null
+++ b/.github/workflows/configuration/Benchmark.KNetReplicator.json
@@ -0,0 +1,6 @@
+{
+ "DatabaseName": "TestDBBenchmark",
+ "BootstrapServers": "localhost:9092",
+ "NumberOfExecutions": 10,
+ "UseEnumeratorWithPrefetch": false
+}
diff --git a/.github/workflows/configuration/Benchmark.KNetStreams.Buffered.Prefetch.json b/.github/workflows/configuration/Benchmark.KNetStreams.Buffered.Prefetch.json
new file mode 100644
index 00000000..0a9a0573
--- /dev/null
+++ b/.github/workflows/configuration/Benchmark.KNetStreams.Buffered.Prefetch.json
@@ -0,0 +1,6 @@
+{
+ "DatabaseName": "TestDBBenchmark",
+ "UseCompactedReplicator": false,
+ "BootstrapServers": "localhost:9092",
+ "NumberOfExecutions": 10
+}
diff --git a/.github/workflows/configuration/Benchmark.KNetStreams.Buffered.json b/.github/workflows/configuration/Benchmark.KNetStreams.Buffered.json
new file mode 100644
index 00000000..a66af0bb
--- /dev/null
+++ b/.github/workflows/configuration/Benchmark.KNetStreams.Buffered.json
@@ -0,0 +1,7 @@
+{
+ "DatabaseName": "TestDBBenchmark",
+ "UseCompactedReplicator": false,
+ "BootstrapServers": "localhost:9092",
+ "NumberOfExecutions": 10,
+ "UseEnumeratorWithPrefetch": false
+}
diff --git a/.github/workflows/configuration/Benchmark.KNetStreams.Raw.Prefetch.json b/.github/workflows/configuration/Benchmark.KNetStreams.Raw.Prefetch.json
new file mode 100644
index 00000000..3738fce1
--- /dev/null
+++ b/.github/workflows/configuration/Benchmark.KNetStreams.Raw.Prefetch.json
@@ -0,0 +1,7 @@
+{
+ "DatabaseName": "TestDBBenchmark",
+ "UseCompactedReplicator": false,
+ "UseByteBufferDataTransfer": false,
+ "BootstrapServers": "localhost:9092",
+ "NumberOfExecutions": 10
+}
diff --git a/.github/workflows/configuration/Benchmark.KNetStreams.Raw.json b/.github/workflows/configuration/Benchmark.KNetStreams.Raw.json
new file mode 100644
index 00000000..a8020c37
--- /dev/null
+++ b/.github/workflows/configuration/Benchmark.KNetStreams.Raw.json
@@ -0,0 +1,8 @@
+{
+ "DatabaseName": "TestDBBenchmark",
+ "UseCompactedReplicator": false,
+ "UseByteBufferDataTransfer": false,
+ "BootstrapServers": "localhost:9092",
+ "NumberOfExecutions": 10,
+ "UseEnumeratorWithPrefetch": false
+}
diff --git a/.github/workflows/configuration/Benchmark.KafkaStreams.Buffered.json b/.github/workflows/configuration/Benchmark.KafkaStreams.Buffered.json
new file mode 100644
index 00000000..3e5ad6e9
--- /dev/null
+++ b/.github/workflows/configuration/Benchmark.KafkaStreams.Buffered.json
@@ -0,0 +1,7 @@
+{
+ "DatabaseName": "TestDBBenchmark",
+ "UseCompactedReplicator": false,
+ "UseKNetStreams": false,
+ "BootstrapServers": "localhost:9092",
+ "NumberOfExecutions": 10
+}
diff --git a/.github/workflows/configuration/Benchmark.KafkaStreams.Raw.json b/.github/workflows/configuration/Benchmark.KafkaStreams.Raw.json
new file mode 100644
index 00000000..44237cb9
--- /dev/null
+++ b/.github/workflows/configuration/Benchmark.KafkaStreams.Raw.json
@@ -0,0 +1,8 @@
+{
+ "DatabaseName": "TestDBBenchmark",
+ "UseCompactedReplicator": false,
+ "UseKNetStreams": false,
+ "UseByteBufferDataTransfer": false,
+ "BootstrapServers": "localhost:9092",
+ "NumberOfExecutions": 10
+}
diff --git a/.github/workflows/log4j.properties b/.github/workflows/configuration/log4j.properties
similarity index 100%
rename from .github/workflows/log4j.properties
rename to .github/workflows/configuration/log4j.properties
diff --git a/.github/workflows/server.properties b/.github/workflows/configuration/server.properties
similarity index 100%
rename from .github/workflows/server.properties
rename to .github/workflows/configuration/server.properties
diff --git a/.github/workflows/zookeeper.properties b/.github/workflows/configuration/zookeeper.properties
similarity index 100%
rename from .github/workflows/zookeeper.properties
rename to .github/workflows/configuration/zookeeper.properties
diff --git a/src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs b/src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs
index 31e2626a..23e4a6d1 100644
--- a/src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs
+++ b/src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs
@@ -127,8 +127,8 @@ public override byte[] Serialize(string topic, TData data)
///
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
- headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
+ headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
+ headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);
if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
@@ -199,8 +199,8 @@ public override ByteBuffer Serialize(string topic, TData data)
///
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
- headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
+ headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
+ headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);
if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
@@ -310,8 +310,8 @@ public override byte[] Serialize(string topic, TData data)
///
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
- headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
+ headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
+ headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);
if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
@@ -377,8 +377,8 @@ public override ByteBuffer Serialize(string topic, TData data)
///
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
- headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
+ headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
+ headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);
if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
@@ -494,8 +494,8 @@ public override byte[] Serialize(string topic, TData data)
///
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
- headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
+ headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
+ headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);
using MemoryStream memStream = new();
BinaryEncoder encoder = new(memStream);
@@ -562,8 +562,8 @@ public override ByteBuffer Serialize(string topic, TData data)
///
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
- headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
+ headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
+ headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);
MemoryStream memStream = new();
BinaryEncoder encoder = new(memStream);
@@ -668,8 +668,8 @@ public override byte[] Serialize(string topic, TData data)
///
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
- headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
+ headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
+ headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);
using MemoryStream memStream = new();
JsonEncoder encoder = new(AvroValueContainer._SCHEMA, memStream);
@@ -737,8 +737,8 @@ public override ByteBuffer Serialize(string topic, TData data)
///
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
- headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
+ headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
+ headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);
MemoryStream memStream = new();
JsonEncoder encoder = new(AvroValueContainer._SCHEMA, memStream);
diff --git a/src/net/KEFCore.SerDes.Protobuf/ProtobufKEFCoreSerDes.cs b/src/net/KEFCore.SerDes.Protobuf/ProtobufKEFCoreSerDes.cs
index 93cdc47e..cde16176 100644
--- a/src/net/KEFCore.SerDes.Protobuf/ProtobufKEFCoreSerDes.cs
+++ b/src/net/KEFCore.SerDes.Protobuf/ProtobufKEFCoreSerDes.cs
@@ -118,8 +118,8 @@ public override byte[] Serialize(string topic, TData data)
///
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
- headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
+ headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
+ headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);
if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
KeyContainer keyContainer = null!;
@@ -185,8 +185,8 @@ public override ByteBuffer Serialize(string topic, TData data)
///
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
- headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
+ headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
+ headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);
if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
KeyContainer keyContainer = null!;
@@ -298,8 +298,8 @@ public override byte[] Serialize(string topic, TData data)
///
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
- headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
+ headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
+ headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);
using (MemoryStream stream = new())
{
@@ -361,8 +361,8 @@ public override ByteBuffer Serialize(string topic, TData data)
///
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
- headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
+ headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
+ headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);
MemoryStream stream = new();
data.WriteTo(stream);
diff --git a/src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs b/src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs
index 7668cda6..a6375a8b 100644
--- a/src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs
+++ b/src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs
@@ -126,8 +126,8 @@ public override byte[] Serialize(string topic, TData data)
///
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
- headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
+ headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
+ headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);
if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
var jsonStr = System.Text.Json.JsonSerializer.Serialize(data);
@@ -191,8 +191,8 @@ public override ByteBuffer Serialize(string topic, TData data)
///
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
- headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
+ headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
+ headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);
if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
@@ -301,8 +301,8 @@ public override byte[] Serialize(string topic, TData data)
///
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
- headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
+ headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
+ headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);
var jsonStr = System.Text.Json.JsonSerializer.Serialize(data, _options);
return Encoding.UTF8.GetBytes(jsonStr);
@@ -365,8 +365,8 @@ public override ByteBuffer Serialize(string topic, TData data)
///
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
- headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
- headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
+ headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
+ headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);
var ms = new MemoryStream();
System.Text.Json.JsonSerializer.Serialize(ms, data, _options);
diff --git a/src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs b/src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs
index ac45251f..d7ce23fb 100644
--- a/src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs
+++ b/src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs
@@ -151,22 +151,22 @@ public static object FromRecord(Org.Apache.Kafka.Clients.Consumer.ConsumerRecord
foreach (var header in headers.ToArray())
{
var key = header.Key();
- if (key == KNetSerialization.KeyTypeIdentifier)
+ if (key == KNetSerialization.KeyTypeIdentifierJVM)
{
var strType = Encoding.UTF8.GetString(header.Value());
keyType = Type.GetType(strType, true)!;
}
- if (key == KNetSerialization.KeySerializerIdentifier)
+ if (key == KNetSerialization.KeySerializerIdentifierJVM)
{
var strType = Encoding.UTF8.GetString(header.Value());
keySerializerSelectorType = Type.GetType(strType, true)!;
}
- if (key == KNetSerialization.ValueTypeIdentifier)
+ if (key == KNetSerialization.ValueTypeIdentifierJVM)
{
var strType = Encoding.UTF8.GetString(header.Value());
valueType = Type.GetType(strType, true)!;
}
- if (key == KNetSerialization.ValueSerializerIdentifier)
+ if (key == KNetSerialization.ValueSerializerIdentifierJVM)
{
var strType = Encoding.UTF8.GetString(header.Value());
valueSerializerSelectorType = Type.GetType(strType, true)!;
diff --git a/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs b/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs
index 221e0d74..59624f6e 100644
--- a/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs
+++ b/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs
@@ -444,7 +444,7 @@ public ValueTask MoveNextAsync()
{
_moveNextSw.Start();
#endif
- ValueTask hasNext = _asyncEnumerator.MoveNextAsync();
+ ValueTask hasNext = _asyncEnumerator == null ? new ValueTask(false) : _asyncEnumerator.MoveNextAsync();
hasNext.AsTask().Wait();
if (hasNext.Result)
{
@@ -452,12 +452,12 @@ public ValueTask MoveNextAsync()
_cycles++;
_valueGetSw.Start();
#endif
- KeyValue kv = _asyncEnumerator.Current;
+ KeyValue? kv = _asyncEnumerator?.Current;
#if DEBUG_PERFORMANCE
_valueGetSw.Stop();
_valueGet2Sw.Start();
#endif
- TValue value = kv.Value;
+ TValue value = kv != null ? kv.Value : default;
#if DEBUG_PERFORMANCE
_valueGet2Sw.Stop();
_valueBufferSw.Start();
diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
index 4d811c35..4a46dba3 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
@@ -90,7 +90,7 @@ public KafkaStreamsBaseRetriever(IKafkaCluster kafkaCluster, IEntityType entityT
_builder ??= builder;
_topicName = _entityType.TopicName(kafkaCluster.Options);
_usePersistentStorage = _kafkaCluster.Options.UsePersistentStorage;
- _properties ??= _kafkaCluster.Options.StreamsOptions(_kafkaCluster.Options.ApplicationId);
+ _properties ??= _kafkaCluster.Options.StreamsOptions(_entityType);
string storageId = _entityType.StorageIdForTable(_kafkaCluster.Options);
_storageId = _usePersistentStorage ? storageId : Process.GetCurrentProcess().ProcessName + "-" + storageId;
diff --git a/test/Common/ProgramConfig.cs b/test/Common/ProgramConfig.cs
index 06ced76f..6492e8c5 100644
--- a/test/Common/ProgramConfig.cs
+++ b/test/Common/ProgramConfig.cs
@@ -25,7 +25,6 @@
using MASES.EntityFrameworkCore.KNet.Infrastructure;
using MASES.EntityFrameworkCore.KNet.Serialization.Avro;
using MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage;
-using MASES.EntityFrameworkCore.KNet.Serialization.Json;
using MASES.EntityFrameworkCore.KNet.Serialization.Protobuf;
using MASES.EntityFrameworkCore.KNet.Serialization.Protobuf.Storage;
using MASES.KNet.Streams;
@@ -33,6 +32,8 @@
using System;
using System.IO;
using System.Text.Json;
+using Java.Lang;
+using Java.Util.Concurrent;
namespace MASES.EntityFrameworkCore.KNet.Test.Common
{
@@ -118,18 +119,57 @@ public static void LoadConfig(string[] args)
ReportString(JsonSerializer.Serialize(Config, new JsonSerializerOptions() { WriteIndented = true }));
if (!KafkaDbContext.EnableKEFCoreTracing) KafkaDbContext.EnableKEFCoreTracing = Config.EnableKEFCoreTracing;
+
+ if (!Config.UseInMemoryProvider)
+ {
+ KEFCore.CreateGlobalInstance();
+ KEFCore.PreserveInformationAcrossContexts = Config.PreserveInformationAcrossContexts;
+ }
}
- public static void ReportString(string message)
+ public static void ReportString(string message, bool noDataReturned = false)
{
+ var msg = $"{DateTime.Now:HH::mm::ss:ffff} - {(noDataReturned ? "No data returned for " : " ")}{message}";
+
if (Debugger.IsAttached)
{
- Trace.WriteLine($"{DateTime.Now:HH::mm::ss:ffff} - {message}");
+ if (noDataReturned) Trace.TraceError(msg);
+ else Trace.WriteLine(msg);
+ }
+ else
+ {
+ if (noDataReturned) Console.Error.WriteLine(msg);
+ else Console.WriteLine(msg);
+ }
+ }
+
+ public static int ManageException(System.Exception e)
+ {
+ int retCode = 0;
+ if (e is ExecutionException ee)
+ {
+ return ManageException(ee.InnerException);
+ }
+ else if (e is ClassNotFoundException cnfe)
+ {
+ ReportString($"Failed with {cnfe}, current ClassPath is {KEFCore.GlobalInstance.ClassPath}");
+ retCode = 1;
+ }
+ else if (e is NoClassDefFoundError ncdfe)
+ {
+ ReportString($"Failed with {ncdfe}, current ClassPath is {KEFCore.GlobalInstance.ClassPath}");
+ retCode = 1;
+ }
+ else if (e is Org.Apache.Kafka.Common.Errors.TimeoutException toe)
+ {
+ ReportString(toe.ToString(), true);
}
else
{
- Console.WriteLine($"{DateTime.Now:HH::mm::ss:ffff} - {message}");
+ ReportString($"Failed with {e}");
+ retCode = 1;
}
+ return retCode;
}
}
}
diff --git a/test/KEFCore.Benchmark.Test/Benchmark.KNetStreams.Raw.json b/test/KEFCore.Benchmark.Test/Benchmark.KNetStreams.Raw.json
index 4613d2f9..93a55ef7 100644
--- a/test/KEFCore.Benchmark.Test/Benchmark.KNetStreams.Raw.json
+++ b/test/KEFCore.Benchmark.Test/Benchmark.KNetStreams.Raw.json
@@ -2,6 +2,7 @@
"DatabaseName": "TestDBBenchmark",
"UseCompactedReplicator": false,
"UseByteBufferDataTransfer": false,
+ "UseEnumeratorWithPrefetch": false,
"BootstrapServers": "192.168.0.101:9092",
"NumberOfExecutions": 10
}
diff --git a/test/KEFCore.Benchmark.Test/Benchmark.KafkaStreams.json b/test/KEFCore.Benchmark.Test/Benchmark.KafkaStreams.Buffered.json
similarity index 100%
rename from test/KEFCore.Benchmark.Test/Benchmark.KafkaStreams.json
rename to test/KEFCore.Benchmark.Test/Benchmark.KafkaStreams.Buffered.json
diff --git a/test/KEFCore.Benchmark.Test/Benchmark.KafkaStreams.Raw.json b/test/KEFCore.Benchmark.Test/Benchmark.KafkaStreams.Raw.json
new file mode 100644
index 00000000..7c07be30
--- /dev/null
+++ b/test/KEFCore.Benchmark.Test/Benchmark.KafkaStreams.Raw.json
@@ -0,0 +1,8 @@
+{
+ "DatabaseName": "TestDBBenchmark",
+ "UseCompactedReplicator": false,
+ "UseKNetStreams": false,
+ "UseByteBufferDataTransfer": false,
+ "BootstrapServers": "192.168.0.101:9092",
+ "NumberOfExecutions": 10
+}
diff --git a/test/KEFCore.Benchmark.Test/KEFCore.Benchmark.Test.csproj b/test/KEFCore.Benchmark.Test/KEFCore.Benchmark.Test.csproj
index b224d7ea..599923a4 100644
--- a/test/KEFCore.Benchmark.Test/KEFCore.Benchmark.Test.csproj
+++ b/test/KEFCore.Benchmark.Test/KEFCore.Benchmark.Test.csproj
@@ -14,6 +14,12 @@
+
+ PreserveNewest
+
+
+ PreserveNewest
+
PreserveNewest
@@ -38,9 +44,6 @@
PreserveNewest
-
- PreserveNewest
-
PreserveNewest
diff --git a/test/KEFCore.Benchmark.Test/Program.cs b/test/KEFCore.Benchmark.Test/Program.cs
index 8c62a16d..9e2cb5cc 100644
--- a/test/KEFCore.Benchmark.Test/Program.cs
+++ b/test/KEFCore.Benchmark.Test/Program.cs
@@ -22,6 +22,7 @@
* SOFTWARE.
*/
+using Java.Util.Concurrent;
using MASES.EntityFrameworkCore.KNet.Infrastructure;
using MASES.EntityFrameworkCore.KNet.Test.Common;
using MASES.EntityFrameworkCore.KNet.Test.Model;
@@ -47,6 +48,12 @@ public ExecutionData(int executionIndex, int maxTests)
}
static void Main(string[] args)
+ {
+ ProgramConfig.LoadConfig(args);
+ ExecuteTests();
+ }
+
+ static void ExecuteTests()
{
const int maxTests = 10;
Dictionary _tests = new();
@@ -54,14 +61,6 @@ static void Main(string[] args)
var testWatcher = new Stopwatch();
var globalWatcher = new Stopwatch();
- ProgramConfig.LoadConfig(args);
-
- if (!ProgramConfig.Config.UseInMemoryProvider)
- {
- KEFCore.CreateGlobalInstance();
- KEFCore.PreserveInformationAcrossContexts = ProgramConfig.Config.PreserveInformationAcrossContexts;
- }
-
try
{
globalWatcher.Start();
@@ -123,22 +122,22 @@ static void Main(string[] args)
Stopwatch watch = new();
watch.Restart();
- var post = context.Posts.Single(b => b.BlogId == 2);
+ var post = context.Posts.SingleOrDefault(b => b.BlogId == 2);
watch.Stop();
_tests[execution].QueryTimes[0] = watch.Elapsed;
- ProgramConfig.ReportString($"First execution of context.Posts.Single(b => b.BlogId == 2) takes {watch.Elapsed}. Result is {post}");
+ ProgramConfig.ReportString($"First execution of context.Posts.Single(b => b.BlogId == 2) takes {watch.Elapsed}. Result is {post}", post == default);
watch.Restart();
- post = context.Posts.Single(b => b.BlogId == 2);
+ post = context.Posts.SingleOrDefault(b => b.BlogId == 2);
watch.Stop();
_tests[execution].QueryTimes[1] = watch.Elapsed;
- ProgramConfig.ReportString($"Second execution of context.Posts.Single(b => b.BlogId == 2) takes {watch.Elapsed}. Result is {post}");
+ ProgramConfig.ReportString($"Second execution of context.Posts.Single(b => b.BlogId == 2) takes {watch.Elapsed}. Result is {post}", post == default);
watch.Restart();
- post = context.Posts.Single(b => b.BlogId == ProgramConfig.Config.NumberOfElements - 1);
+ post = context.Posts.SingleOrDefault(b => b.BlogId == ProgramConfig.Config.NumberOfElements - 1);
watch.Stop();
_tests[execution].QueryTimes[2] = watch.Elapsed;
- ProgramConfig.ReportString($"Execution of context.Posts.Single(b => b.BlogId == {ProgramConfig.Config.NumberOfElements - 1}) takes {watch.Elapsed}. Result is {post}");
+ ProgramConfig.ReportString($"Execution of context.Posts.Single(b => b.BlogId == {ProgramConfig.Config.NumberOfElements - 1}) takes {watch.Elapsed}. Result is {post}", post == default);
watch.Restart();
var all = context.Posts.All((o) => true);
@@ -148,17 +147,17 @@ static void Main(string[] args)
Blog blog = null;
watch.Restart();
- blog = context.Blogs.Single(b => b.BlogId == 1);
+ blog = context.Blogs.SingleOrDefault(b => b.BlogId == 1);
watch.Stop();
_tests[execution].QueryTimes[4] = watch.Elapsed;
- ProgramConfig.ReportString($"First execution of context.Blogs.Single(b => b.BlogId == 1) takes {watch.Elapsed}. Result is {blog}");
+ ProgramConfig.ReportString($"First execution of context.Blogs.Single(b => b.BlogId == 1) takes {watch.Elapsed}. Result is {blog}", blog == default);
watch.Restart();
- blog = context.Blogs.Single(b => b.BlogId == 1);
+ blog = context.Blogs.SingleOrDefault(b => b.BlogId == 1);
watch.Stop();
_tests[execution].QueryTimes[5] = watch.Elapsed;
- ProgramConfig.ReportString($"Second execution of context.Blogs.Single(b => b.BlogId == 1) takes {watch.Elapsed}. Result is {blog}");
+ ProgramConfig.ReportString($"Second execution of context.Blogs.Single(b => b.BlogId == 1) takes {watch.Elapsed}. Result is {blog}", blog == default);
watch.Restart();
var selector = (from op in context.Blogs
@@ -187,8 +186,7 @@ join pg in context.Posts on op.BlogId equals pg.BlogId
}
catch (Exception ex)
{
- ProgramConfig.ReportString(ex.ToString());
- Environment.ExitCode = 1;
+ Environment.ExitCode = ProgramConfig.ManageException(ex);
}
finally
{
@@ -228,6 +226,7 @@ join pg in context.Posts on op.BlogId equals pg.BlogId
}
}
+
public class BloggingContext : KafkaDbContext
{
public DbSet Blogs { get; set; }
diff --git a/test/KEFCore.Complex.Test/Program.cs b/test/KEFCore.Complex.Test/Program.cs
index 3e489379..01022d63 100644
--- a/test/KEFCore.Complex.Test/Program.cs
+++ b/test/KEFCore.Complex.Test/Program.cs
@@ -36,18 +36,17 @@ namespace MASES.EntityFrameworkCore.KNet.Complex.Test
partial class Program
{
static void Main(string[] args)
+ {
+ ProgramConfig.LoadConfig(args);
+ ExecuteTests();
+ }
+
+ static void ExecuteTests()
{
BloggingContext context = null;
var testWatcher = new Stopwatch();
var globalWatcher = new Stopwatch();
- ProgramConfig.LoadConfig(args);
-
- if (!ProgramConfig.Config.UseInMemoryProvider)
- {
- KEFCore.CreateGlobalInstance();
- }
-
try
{
globalWatcher.Start();
@@ -183,8 +182,7 @@ join pg in context.Posts on op.BlogId equals pg.BlogId
}
catch (Exception ex)
{
- ProgramConfig.ReportString(ex.ToString());
- Environment.ExitCode = 1;
+ Environment.ExitCode = ProgramConfig.ManageException(ex);
}
finally
{
diff --git a/test/KEFCore.StreamTest/Program.cs b/test/KEFCore.StreamTest/Program.cs
index 1687e689..a280dd19 100644
--- a/test/KEFCore.StreamTest/Program.cs
+++ b/test/KEFCore.StreamTest/Program.cs
@@ -37,16 +37,14 @@ partial class Program
{
static void Main(string[] args)
{
- var testWatcher = new Stopwatch();
- var globalWatcher = new Stopwatch();
-
ProgramConfig.LoadConfig(args);
+ ExecuteTests();
+ }
- if (!ProgramConfig.Config.UseInMemoryProvider)
- {
- KEFCore.CreateGlobalInstance();
- }
-
+ static void ExecuteTests()
+ {
+ var testWatcher = new Stopwatch();
+ var globalWatcher = new Stopwatch();
try
{
globalWatcher.Start();
@@ -185,8 +183,7 @@ join pg in context.Posts on op.BlogId equals pg.BlogId
}
catch (Exception ex)
{
- ProgramConfig.ReportString(ex.ToString());
- Environment.ExitCode = 1;
+ Environment.ExitCode = ProgramConfig.ManageException(ex);
}
finally
{
@@ -195,7 +192,6 @@ join pg in context.Posts on op.BlogId equals pg.BlogId
Console.WriteLine($"Full test completed in {globalWatcher.Elapsed}, only tests completed in {testWatcher.Elapsed}");
}
}
-
}
public class BloggingContext : KafkaDbContext
diff --git a/test/KEFCore.Test/Program.cs b/test/KEFCore.Test/Program.cs
index e66bf632..e7965500 100644
--- a/test/KEFCore.Test/Program.cs
+++ b/test/KEFCore.Test/Program.cs
@@ -40,15 +40,14 @@ partial class Program
static void Main(string[] args)
{
- var testWatcher = new Stopwatch();
- var globalWatcher = new Stopwatch();
-
ProgramConfig.LoadConfig(args);
+ ExecuteTests();
+ }
- if (!ProgramConfig.Config.UseInMemoryProvider)
- {
- KEFCore.CreateGlobalInstance();
- }
+ static void ExecuteTests()
+ {
+ var testWatcher = new Stopwatch();
+ var globalWatcher = new Stopwatch();
try
{
@@ -59,7 +58,7 @@ static void Main(string[] args)
};
ProgramConfig.Config.ApplyOnContext(context);
-
+
if (ProgramConfig.Config.DeleteApplicationData)
{
context.Database.EnsureDeleted();
@@ -190,8 +189,7 @@ join pg in context.Posts on op.BlogId equals pg.BlogId
}
catch (Exception ex)
{
- ProgramConfig.ReportString(ex.ToString());
- Environment.ExitCode = 1;
+ Environment.ExitCode = ProgramConfig.ManageException(ex);
}
finally
{