diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
index 37b85190c0f3f..1adfce1ec8bbd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 
 /**
@@ -97,6 +98,23 @@ default KeyValueIterator<K, V> reverseAll() {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Return an iterator over all keys with the specified prefix.
+     * Since the type of the prefix can differ from that of the key, a serializer that converts the
+     * prefix into the format in which the keys are stored in this store must be passed to this method.
+     * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
+     * and must not return null values.
+     *
+     * @param prefix The prefix.
+     * @param prefixKeySerializer Serializer for the prefix key type
+     * @param <PS> Prefix serializer type
+     * @param <P> Prefix type.
+     * @return The iterator for keys having the specified prefix.
+     */
+    default <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(P prefix, PS prefixKeySerializer) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Return an approximate count of key-value mappings in this store.
      *
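
For orientation before the per-store implementations below, here is a minimal usage
sketch of the new API (hypothetical: the store name "my-store", the String/String
types, and the handle() helper are illustrative only, and the store behind the
processor must override prefixScan rather than inherit the throwing default):

    // Inside a Processor: scan all entries whose serialized key starts with "user-".
    final KeyValueStore<String, String> store =
        (KeyValueStore<String, String>) context.getStateStore("my-store");
    try (final KeyValueIterator<String, String> iter =
             store.prefixScan("user-", new StringSerializer())) {
        while (iter.hasNext()) {
            final KeyValue<String, String> entry = iter.next();
            handle(entry.key, entry.value);  // hypothetical downstream handling
        }
    }
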
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 50d7c2518dbb6..224d1b222b57e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -291,6 +292,16 @@ public KeyValueIterator<Bytes, byte[]> all() {
         return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true);
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
+        validateStoreOpen();
+        final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().prefixScan(prefix, prefixKeySerializer);
+        final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
+        final Bytes to = Bytes.increment(from);
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().range(cacheName, from, to, false);
+        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true);
+    }
+
     @Override
     public KeyValueIterator<Bytes, byte[]> reverseAll() {
         validateStoreOpen();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index d6526a10053e3..3770696808ded 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -97,6 +98,12 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
+                                                                                    final PS prefixKeySerializer) {
+        return wrapped().prefixScan(prefix, prefixKeySerializer);
+    }
+
     @Override
     public byte[] delete(final Bytes key) {
         final byte[] oldValue = wrapped().delete(key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 31041b91c3bd5..80508cede83c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -31,6 +32,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.Objects;
 
 public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
 
@@ -103,6 +105,20 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
+        Objects.requireNonNull(prefix, "prefix cannot be null");
+        Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
+
+        final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
+        final Bytes to = Bytes.increment(from);
+
+        return new DelegatingPeekingKeyValueIterator<>(
+            name,
+            new InMemoryKeyValueIterator(map.subMap(from, true, to, false).keySet(), true)
+        );
+    }
+
     @Override
     public synchronized byte[] delete(final Bytes key) {
         final byte[] oldValue = map.remove(key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index a9470ee804b36..51e6dd02e99d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
@@ -67,6 +68,7 @@ public class MeteredKeyValueStore<K, V>
     private Sensor putAllSensor;
     private Sensor allSensor;
     private Sensor rangeSensor;
+    private Sensor prefixScanSensor;
     private Sensor flushSensor;
     private Sensor e2eLatencySensor;
     private InternalProcessorContext context;
@@ -127,6 +129,7 @@ private void registerMetrics() {
         getSensor = StateStoreMetrics.getSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
         allSensor = StateStoreMetrics.allSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
         rangeSensor = StateStoreMetrics.rangeSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+        prefixScanSensor = StateStoreMetrics.prefixScanSensor(taskId, metricsScope, name(), streamsMetrics);
         flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
         deleteSensor = StateStoreMetrics.deleteSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
         e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
@@ -229,6 +232,12 @@ public V delete(final K key) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
+
+        return new MeteredKeyValueIterator(wrapped().prefixScan(prefix, prefixKeySerializer), prefixScanSensor);
+    }
+
     @Override
     public KeyValueIterator<K, V> range(final K from, final K to) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 032a303f617e2..e4b054b8e7ebc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -280,8 +280,8 @@ public boolean isEmpty() {
         return cache.isEmpty();
     }
 
-    synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) {
-        return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), true);
+    synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to, final boolean toInclusive) {
+        return keySetIterator(cache.navigableKeySet().subSet(from, true, to, toInclusive), true);
     }
 
     synchronized Iterator<Bytes> reverseKeyRange(final Bytes from, final Bytes to) {
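
Every implementation above reduces a prefix scan to a half-open range scan: the
serialized prefix is the inclusive lower bound and Bytes.increment of it is the
exclusive upper bound. That increment result is itself a valid key, which is why
NamedCache.keyRange grew the toInclusive flag above. A small self-contained
illustration of the bound arithmetic (assuming only the Bytes utility already used
in this patch):

    import org.apache.kafka.common.utils.Bytes;
    import java.nio.charset.StandardCharsets;

    public class PrefixBounds {
        public static void main(final String[] args) {
            final Bytes from = Bytes.wrap("abcd".getBytes(StandardCharsets.UTF_8));
            final Bytes to = Bytes.increment(from);  // last byte bumped: "abce"
            // A key matches iff from <= key < to under unsigned lexicographic order:
            //   "abcd"  -> returned (equals the lower bound)
            //   "abcdd" -> returned ("abcd" is a proper prefix of it)
            //   "abce"  -> not returned (equals the exclusive upper bound)
            System.out.println(new String(to.get(), StandardCharsets.UTF_8));  // abce
        }
    }
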
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
index 29154860af3d2..7646eb56e1890 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
@@ -31,15 +31,18 @@ class RocksDBRangeIterator extends RocksDbIterator {
     private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
     private final byte[] rawLastKey;
     private final boolean forward;
+    private final boolean toInclusive;
 
     RocksDBRangeIterator(final String storeName,
                          final RocksIterator iter,
                          final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
                          final Bytes from,
                          final Bytes to,
-                         final boolean forward) {
+                         final boolean forward,
+                         final boolean toInclusive) {
         super(storeName, iter, openIterators, forward);
         this.forward = forward;
+        this.toInclusive = toInclusive;
         if (forward) {
             iter.seek(from.get());
             rawLastKey = to.get();
@@ -62,8 +65,10 @@ public KeyValue<Bytes, byte[]> makeNext() {
             return allDone();
         } else {
             if (forward) {
-                if (comparator.compare(next.key.get(), rawLastKey) <= 0) {
+                if (comparator.compare(next.key.get(), rawLastKey) < 0) {
                     return next;
+                } else if (comparator.compare(next.key.get(), rawLastKey) == 0) {
+                    return toInclusive ? next : allDone();
                 } else {
                     return allDone();
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 3241d070b2169..12ba4eb5c3024 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
@@ -308,6 +309,21 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
+                                                                                    final PS prefixKeySerializer) {
+        Objects.requireNonNull(prefix, "prefix cannot be null");
+        Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
+
+        validateStoreOpen();
+        final Bytes prefixBytes = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
+
+        final KeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = dbAccessor.prefixScan(prefixBytes);
+        openIterators.add(rocksDbPrefixSeekIterator);
+
+        return rocksDbPrefixSeekIterator;
+    }
+
     @Override
     public synchronized byte[] get(final Bytes key) {
         validateStoreOpen();
@@ -510,6 +526,8 @@ KeyValueIterator<Bytes, byte[]> range(final Bytes from,
 
         KeyValueIterator<Bytes, byte[]> all(final boolean forward);
 
+        KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix);
+
         long approximateNumEntries() throws RocksDBException;
 
         void flush() throws RocksDBException;
@@ -580,7 +598,9 @@ public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
                 openIterators,
                 from,
                 to,
-                forward);
+                forward,
+                true
+            );
         }
 
         @Override
@@ -594,6 +614,20 @@ public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
             return new RocksDbIterator(name, innerIterWithTimestamp, openIterators, forward);
         }
 
+        @Override
+        public KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
+            final Bytes to = Bytes.increment(prefix);
+            return new RocksDBRangeIterator(
+                name,
+                db.newIterator(columnFamily),
+                openIterators,
+                prefix,
+                to,
+                true,
+                false
+            );
+        }
+
         @Override
         public long approximateNumEntries() throws RocksDBException {
             return db.getLongProperty(columnFamily, "rocksdb.estimate-num-keys");
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index aa65e7f971317..db3d17560bf3e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -201,7 +201,8 @@ public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
                 db.newIterator(oldColumnFamily),
                 from,
                 to,
-                forward);
+                forward,
+                true);
         }
 
         @Override
@@ -218,6 +219,20 @@ public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
             return new RocksDBDualCFIterator(name, innerIterWithTimestamp, innerIterNoTimestamp, forward);
         }
 
+        @Override
+        public KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
+            final Bytes to = Bytes.increment(prefix);
+            return new RocksDBDualCFRangeIterator(
+                name,
+                db.newIterator(newColumnFamily),
+                db.newIterator(oldColumnFamily),
+                prefix,
+                to,
+                true,
+                false
+            );
+        }
+
         @Override
         public long approximateNumEntries() throws RocksDBException {
             return db.getLongProperty(oldColumnFamily, "rocksdb.estimate-num-keys")
@@ -382,15 +397,18 @@ private class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator {
         private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
         private final byte[] rawLastKey;
         private final boolean forward;
+        private final boolean toInclusive;
 
         RocksDBDualCFRangeIterator(final String storeName,
                                    final RocksIterator iterWithTimestamp,
                                    final RocksIterator iterNoTimestamp,
                                    final Bytes from,
                                    final Bytes to,
-                                   final boolean forward) {
+                                   final boolean forward,
+                                   final boolean toInclusive) {
             super(storeName, iterWithTimestamp, iterNoTimestamp, forward);
             this.forward = forward;
+            this.toInclusive = toInclusive;
             if (forward) {
                 iterWithTimestamp.seek(from.get());
                 iterNoTimestamp.seek(from.get());
@@ -416,8 +434,11 @@ public KeyValue<Bytes, byte[]> makeNext() {
                 return allDone();
             } else {
                 if (forward) {
-                    if (comparator.compare(next.key.get(), rawLastKey) <= 0) {
+
+                    if (comparator.compare(next.key.get(), rawLastKey) < 0) {
                         return next;
+                    } else if (comparator.compare(next.key.get(), rawLastKey) == 0) {
+                        return toInclusive ? next : allDone();
                     } else {
                         return allDone();
                     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 0ab0a3e6445f0..fce7875835c6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -187,11 +187,15 @@ public LRUCacheEntry delete(final String namespace, final Bytes key) {
     }
 
     public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) {
+        return range(namespace, from, to, true);
+    }
+
+    public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to, final boolean toInclusive) {
         final NamedCache cache = getCache(namespace);
         if (cache == null) {
             return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics));
         }
-        return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache);
+        return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to, toInclusive), cache);
     }
 
     public MemoryLRUCacheBytesIterator reverseRange(final String namespace, final Bytes from, final Bytes to) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
index d032cadffdad6..06402f4ab48ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
@@ -110,6 +110,13 @@ private StateStoreMetrics() {}
     private static final String RANGE_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + RANGE_DESCRIPTION;
     private static final String RANGE_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + RANGE_DESCRIPTION;
 
+    private static final String PREFIX_SCAN = "prefix-scan";
+    private static final String PREFIX_SCAN_DESCRIPTION = "calls to prefix-scan";
+    private static final String PREFIX_SCAN_RATE_DESCRIPTION =
+        RATE_DESCRIPTION_PREFIX + PREFIX_SCAN_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
+    private static final String PREFIX_SCAN_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + PREFIX_SCAN_DESCRIPTION;
+    private static final String PREFIX_SCAN_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + PREFIX_SCAN_DESCRIPTION;
+
     private static final String FLUSH = "flush";
     private static final String FLUSH_DESCRIPTION = "calls to flush";
     private static final String FLUSH_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + FLUSH_DESCRIPTION;
@@ -307,6 +314,33 @@ public static Sensor rangeSensor(final String threadId,
         );
     }
 
+    public static Sensor prefixScanSensor(final String taskId,
+                                          final String storeType,
+                                          final String storeName,
+                                          final StreamsMetricsImpl streamsMetrics) {
+
+        final String latencyMetricName = PREFIX_SCAN + LATENCY_SUFFIX;
+        final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(taskId, storeType, storeName);
+
+        final Sensor sensor = streamsMetrics.storeLevelSensor(taskId, storeName, PREFIX_SCAN, RecordingLevel.DEBUG);
+        addInvocationRateToSensor(
+            sensor,
+            STATE_STORE_LEVEL_GROUP,
+            tagMap,
+            PREFIX_SCAN,
+            PREFIX_SCAN_RATE_DESCRIPTION
+        );
+        addAvgAndMaxToSensor(
+            sensor,
+            STATE_STORE_LEVEL_GROUP,
+            tagMap,
+            latencyMetricName,
+            PREFIX_SCAN_AVG_LATENCY_DESCRIPTION,
+            PREFIX_SCAN_MAX_LATENCY_DESCRIPTION
+        );
+        return sensor;
+    }
+
     public static Sensor flushSensor(final String threadId,
                                      final String taskId,
                                      final String storeType,
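
The sensor above is registered at DEBUG recording level, so the new metrics
(prefix-scan-rate, prefix-scan-latency-avg, prefix-scan-latency-max) are only
recorded when metrics.recording.level is set to DEBUG. A sketch of listing them,
assuming a running KafkaStreams instance named streams (the variable name and the
surrounding wiring are hypothetical):

    // KafkaStreams#metrics() exposes all client metrics, including store-level ones.
    for (final Map.Entry<MetricName, ? extends Metric> e : streams.metrics().entrySet()) {
        if (e.getKey().name().startsWith("prefix-scan")) {
            System.out.println(e.getKey().group() + " / " + e.getKey().name()
                + " = " + e.getValue().metricValue());
        }
    }
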
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
index a9085a6c13847..66b13c1ad434f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
@@ -45,6 +46,7 @@
 
 import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -359,6 +361,60 @@ public void shouldReverseIterateOverRange() {
         ), results);
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(bytesKey("p11"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("k1"), bytesValue("1")));
+        entries.add(new KeyValue<>(bytesKey("k2"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("p2"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("p1"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("p0"), bytesValue("2")));
+
+        store.putAll(entries);
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = store.prefixScan("p1", new StringSerializer());
+        final List<String> keys = new ArrayList<>();
+        final List<String> values = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            keys.add(next.key.toString());
+            values.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+        assertThat(numberOfKeysReturned, is(2));
+        assertThat(keys, is(Arrays.asList("p1", "p11")));
+        assertThat(values, is(Arrays.asList("2", "2")));
+    }
+
+    @Test
+    public void shouldGetRecordsWithPrefixKeyExcludingNextLargestKey() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(bytesKey("abcd"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("abcdd"), bytesValue("1")));
+        entries.add(new KeyValue<>(bytesKey("abce"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("abc"), bytesValue("2")));
+
+        store.putAll(entries);
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = store.prefixScan("abcd", new StringSerializer());
+        final List<String> keys = new ArrayList<>();
+        final List<String> values = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            keys.add(next.key.toString());
+            values.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+        assertThat(numberOfKeysReturned, is(2));
+        assertThat(keys, is(Arrays.asList("abcd", "abcdd")));
+        assertThat(values, is(Arrays.asList("2", "1")));
+    }
+
     @Test
     public void shouldDeleteItemsFromCache() {
         store.put(bytesKey("a"), bytesValue("a"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index c3808b24887b5..255994ca33b44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
@@ -25,6 +26,7 @@
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
@@ -34,7 +36,10 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -196,6 +201,26 @@ public void shouldReturnValueOnGetWhenExists() {
         assertThat(store.get(hello), equalTo(world));
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {
+        store.put(hi, there);
+        store.put(Bytes.increment(hi), world);
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = store.prefixScan(hi.toString(), new StringSerializer());
+        final List<Bytes> keys = new ArrayList<>();
+        final List<Bytes> values = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            keys.add(next.key);
+            values.add(Bytes.wrap(next.value));
+            numberOfKeysReturned++;
+        }
+        assertThat(numberOfKeysReturned, is(1));
+        assertThat(keys, is(Collections.singletonList(hi)));
+        assertThat(values, is(Collections.singletonList(Bytes.wrap(there))));
+    }
+
     @Test
     public void shouldReturnNullOnGetWhenDoesntExist() {
         assertThat(store.get(hello), is(nullValue()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 68faa15007b5f..88b72df468c3d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -434,6 +435,22 @@ public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() {
         verify(inner);
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {
+        final StringSerializer stringSerializer = new StringSerializer();
+        expect(inner.prefixScan(KEY, stringSerializer))
+            .andReturn(new KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
+        init();
+
+        final KeyValueIterator<String, String> iterator = metered.prefixScan(KEY, stringSerializer);
+        assertThat(iterator.next().value, equalTo(VALUE));
+        iterator.close();
+
+        final KafkaMetric metric = metrics.metric(new MetricName("prefix-scan-rate", STORE_LEVEL_GROUP, "", tags));
+        assertTrue((Double) metric.metricValue() > 0);
+        verify(inner);
+    }
+
     private KafkaMetric metric(final MetricName metricName) {
         return this.metrics.metric(metricName);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 4d87ee4de9997..0568df2d5689c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -66,6 +66,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.easymock.EasyMock.eq;
@@ -360,6 +361,135 @@ public void shouldPutAll() {
             rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
     }
 
+    @Test
+    public void shouldReturnKeysWithGivenPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k1")),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_3")),
+            stringSerializer.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k2")),
+            stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_2")),
+            stringSerializer.serialize(null, "d")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k3")),
+            stringSerializer.serialize(null, "e")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_1")),
+            stringSerializer.serialize(null, "f")));
+
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = rocksDBStore.prefixScan("prefix", stringSerializer);
+        final List<String> valuesWithPrefix = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            valuesWithPrefix.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+        assertThat(numberOfKeysReturned, is(3));
+        assertThat(valuesWithPrefix.get(0), is("f"));
+        assertThat(valuesWithPrefix.get(1), is("d"));
+        assertThat(valuesWithPrefix.get(2), is("b"));
+    }
+
+    @Test
+    public void shouldReturnKeysWithGivenPrefixExcludingNextLargestKey() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "abc")),
+            stringSerializer.serialize(null, "f")));
+
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "abcd")),
+            stringSerializer.serialize(null, "f")));
+
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "abce")),
+            stringSerializer.serialize(null, "f")));
+
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefixAsabcd = rocksDBStore.prefixScan("abcd", stringSerializer);
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefixAsabcd.hasNext()) {
+            keysWithPrefixAsabcd.next().key.get();
+            numberOfKeysReturned++;
+        }
+
+        assertThat(numberOfKeysReturned, is(1));
+    }
+
+    @Test
+    public void shouldReturnUUIDsWithStringPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        final Serializer<UUID> uuidSerializer = Serdes.UUID().serializer();
+        final UUID uuid1 = UUID.randomUUID();
+        final UUID uuid2 = UUID.randomUUID();
+        final String prefix = uuid1.toString().substring(0, 4);
+        entries.add(new KeyValue<>(
+            new Bytes(uuidSerializer.serialize(null, uuid1)),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(uuidSerializer.serialize(null, uuid2)),
+            stringSerializer.serialize(null, "b")));
+
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = rocksDBStore.prefixScan(prefix, stringSerializer);
+        final List<String> valuesWithPrefix = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            valuesWithPrefix.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+
+        assertThat(numberOfKeysReturned, is(1));
+        assertThat(valuesWithPrefix.get(0), is("a"));
+    }
+
+    @Test
+    public void shouldReturnNoKeys() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "a")),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "b")),
+            stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "c")),
+            stringSerializer.serialize(null, "e")));
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
+
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = rocksDBStore.prefixScan("d", stringSerializer);
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            keysWithPrefix.next();
+            numberOfKeysReturned++;
+        }
+        assertThat(numberOfKeysReturned, is(0));
+    }
+
     @Test
     public void shouldRestoreAll() {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
index 75b4e8034983b..a1d511ae1ca71 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StateStoreContext;
@@ -43,6 +45,8 @@
 
 public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
 
+    private final Serializer<String> stringSerializer = new StringSerializer();
+
     RocksDBStore getRocksDBStore() {
         return new RocksDBTimestampedStore(DB_NAME, METRICS_SCOPE);
     }
@@ -337,6 +341,21 @@ private void iteratorsShouldNotMigrateData() {
             }
             assertFalse(it.hasNext());
         }
+
+        try (final KeyValueIterator<Bytes, byte[]> it = rocksDBStore.prefixScan("key1", stringSerializer)) {
+            {
+                final KeyValue<Bytes, byte[]> keyValue = it.next();
+                assertArrayEquals("key1".getBytes(), keyValue.key.get());
+                // unknown timestamp == -1 plus value == 1
+                assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '1'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = it.next();
+                assertArrayEquals("key11".getBytes(), keyValue.key.get());
+                assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'}, keyValue.value);
+            }
+            assertFalse(it.hasNext());
+        }
     }
 
     private void verifyOldAndNewColumnFamily() throws Exception {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 77be8126d1076..afd5449afd446 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -285,6 +285,25 @@ public void shouldPeekAndIterateOverRange() {
         assertEquals(5, bytesIndex);
     }
 
+    @Test
+    public void shouldSkipToEntryWhenToInclusiveIsFalseInRange() {
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
+        final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
+        for (final byte[] aByte : bytes) {
+            cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte));
+        }
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4}), false);
+        int bytesIndex = 1;
+        while (iterator.hasNext()) {
+            final Bytes peekedKey = iterator.peekNextKey();
+            final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
+            assertArrayEquals(bytes[bytesIndex], peekedKey.get());
+            assertArrayEquals(bytes[bytesIndex], next.key.get());
+            bytesIndex++;
+        }
+        assertEquals(4, bytesIndex);
+    }
+
     @Test
     public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() {
         final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
index 35960e199e4dd..4e4d6c87061a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
@@ -204,6 +204,38 @@ public void shouldGetRangeSensor() {
         );
     }
 
+    @Test
+    public void shouldGetPrefixScanSensor() {
+        final String metricName = "prefix-scan";
+        final String descriptionOfRate = "The average number of calls to prefix-scan per second";
+        final String descriptionOfAvg = "The average latency of calls to prefix-scan";
+        final String descriptionOfMax = "The maximum latency of calls to prefix-scan";
+        expect(streamsMetrics.storeLevelSensor(TASK_ID, STORE_NAME, metricName, RecordingLevel.DEBUG))
+            .andReturn(expectedSensor);
+        expect(streamsMetrics.storeLevelTagMap(TASK_ID, STORE_TYPE, STORE_NAME)).andReturn(storeTagMap);
+        StreamsMetricsImpl.addInvocationRateToSensor(
+            expectedSensor,
+            STORE_LEVEL_GROUP,
+            storeTagMap,
+            metricName,
+            descriptionOfRate
+        );
+        StreamsMetricsImpl.addAvgAndMaxToSensor(
+            expectedSensor,
+            STORE_LEVEL_GROUP,
+            storeTagMap,
+            latencyMetricName(metricName),
+            descriptionOfAvg,
+            descriptionOfMax
+        );
+        replay(StreamsMetricsImpl.class, streamsMetrics);
+
+        final Sensor sensor = StateStoreMetrics.prefixScanSensor(TASK_ID, STORE_TYPE, STORE_NAME, streamsMetrics);
+
+        verify(StreamsMetricsImpl.class, streamsMetrics);
+        assertThat(sensor, is(expectedSensor));
+    }
+
     @Test
     public void shouldGetFlushSensor() {
         final String metricName = "flush";
@@ -484,4 +516,4 @@ private void shouldGetSuppressionBufferSensor(final String metricName,
         verify(StreamsMetricsImpl.class, streamsMetrics);
         assertThat(sensor, is(expectedSensor));
     }
-}
\ No newline at end of file
+}
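
A closing note on the timestamped-store assertions above: RocksDBTimestampedStore
stores each value as an 8-byte big-endian timestamp followed by the plain value,
with -1 denoting an unknown timestamp, which is exactly what the
{-1, -1, -1, -1, -1, -1, -1, -1, '1'} expectation encodes. A minimal decoding
sketch of that layout (standalone, nothing beyond the JDK assumed):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class TimestampedValueLayout {
        public static void main(final String[] args) {
            // The same bytes the test asserts: unknown timestamp (-1) plus value "1".
            final byte[] raw = {-1, -1, -1, -1, -1, -1, -1, -1, '1'};
            final ByteBuffer buf = ByteBuffer.wrap(raw);
            final long timestamp = buf.getLong();             // -1 signals "unknown"
            final byte[] value = new byte[buf.remaining()];
            buf.get(value);
            System.out.println(timestamp + " -> " + new String(value, StandardCharsets.UTF_8));
        }
    }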