Skip to content

Commit

Permalink
Added Streams test to reproduce what happens in KEFCore (#514)
Browse files Browse the repository at this point in the history
* Added Streams test to reproduce what happens in KEFCore

* Added KeyValueSupport class to replace direct usage of fields of org.apache.kafka.streams.KeyValue with methods to retrieve key and value information

* Move ToIEnumerator into CommonIterator and update behavior of UsePrefetch management
  • Loading branch information
masesdevelopers authored Jul 2, 2024
1 parent c9963a2 commit 51d87d0
Show file tree
Hide file tree
Showing 22 changed files with 557 additions and 185 deletions.
28 changes: 14 additions & 14 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ jobs:
- 9092:9092
env:
KNET_DOCKER_RUNNING_MODE: server
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}

strategy:
fail-fast: false
Expand Down Expand Up @@ -313,31 +313,31 @@ jobs:
if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' || matrix.os == 'macos-13' }}
run: dotnet ./bin/${{ matrix.framework }}/KNetTest.dll localhost:9092 useConsumeCallback randomizeTopicName
env:
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}

- name: Execute KNetTest withBigBigExtraValue on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' || matrix.os == 'macos-13' }}
run: dotnet ./bin/${{ matrix.framework }}/KNetTest.dll localhost:9092 useConsumeCallback randomizeTopicName withBigBigExtraValue
env:
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}

- name: Execute KNetTest runBuffered on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' || matrix.os == 'macos-13' }}
run: dotnet ./bin/${{ matrix.framework }}/KNetTest.dll localhost:9092 useConsumeCallback randomizeTopicName runBuffered
env:
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}

- name: Execute KNetTest runBuffered withBigBigExtraValue on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' || matrix.os == 'macos-13' }}
run: dotnet ./bin/${{ matrix.framework }}/KNetTest.dll localhost:9092 useConsumeCallback randomizeTopicName runBuffered withBigBigExtraValue
env:
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}

- name: Execute KNetTestSerDes on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
shell: pwsh
run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/KNetTestSerDes.dll
env:
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}

execute_tests_other:
needs: [build_windows]
Expand Down Expand Up @@ -392,35 +392,35 @@ jobs:
Start-Process -RSE ${{ github.workspace }}/logfiles/PWSH_zookeeper_err.log -RSO ${{ github.workspace }}/logfiles/PWSH_zookeeper_out.log -FilePath dotnet -ArgumentList ( '${{ github.workspace }}/binCLI/${{ matrix.framework }}/MASES.KNetCLI.dll', 'zookeeperstart', '-LogPath', '${{ github.workspace }}/logfiles/', '-Log4JConfiguration', '${{ github.workspace }}/bin/${{ matrix.framework }}/log4j.properties', '${{ github.workspace }}/bin/${{ matrix.framework }}/zookeeper.properties' )
Start-Process -RSE ${{ github.workspace }}/logfiles/PWSH_kafka_err.log -RSO ${{ github.workspace }}/logfiles/PWSH_kafka_out.log -FilePath dotnet -ArgumentList ( '${{ github.workspace }}/binCLI/${{ matrix.framework }}/MASES.KNetCLI.dll', 'kafkastart', '-LogPath', '${{ github.workspace }}/logfiles/', '-Log4JConfiguration', '${{ github.workspace }}/bin/${{ matrix.framework }}/log4j.properties', '${{ github.workspace }}/bin/${{ matrix.framework }}/server.properties' )
env:
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}

- name: Execute KNetTest on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
if: ${{ matrix.os != 'windows-latest' }}
shell: pwsh
run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/KNetTest.dll localhost:9092 useConsumeCallback randomizeTopicName
env:
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}

- name: Execute KNetTest withBigBigExtraValue on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
if: ${{ matrix.os != 'windows-latest' }}
shell: pwsh
run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/KNetTest.dll localhost:9092 useConsumeCallback randomizeTopicName withBigBigExtraValue
env:
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}

