From f291323017060bfdb92ab60da24419914aeebc98 Mon Sep 17 00:00:00 2001 From: s0r01ed Date: Tue, 10 Dec 2019 20:33:46 +0530 Subject: [PATCH 1/2] Changes for Prefix Seek on RocksDBStore- Initial commit --- gradle/dependencies.gradle | 2 +- .../RocksDbPrefixStateStoreBenchmark.java | 108 +++++++++++++++ .../internals/AbstractReadOnlyDecorator.java | 7 + .../internals/AbstractReadWriteDecorator.java | 7 + .../kafka/streams/state/KeyValueStore.java | 1 + .../streams/state/ReadOnlyKeyValueStore.java | 13 ++ .../state/internals/CachingKeyValueStore.java | 6 + .../ChangeLoggingKeyValueBytesStore.java | 6 + .../CompositeReadOnlyKeyValueStore.java | 6 + .../internals/InMemoryKeyValueStore.java | 22 ++- ...ToTimestampedKeyValueByteStoreAdapter.java | 6 + .../state/internals/MemoryLRUCache.java | 6 + .../state/internals/MeteredKeyValueStore.java | 10 ++ .../ReadOnlyKeyValueStoreFacade.java | 6 + .../internals/RocksDBPrefixIterator.java | 31 ++--- .../streams/state/internals/RocksDBStore.java | 22 +++ .../internals/RocksDBTimestampedStore.java | 5 + .../TimestampedKeyValueStoreBuilder.java | 6 + .../internals/ProcessorTopologyTest.java | 49 +++++-- .../state/internals/RocksDBStoreTest.java | 125 ++++++++++++++++-- .../test/GenericInMemoryKeyValueStore.java | 6 + ...nericInMemoryTimestampedKeyValueStore.java | 6 + .../apache/kafka/test/MockKeyValueStore.java | 6 + .../apache/kafka/test/NoOpReadOnlyStore.java | 6 + .../internals/KeyValueStoreFacade.java | 7 + 25 files changed, 432 insertions(+), 43 deletions(-) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/processor/internals/RocksDbPrefixStateStoreBenchmark.java diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index a0d93b8ced4c6..047e753a09a32 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -98,7 +98,7 @@ versions += [ owaspDepCheckPlugin: "5.3.2.1", powermock: "2.0.7", reflections: "0.9.12", - rocksDB: "5.18.4", + rocksDB: "5.18.3", scalaCollectionCompat: "2.1.6", scalafmt: "1.5.1", scalaJava8Compat : "0.9.1", diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/processor/internals/RocksDbPrefixStateStoreBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/processor/internals/RocksDbPrefixStateStoreBenchmark.java new file mode 100644 index 0000000000000..102592f5634d7 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/processor/internals/RocksDbPrefixStateStoreBenchmark.java @@ -0,0 +1,108 @@ +package org.apache.kafka.jmh.streams.processor.internals; + +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Metrics; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.*; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.internals.RocksDBStore; +import org.apache.kafka.streams.state.internals.ThreadCache; +import org.openjdk.jmh.annotations.*; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +@State(Scope.Benchmark) +@Warmup(iterations = 5) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class RocksDbPrefixStateStoreBenchmark { + + private int counter; + + private static final int DISTINCT_KEYS = 10_000; + + private static final String KEY = "the_key_to_use"; + + private static final String VALUE = "the quick brown fox jumped over the lazy dog the olympics are about to start"; + + private final Bytes[] keys = new Bytes[DISTINCT_KEYS]; + + private final byte[][] values = new byte[DISTINCT_KEYS][]; + + private RocksDBStore prefixedStateStore; + + @Setup(Level.Trial) + public void setUp() { + for (int i = 0; i < DISTINCT_KEYS; ++i) { + keys[i] = Bytes.wrap((KEY + i).getBytes(StandardCharsets.UTF_8)); + values[i] = (VALUE + i).getBytes(StandardCharsets.UTF_8); + } + + prefixedStateStore = (RocksDBStore)Stores.persistentKeyValueStore("prefix-state-store").get(); + + final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(), "prefix-test-metrics", StreamsConfig.METRICS_LATEST); + final ThreadCache cache = new ThreadCache(new LogContext("prefixTestCache "), 1_000_000_000, metrics); + final StreamsConfig config = new StreamsConfig(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "prefix-test"), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "prefix-test") + )); + + final TaskId id = new TaskId(0, 0); + final ProcessorStateManager stateMgr; + stateMgr = new ProcessorStateManager( + id, + Task.TaskType.ACTIVE, + false, + new LogContext("jmh"), + new StateDirectory(config, Time.SYSTEM, true), + null, + Collections.emptyMap(), + Collections.emptyList() + ); + + final ProcessorContextImpl context = new ProcessorContextImpl( + id, + null, + config, + null, + stateMgr, + metrics, + cache + ); + + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); + prefixedStateStore.init(context, prefixedStateStore); + + } + + @Benchmark + public KeyValueIterator testCachePerformance() { + counter++; + final int index = counter % DISTINCT_KEYS; + // Put multiple keys with the same prefix patterns. This way, the prefix seek use case would be catered. + final Bytes key = keys[index]; + prefixedStateStore.put(key, values[index]); + int numIterations = 10; + int nextIndex = index + 1; + while (numIterations >= 0) { + Bytes newKey = keys[nextIndex % DISTINCT_KEYS]; + prefixedStateStore.put(newKey, values[nextIndex % DISTINCT_KEYS]); + numIterations--; + nextIndex++; + } + + return prefixedStateStore.prefixScan(Bytes.wrap(KEY.getBytes())); + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java index a63cd992cbe63..c2a76565f4d8b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java @@ -17,6 +17,8 @@ package org.apache.kafka.streams.processor.internals; import java.util.List; + +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; @@ -121,6 +123,11 @@ public void putAll(final List> entries) { public V delete(final K key) { throw new UnsupportedOperationException(ERROR_MESSAGE); } + + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + return null; + } } static class TimestampedKeyValueStoreReadOnlyDecorator diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java index 494d98eb5bf40..77ac2e44017aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java @@ -17,6 +17,8 @@ package org.apache.kafka.streams.processor.internals; import java.util.List; + +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; @@ -115,6 +117,11 @@ public void putAll(final List> entries) { public V delete(final K key) { return wrapped().delete(key); } + + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + return null; + } } static class TimestampedKeyValueStoreReadWriteDecorator diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java index b104ad488db3a..e058193886979 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java @@ -67,4 +67,5 @@ public interface KeyValueStore extends StateStore, ReadOnlyKeyValueStore { * @throws InvalidStateStoreException if the store is not initialized */ long approximateNumEntries(); + + /** + * Get an iterator over keys which have the specified prefix. The type of the prefix can be different from that of + * the key. That's why, callers should also pass a serializer for the prefix to convert the prefix into the + * format in which the keys are stored underneath in the stores + * @param prefix The prefix. + * @param prefixKeySerializer Serializer for the Prefix key type + * @param Prefix Serializer type + * @param

Prefix Type. + * @return The iterator for keys having the specified prefix. + */ + , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 3502fadfa29fb..f5d19bce670d6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; @@ -190,6 +191,11 @@ public byte[] delete(final Bytes key) { } } + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + return null; + } + private byte[] deleteInternal(final Bytes key) { final byte[] v = getInternal(key); putInternal(key, null); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index 35f6d365891ab..9b89f7f6e759a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; @@ -89,6 +90,11 @@ public byte[] delete(final Bytes key) { return oldValue; } + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + return wrapped().prefixScan(prefix, prefixKeySerializer); + } + @Override public byte[] get(final Bytes key) { return wrapped().get(key); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java index c790b89b13f65..c92c623fe60ef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreType; @@ -111,6 +112,11 @@ public long approximateNumEntries() { return total; } + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + throw new UnsupportedOperationException(); + } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index 27ec409dc4d31..ac16de64d5714 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -16,11 +16,9 @@ */ package org.apache.kafka.streams.state.internals; -import java.util.List; -import java.util.NavigableMap; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; +import java.util.*; + +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; @@ -28,7 +26,6 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import java.util.Iterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,6 +105,19 @@ public synchronized byte[] delete(final Bytes key) { return oldValue; } + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + Objects.requireNonNull(prefix, "prefix cannot be null"); + Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null"); + + Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix)); + Bytes to = Bytes.increment(from); + + return new DelegatingPeekingKeyValueIterator<>( + name, + new InMemoryKeyValueIterator(map.subMap(from, true, to, false).keySet())); + } + @Override public synchronized KeyValueIterator range(final Bytes from, final Bytes to) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java index 62cfac3b09ca2..8fb748df7fd66 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; @@ -76,6 +77,11 @@ public byte[] delete(final Bytes key) { return convertToTimestampedFormat(store.delete(key)); } + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + return null; + } + @Override public String name() { return store.name(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index d69df13f4f096..159d0cbfb3d62 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; @@ -134,6 +135,11 @@ public synchronized byte[] delete(final Bytes key) { return this.map.remove(key); } + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + return null; + } + /** * @throws UnsupportedOperationException at every invocation */ diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index c844e03424096..256adecd52fb9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; @@ -173,6 +174,15 @@ public V delete(final K key) { } } + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + + return new MeteredKeyValueIterator( + wrapped().prefixScan(prefix, prefixKeySerializer), + rangeSensor + ); + } + @Override public KeyValueIterator range(final K from, final K to) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java index 862e6fcfbe89e..e22493353f6e0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.TimestampedKeyValueStore; @@ -49,4 +50,9 @@ public KeyValueIterator all() { public long approximateNumEntries() { return inner.approximateNumEntries(); } + + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + throw new UnsupportedOperationException(); + } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java index b84175e1ee0c6..388a8f7ed31d4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java @@ -17,9 +17,11 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.KeyValueIterator; import org.rocksdb.RocksIterator; +import java.nio.ByteBuffer; import java.util.Set; class RocksDBPrefixIterator extends RocksDbIterator { @@ -30,25 +32,24 @@ class RocksDBPrefixIterator extends RocksDbIterator { final Set> openIterators, final Bytes prefix) { super(name, newIterator, openIterators); - rawPrefix = prefix.get(); + this.rawPrefix = prefix.get(); newIterator.seek(rawPrefix); } - @Override - public synchronized boolean hasNext() { - if (!super.hasNext()) { - return false; - } + private boolean prefixEquals(final byte[] x, final byte[] y) { + final int min = Math.min(x.length, y.length); + final ByteBuffer xSlice = ByteBuffer.wrap(x, 0, min); + final ByteBuffer ySlice = ByteBuffer.wrap(y, 0, min); + return xSlice.equals(ySlice); + } - final byte[] rawNextKey = super.peekNextKey().get(); - for (int i = 0; i < rawPrefix.length; i++) { - if (i == rawNextKey.length) { - throw new IllegalStateException("Unexpected RocksDB Key Value. Should have been skipped with seek."); - } - if (rawNextKey[i] != rawPrefix[i]) { - return false; - } + @Override + public KeyValue makeNext() { + final KeyValue next = super.makeNext(); + if (next == null) return allDone(); + else { + if (prefixEquals(this.rawPrefix, next.key.get())) return next; + else return allDone(); } - return true; } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index fc2bb24724f43..646aa0eb85048 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; @@ -304,6 +305,20 @@ public synchronized byte[] delete(final Bytes key) { return oldValue; } + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + Objects.requireNonNull(prefix, "prefix cannot be null"); + Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null"); + + validateStoreOpen(); + Bytes prefixBytes = Bytes.wrap(prefixKeySerializer.serialize(null, prefix)); + + final KeyValueIterator rocksDbPrefixSeekIterator = dbAccessor.prefixSeek(prefixBytes); + openIterators.add(rocksDbPrefixSeekIterator); + + return rocksDbPrefixSeekIterator; + } + @Override public synchronized KeyValueIterator range(final Bytes from, final Bytes to) { @@ -477,6 +492,8 @@ void addToBatch(final byte[] key, final WriteBatch batch) throws RocksDBException; void close(); + + KeyValueIterator prefixSeek(final Bytes prefix); } class SingleColumnFamilyAccessor implements RocksDBAccessor { @@ -576,6 +593,11 @@ public void addToBatch(final byte[] key, public void close() { columnFamily.close(); } + + @Override + public KeyValueIterator prefixSeek(Bytes prefix) { + return new RocksDBPrefixIterator(name, db.newIterator(columnFamily), openIterators, prefix); + } } // not private for testing diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java index 6c31e9b43d208..a678dda6b5d47 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java @@ -249,6 +249,11 @@ public void close() { oldColumnFamily.close(); newColumnFamily.close(); } + + @Override + public KeyValueIterator prefixSeek(Bytes prefix) { + return null; + } } private class RocksDBDualCFIterator extends AbstractIterator> diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java index 863b44ba4f68e..9946db321f553 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; @@ -122,6 +123,11 @@ public byte[] delete(final Bytes key) { return wrapped.delete(key); } + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + return null; + } + @Override public byte[] get(final Bytes key) { return wrapped.get(key); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 9f852cc0cfac4..66218a8ed030f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -26,12 +26,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.TestOutputTopic; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.TopologyWrapper; +import org.apache.kafka.streams.*; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -39,10 +34,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.To; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.*; import org.apache.kafka.streams.test.TestRecord; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.TestUtils; @@ -273,6 +265,43 @@ public void testDrivingStatefulTopology() { assertNull(store.get("key4")); } + @Test + public void testPrefixScanStatefulTopology() { + final String storeName = "entries"; + driver = new TopologyTestDriver(createStatefulTopology(storeName), props); + final TestInputTopic inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER); + final TestOutputTopic outputTopic1 = + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer()); + + inputTopic.pipeInput("key1", "value1"); + inputTopic.pipeInput("key2", "value2"); + inputTopic.pipeInput("key3", "value3"); + inputTopic.pipeInput("key1", "value4"); + assertTrue(outputTopic1.isEmpty()); + + final KeyValueStore store = driver.getKeyValueStore(storeName); + final KeyValueIterator keysWithPrefix = store.prefixScan("key", Serdes.String().serializer()); + + String[] valuesWithPrefix = new String[3]; + int numberOfKeysReturned = 0; + + while (keysWithPrefix.hasNext()) { + KeyValue next = keysWithPrefix.next(); + valuesWithPrefix[numberOfKeysReturned++] = next.value; + } + + assertEquals(3, numberOfKeysReturned); + + assertEquals(valuesWithPrefix[0], "value4"); + assertEquals(valuesWithPrefix[1], "value2"); + assertEquals(valuesWithPrefix[2], "value3"); + + assertEquals("value4", store.get("key1")); + assertEquals("value2", store.get("key2")); + assertEquals("value3", store.get("key3")); + assertNull(store.get("key4")); + } + @Test public void testDrivingConnectedStateStoreTopology() { driver = new TopologyTestDriver(createConnectedStateStoreTopology("connectedStore"), props); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index c86b9143618a8..8f951dc6be8ca 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -21,11 +21,7 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -56,12 +52,7 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; +import java.util.*; import static java.nio.charset.StandardCharsets.UTF_8; import static org.easymock.EasyMock.anyObject; @@ -310,6 +301,118 @@ public void shouldPutAll() { rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3"))))); } + @Test + public void shouldReturnKeysWithGivenPrefix() { + final List> entries = new ArrayList<>(); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "k1")), + stringSerializer.serialize(null, "a"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "prefix_3")), + stringSerializer.serialize(null, "b"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "k2")), + stringSerializer.serialize(null, "c"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "prefix_2")), + stringSerializer.serialize(null, "d"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "k3")), + stringSerializer.serialize(null, "e"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "prefix_1")), + stringSerializer.serialize(null, "f"))); + + rocksDBStore.init(context, rocksDBStore); + rocksDBStore.putAll(entries); + rocksDBStore.flush(); + + final KeyValueIterator keysWithPrefix = rocksDBStore.prefixScan("prefix", stringSerializer); + String[] valuesWithPrefix = new String[3]; + int numberOfKeysReturned = 0; + + while (keysWithPrefix.hasNext()) { + KeyValue next = keysWithPrefix.next(); + valuesWithPrefix[numberOfKeysReturned++] = new String(next.value); + } + // Since there are 3 keys prefixed with prefix, the count should be 3 + assertEquals(3, numberOfKeysReturned); + // The order might seem inverted to the order in which keys were inserted, but since Rocksdb stores keys + // lexicographically, prefix_1 would still be the first key that is returned. + assertEquals(valuesWithPrefix[0], "f"); + assertEquals(valuesWithPrefix[1], "d"); + assertEquals(valuesWithPrefix[2], "b"); + + // Lastly, simple key value lookups should still work :) + assertEquals( + "c", + stringDeserializer.deserialize( + null, + rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "k2"))))); + + } + + @Test + public void shouldReturnUUIDsWithStringPrefix() { + final List> entries = new ArrayList<>(); + Serializer uuidSerializer = Serdes.UUID().serializer(); + UUID uuid1 = UUID.randomUUID(); + UUID uuid2 = UUID.randomUUID(); + String prefix = uuid1.toString().substring(0, 4); + entries.add(new KeyValue<>( + new Bytes(uuidSerializer.serialize(null, uuid1)), + stringSerializer.serialize(null, "a"))); + + entries.add(new KeyValue<>( + new Bytes(uuidSerializer.serialize(null, uuid2)), + stringSerializer.serialize(null, "b"))); + + + rocksDBStore.init(context, rocksDBStore); + rocksDBStore.putAll(entries); + rocksDBStore.flush(); + + final KeyValueIterator keysWithPrefix = rocksDBStore.prefixScan(prefix, stringSerializer); + String[] valuesWithPrefix = new String[1]; + int numberOfKeysReturned = 0; + + while (keysWithPrefix.hasNext()) { + KeyValue next = keysWithPrefix.next(); + valuesWithPrefix[numberOfKeysReturned++] = new String(next.value); + } + + assertEquals(1, numberOfKeysReturned); + assertEquals(valuesWithPrefix[0], "a"); + } + + @Test + public void shouldReturnNoKeys() { + final List> entries = new ArrayList<>(); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "a")), + stringSerializer.serialize(null, "a"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "b")), + stringSerializer.serialize(null, "c"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "c")), + stringSerializer.serialize(null, "e"))); + + rocksDBStore.init(context, rocksDBStore); + rocksDBStore.putAll(entries); + rocksDBStore.flush(); + + final KeyValueIterator keysWithPrefix = rocksDBStore.prefixScan("d", stringSerializer); + int numberOfKeysReturned = 0; + + while (keysWithPrefix.hasNext()) { + keysWithPrefix.next(); + numberOfKeysReturned++; + } + // Since there are no keys prefixed with d, the count should be 0 + assertEquals(0, numberOfKeysReturned); + } + @Test public void shouldRestoreAll() { final List> entries = getKeyValueEntries(); diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java index 649dc5b377293..9f04d0fe7d293 100644 --- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -123,6 +124,11 @@ public synchronized V delete(final K key) { return this.map.remove(key); } + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + return null; + } + @Override public synchronized KeyValueIterator range(final K from, final K to) { diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java index b1b75a16324bd..02fc149d6fdc5 100644 --- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -124,6 +125,11 @@ public synchronized ValueAndTimestamp delete(final K key) { return this.map.remove(key); } + @Override + public , P> KeyValueIterator> prefixScan(P prefix, PS prefixKeySerializer) { + return null; + } + @Override public synchronized KeyValueIterator> range(final K from, final K to) { diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java index 7cb376f370c1d..b62e326872d53 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; @@ -111,6 +112,11 @@ public Object delete(final Object key) { return null; } + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + return null; + } + @Override public void putAll(final List> entries) {} diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java index dbdd0b4a15ecf..5d69842700d2e 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; @@ -64,6 +65,11 @@ public long approximateNumEntries() { return 0L; } + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + throw new UnsupportedOperationException(); + } + @Override public String name() { return name; diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java index 749c41f011827..896e4f004c725 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java @@ -17,9 +17,11 @@ package org.apache.kafka.streams.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -65,6 +67,11 @@ public V delete(final K key) { return getValueOrNull(inner.delete(key)); } + @Override + public , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { + return null; + } + @Override public void flush() { inner.flush(); From fa8b0bf6a2b9bfd35cfc2e31ec82714186ceb832 Mon Sep 17 00:00:00 2001 From: s0r01ed Date: Wed, 16 Sep 2020 20:23:52 +0530 Subject: [PATCH 2/2] Renaming prefix seek to prefix scan --- .../apache/kafka/streams/state/internals/RocksDBStore.java | 6 +++--- .../streams/state/internals/RocksDBTimestampedStore.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 646aa0eb85048..e17369bc9ffa7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -313,7 +313,7 @@ public , P> KeyValueIterator prefixScan( validateStoreOpen(); Bytes prefixBytes = Bytes.wrap(prefixKeySerializer.serialize(null, prefix)); - final KeyValueIterator rocksDbPrefixSeekIterator = dbAccessor.prefixSeek(prefixBytes); + final KeyValueIterator rocksDbPrefixSeekIterator = dbAccessor.prefixScan(prefixBytes); openIterators.add(rocksDbPrefixSeekIterator); return rocksDbPrefixSeekIterator; @@ -493,7 +493,7 @@ void addToBatch(final byte[] key, void close(); - KeyValueIterator prefixSeek(final Bytes prefix); + KeyValueIterator prefixScan(final Bytes prefix); } class SingleColumnFamilyAccessor implements RocksDBAccessor { @@ -595,7 +595,7 @@ public void close() { } @Override - public KeyValueIterator prefixSeek(Bytes prefix) { + public KeyValueIterator prefixScan(Bytes prefix) { return new RocksDBPrefixIterator(name, db.newIterator(columnFamily), openIterators, prefix); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java index a678dda6b5d47..0719e1120881f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java @@ -251,7 +251,7 @@ public void close() { } @Override - public KeyValueIterator prefixSeek(Bytes prefix) { + public KeyValueIterator prefixScan(Bytes prefix) { return null; } }