diff --git a/src/net/KEFCore.SerDes.Avro.Compiler/KEFCore.SerDes.Avro.Compiler.csproj b/src/net/KEFCore.SerDes.Avro.Compiler/KEFCore.SerDes.Avro.Compiler.csproj index 116ea11..f342937 100644 --- a/src/net/KEFCore.SerDes.Avro.Compiler/KEFCore.SerDes.Avro.Compiler.csproj +++ b/src/net/KEFCore.SerDes.Avro.Compiler/KEFCore.SerDes.Avro.Compiler.csproj @@ -23,7 +23,7 @@ - + diff --git a/src/net/KEFCore.SerDes/KEFCore.SerDes.csproj b/src/net/KEFCore.SerDes/KEFCore.SerDes.csproj index c68d777..71e1e95 100644 --- a/src/net/KEFCore.SerDes/KEFCore.SerDes.csproj +++ b/src/net/KEFCore.SerDes/KEFCore.SerDes.csproj @@ -44,7 +44,7 @@ - + All None diff --git a/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs b/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs index 7fa6797..221e0d7 100644 --- a/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs +++ b/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs @@ -291,7 +291,7 @@ public IEnumerator GetEnumerator() #if DEBUG_PERFORMANCE Infrastructure.KafkaDbContext.ReportString($"Requesting KafkaEnumerator for {_entityType.Name} on {DateTime.Now:HH:mm:ss.FFFFFFF}"); #endif - return new KafkaEnumerator(_kafkaCluster, _entityType, _keyValueStore?.All, _useEnumeratorWithPrefetch, false); + return new KafkaEnumerator(_kafkaCluster, _entityType, _keyValueStore?.All(), _useEnumeratorWithPrefetch, false); } System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() @@ -306,7 +306,7 @@ public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancel #if DEBUG_PERFORMANCE Infrastructure.KafkaDbContext.ReportString($"Requesting async KafkaEnumerator for {_entityType.Name} on {DateTime.Now:HH:mm:ss.FFFFFFF}"); #endif - return new KafkaEnumerator(_kafkaCluster, _entityType, _keyValueStore?.All, _useEnumeratorWithPrefetch, true); + return new KafkaEnumerator(_kafkaCluster, _entityType, _keyValueStore?.All(), _useEnumeratorWithPrefetch, true); } #endif } @@ -382,7 +382,7 @@ public ValueTask DisposeAsync() #if DEBUG_PERFORMANCE Infrastructure.KafkaDbContext.ReportString($"KafkaEnumerator _moveNextSw: {_moveNextSw.Elapsed} _currentSw: {_currentSw.Elapsed} _valueGetSw: {_valueGetSw.Elapsed} _valueGet2Sw: {_valueGet2Sw.Elapsed} _valueBufferSw: {_valueBufferSw.Elapsed}"); #endif - return _asyncEnumerator.DisposeAsync(); + return _asyncEnumerator != null ? _asyncEnumerator.DisposeAsync() : new ValueTask(); } #endif @@ -396,18 +396,18 @@ public bool MoveNext() { _moveNextSw.Start(); #endif - if (_useEnumeratorWithPrefetch ? _enumerator != null && _enumerator.MoveNext() : _keyValueIterator != null && _keyValueIterator.HasNext) + if (_useEnumeratorWithPrefetch ? _enumerator != null && _enumerator.MoveNext() : _keyValueIterator != null && _keyValueIterator.HasNext()) { #if DEBUG_PERFORMANCE _cycles++; _valueGetSw.Start(); #endif - KeyValue kv = _useEnumeratorWithPrefetch ? _enumerator.Current : _keyValueIterator.Next; + KeyValue? kv = _useEnumeratorWithPrefetch ? _enumerator?.Current : _keyValueIterator?.Next(); #if DEBUG_PERFORMANCE _valueGetSw.Stop(); _valueGet2Sw.Start(); #endif - TValue value = kv.Value; + TValue value = kv != null ? kv.Value : default; #if DEBUG_PERFORMANCE _valueGet2Sw.Stop(); _valueBufferSw.Start(); diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs index c79e55a..4d811c3 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs @@ -360,7 +360,8 @@ public bool MoveNext() V? data; using (KeyValue kv = _keyValueIterator.Next()) { - data = kv.value != null ? (V)(object)kv.value! : default; + var kvSupport = new MASES.KNet.Streams.KeyValueSupport(kv.BridgeInstance); + data = kvSupport.Value != null ? (V)(object)kvSupport.Value! : default; } #if DEBUG_PERFORMANCE _valueGetSw.Stop(); @@ -372,7 +373,7 @@ public bool MoveNext() _valueBufferSw.Start(); #endif object[] array = null!; - entityTypeData.GetData(_entityType, ref array); + entityTypeData?.GetData(_entityType, ref array); #if DEBUG_PERFORMANCE _valueBufferSw.Stop(); #endif