- name: Execute KNetTest runBuffered on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
if: ${{ matrix.os != 'windows-latest' }}
shell: pwsh
run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/KNetTest.dll localhost:9092 useConsumeCallback randomizeTopicName runBuffered
env:
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}

- name: Execute KNetTest runBuffered withBigBigExtraValue on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
if: ${{ matrix.os != 'windows-latest' }}
shell: pwsh
run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/KNetTest.dll localhost:9092 useConsumeCallback randomizeTopicName runBuffered withBigBigExtraValue
env:
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}

- name: WINDOWS ONLY - Start Kafka and execute KNetTest on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
if: ${{ matrix.os == 'windows-latest' }}
Expand All @@ -435,21 +435,21 @@ jobs:
${{ github.workspace }}\bin\${{ matrix.framework }}/KNetTest.exe localhost:9092 useConsumeCallback randomizeTopicName runBuffered
${{ github.workspace }}\bin\${{ matrix.framework }}/KNetTest.exe localhost:9092 useConsumeCallback randomizeTopicName runBuffered withBigBigExtraValue
env:
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}

- name: Execute KNetTestSerDes on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
if: ${{ matrix.os != 'windows-latest' }}
shell: pwsh
run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/KNetTestSerDes.dll
env:
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}

- name: Execute KNetTestSerDes on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
if: ${{ matrix.os == 'windows-latest' }}
shell: pwsh
run: ${{ github.workspace }}\bin\${{ matrix.framework }}\KNetTestSerDes.exe
env:
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_5_17 }}

- uses: actions/upload-artifact@v4
if: ${{ failure() || cancelled() }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2024 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

package org.mases.knet.developed.streams;

public class KeyValueSupport<K, V> {
org.apache.kafka.streams.KeyValue<K, V> _innerKV;

public static <K, V> org.apache.kafka.streams.KeyValue<K, V> toKeyValue(KeyValueSupport<K, V> kvs) {
return new org.apache.kafka.streams.KeyValue<K, V>(kvs.getKey(), kvs.getValue());
}

public KeyValueSupport(org.apache.kafka.streams.KeyValue<K, V> innerKV) {
_innerKV = innerKV;
}

public K getKey() {
return _innerKV.key;
}

public V getValue() {
return _innerKV.value;
}
}
18 changes: 9 additions & 9 deletions src/net/KNet/Specific/Streams/KeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
public sealed class KeyValue<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier
{
readonly Org.Apache.Kafka.Streams.KeyValue<TJVMK, TJVMV> _inner = null;
readonly KeyValueSupport<TJVMK, TJVMV> _inner = null;
K _key;
bool _keyStored;
V _value;
Expand All @@ -40,10 +40,10 @@ public sealed class KeyValue<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }

internal KeyValue(IGenericSerDesFactory factory,
Org.Apache.Kafka.Streams.KeyValue<TJVMK, TJVMV> value,
ISerDes<K, TJVMK> keySerDes,
ISerDes<V, TJVMV> valueSerDes,
bool fromPrefetched)
KeyValueSupport<TJVMK, TJVMV> value,
ISerDes<K, TJVMK> keySerDes,
ISerDes<V, TJVMV> valueSerDes,
bool fromPrefetched)
{
_factory = factory;
_inner = value;
Expand All @@ -52,10 +52,10 @@ internal KeyValue(IGenericSerDesFactory factory,
if (fromPrefetched)
{
_keySerDes ??= _factory?.BuildKeySerDes<K, TJVMK>();
_key = _keySerDes.Deserialize(null, _inner.key);
_key = _keySerDes.Deserialize(null, _inner.Key);
_keyStored = true;
_valueSerDes ??= _factory?.BuildValueSerDes<V, TJVMV>();
_value = _valueSerDes.Deserialize(null, _inner.value);
_value = _valueSerDes.Deserialize(null, _inner.Value);
_valueStored = true;
}
}
Expand All @@ -70,7 +70,7 @@ public K Key
if (!_keyStored)
{
_keySerDes ??= _factory?.BuildKeySerDes<K, TJVMK>();
_key = _keySerDes.Deserialize(null, _inner.key);
_key = _keySerDes.Deserialize(null, _inner.Key);
_keyStored = true;
}
return _key;
Expand All @@ -86,7 +86,7 @@ public V Value
if (!_valueStored)
{
_valueSerDes ??= _factory?.BuildValueSerDes<V, TJVMV>();
_value = _valueSerDes.Deserialize(null, _inner.value);
_value = _valueSerDes.Deserialize(null, _inner.Value);
_valueStored = true;
}
return _value;
Expand Down
105 changes: 105 additions & 0 deletions src/net/KNet/Specific/Streams/KeyValueSupport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2024 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using MASES.JCOBridge.C2JBridge;
using MASES.JCOBridge.C2JBridge.JVMInterop;
using MASES.KNet.Serialization;
using System;

namespace MASES.KNet.Streams
{
#region KeyValue<K, V>
/// <summary>
/// Support class for <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.7.1/org/apache/kafka/streams/KeyValue.html#org.apache.kafka.streams.KeyValue"/>
/// </summary>
public partial class KeyValueSupport<K, V> : MASES.JCOBridge.C2JBridge.JVMBridgeBase<KeyValueSupport<K, V>>
{
#region Constructors
/// <summary>
/// Default constructor: even if the corresponding Java class does not have one, it is mandatory for JCOBridge
/// </summary>
public KeyValueSupport() { }
/// <summary>
/// Initialize a new instance of <see cref="KeyValueSupport{K, V}"/> from an instance of <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.7.1/org/apache/kafka/streams/KeyValue.html#org.apache.kafka.streams.KeyValue(java.lang.Object,java.lang.Object)"/>
/// </summary>
/// <param name="obj">The <see cref="IJavaObject"/> referring <see cref="Org.Apache.Kafka.Streams.KeyValue{K, V}"/></param>
public KeyValueSupport(IJavaObject obj)
: base(obj)
{
}

#endregion

const string _bridgeClassName = "org.mases.knet.developed.streams.KeyValueSupport";

private static readonly IJavaType _LocalBridgeClazz = ClazzOf(_bridgeClassName);
private static IJavaType LocalBridgeClazz => _LocalBridgeClazz ?? throw new InvalidOperationException($"Class {_bridgeClassName} was not found.");

/// <summary>
/// <see href="https://www.jcobridge.com/api-clr/html/P_MASES_JCOBridge_C2JBridge_JVMBridgeBase_BridgeClassName.htm"/>
/// </summary>
public override string BridgeClassName => _bridgeClassName;
/// <summary>
/// <see href="https://www.jcobridge.com/api-clr/html/P_MASES_JCOBridge_C2JBridge_JVMBridgeBase_IsBridgeAbstract.htm"/>
/// </summary>
public override bool IsBridgeAbstract => false;
/// <summary>
/// <see href="https://www.jcobridge.com/api-clr/html/P_MASES_JCOBridge_C2JBridge_JVMBridgeBase_IsBridgeCloseable.htm"/>
/// </summary>
public override bool IsBridgeCloseable => false;
/// <summary>
/// <see href="https://www.jcobridge.com/api-clr/html/P_MASES_JCOBridge_C2JBridge_JVMBridgeBase_IsBridgeInterface.htm"/>
/// </summary>
public override bool IsBridgeInterface => false;
/// <summary>
/// <see href="https://www.jcobridge.com/api-clr/html/P_MASES_JCOBridge_C2JBridge_JVMBridgeBase_IsBridgeStatic.htm"/>
/// </summary>
public override bool IsBridgeStatic => false;

#region Static methods
/// <summary>
/// Convert the <paramref name="kvs"/> into an instance of <see cref="Org.Apache.Kafka.Streams.KeyValue{K, V}"/>
/// </summary>
/// <param name="kvs">An instance of <see cref="KeyValueSupport{K, V}"/></param>
/// <returns>An instance of <see cref="Org.Apache.Kafka.Streams.KeyValue{K, V}"/></returns>
public static Org.Apache.Kafka.Streams.KeyValue<K, V> ToKeyValue(KeyValueSupport<K, V> kvs) => SExecute<Org.Apache.Kafka.Streams.KeyValue<K, V>>("toKeyValue", kvs);

#endregion

#region Instance methods
/// <summary>
/// The value of the field <see cref="Org.Apache.Kafka.Streams.KeyValue{K, V}.key"/>
/// </summary>
public K Key => IExecute<K>("getKey");
/// <summary>
/// The value of the field <see cref="Org.Apache.Kafka.Streams.KeyValue{K, V}.value"/>
/// </summary>
public V Value => IExecute<V>("getValue");

/// <summary>
/// Convert this instance into an instance of <see cref="Org.Apache.Kafka.Streams.KeyValue{K, V}"/>
/// </summary>
/// <returns>An instance of <see cref="Org.Apache.Kafka.Streams.KeyValue{K, V}"/></returns>
public Org.Apache.Kafka.Streams.KeyValue<K, V> ToKeyValue() => ToKeyValue(this);

#endregion

// TODO: complete the class
}
#endregion
}
22 changes: 18 additions & 4 deletions src/net/KNet/Specific/Streams/State/CommonIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,38 @@ protected IGenericSerDesFactory Factory
/// <inheritdoc/>
public IEnumerator<TIteratorType> GetEnumerator()
{
return GetEnumerator(false) as IEnumerator<TIteratorType>;
return GetEnumerator(false, UsePrefetch) as IEnumerator<TIteratorType>;
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

/// <inheritdoc/>
public IAsyncEnumerator<TIteratorType> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return GetEnumerator(true, cancellationToken) as IAsyncEnumerator<TIteratorType>;
return GetEnumerator(true, UsePrefetch, cancellationToken) as IAsyncEnumerator<TIteratorType>;
}

/// <summary>
/// Returns an <see cref="IEnumerator{E}"/> of <typeparamref name="TIteratorType"/>
/// </summary>
/// <param name="usePrefetch"><see langword="true"/> to return an <see cref="IEnumerator{T}"/> making preparation of <typeparamref name="TIteratorType"/> in parallel</param>
/// <returns>An <see cref="IEnumerator{T}"/> of <typeparamref name="TIteratorType"/></returns>
/// <remarks><paramref name="usePrefetch"/> is not considered with .NET 6 and .NET Framework</remarks>
public IEnumerator<TIteratorType> ToIEnumerator(bool usePrefetch = true)
{
return GetEnumerator(false, usePrefetch) as IEnumerator<TIteratorType>;
}

/// <summary>
/// Internally gets the <see cref="IEnumerable{T}"/> or <see cref="IAsyncEnumerable{T}"/>
/// </summary>
/// <param name="isAsync">If requesting an <see cref="IAsyncEnumerator{T}"/></param>
/// <param name="isAsync"><see langword="true"/> if requesting an <see cref="IAsyncEnumerator{T}"/></param>
/// <param name="usePrefetch"><see langword="true"/> if requesting prefetch behavior</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to be used in <see cref="IAsyncEnumerator{T}"/></param>
/// <returns>An <see cref="IEnumerable{T}"/> or <see cref="IAsyncEnumerable{T}"/></returns>
protected abstract object GetEnumerator(bool isAsync, CancellationToken cancellationToken = default);
protected abstract object GetEnumerator(bool isAsync, bool usePrefetch, CancellationToken cancellationToken = default);
}
}
Loading

0 comments on commit 51d87d0

Please sign in to comment.