diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
index 1adfce1ec8bbd..2cc52c8f4596e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
@@ -104,6 +104,11 @@ default KeyValueIterator<K, V> reverseAll() {
      * prefix into the format in which the keys are stored in the stores needs to be passed to this method.
      * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
      * and must not return null values.
+     * Since {@code prefixScan()} relies on byte lexicographical ordering and not on the ordering of the key type, results for some types might be unexpected.
+     * For example, if the key type is {@code Integer}, and the store contains keys [1, 2, 11, 13],
+     * then running {@code store.prefixScan(1, new IntegerSerializer())} will return [1] and not [1, 11, 13].
+     * In contrast, if the key type is {@code String}, the keys will be sorted ["1", "11", "13", "2"] in the store and {@code store.prefixScan("1", new StringSerializer())} will return ["1", "11", "13"].
+     * In both cases {@code prefixScan()} starts the scan at 1 and stops at 2.
      *
      * @param prefix The prefix.
      * @param prefixKeySerializer Serializer for the Prefix key type
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 973093623c1ad..b4e3e1f3c2918 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -17,18 +17,62 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
+    /** Store keyed by raw {@link Bytes}, used to exercise {@code prefixScan()} with serialized prefixes. */
+    private KeyValueStore<Bytes, byte[]> byteStore;
+    private final Serializer<String> stringSerializer = new StringSerializer();
+    private final KeyValueStoreTestDriver<Bytes, byte[]> byteStoreDriver = KeyValueStoreTestDriver.create(Bytes.class, byte[].class);
+
+    /** Builds and initializes the in-memory {@code Bytes -> byte[]} store used by the prefix scan tests. */
+    @Before
+    public void createStringKeyValueStore() {
+        // NOTE(review): if the superclass before() is itself annotated with @Before, JUnit already
+        // runs it ahead of this method and the explicit call repeats that setup -- confirm.
+        super.before();
+        final StateStoreContext byteStoreContext = byteStoreDriver.context();
+        final StoreBuilder<KeyValueStore<Bytes, byte[]>> storeBuilder = Stores.keyValueStoreBuilder(
+            Stores.inMemoryKeyValueStore("in-memory-byte-store"),
+            new Serdes.BytesSerde(),
+            new Serdes.ByteArraySerde());
+        byteStore = storeBuilder.build();
+        byteStore.init(byteStoreContext, byteStore);
+    }
+
+    /** Runs the inherited teardown, then closes the byte store and clears its driver. */
+    @After
+    @Override
+    public void after() {
+        super.after();
+        byteStore.close();
+        byteStoreDriver.clear();
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     protected <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context) {
@@ -60,4 +104,136 @@ public void shouldRemoveKeysWithNullValues() {
 
         assertThat(store.get(0), nullValue());
     }
+
+    /**
+     * Verifies that a prefix scan returns only the entries whose key starts with the prefix,
+     * in key order ("prefix_1", "prefix_2", "prefix_3").
+     */
+    @Test
+    public void shouldReturnKeysWithGivenPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k1")),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_3")),
+            stringSerializer.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k2")),
+            stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_2")),
+            stringSerializer.serialize(null, "d")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "k3")),
+            stringSerializer.serialize(null, "e")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "prefix_1")),
+            stringSerializer.serialize(null, "f")));
+
+        byteStore.putAll(entries);
+        byteStore.flush();
+
+        final List<String> valuesWithPrefix = new ArrayList<>();
+        // KeyValueIterator is Closeable; try-with-resources closes it even if iteration throws.
+        try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix = byteStore.prefixScan("prefix", stringSerializer)) {
+            while (keysWithPrefix.hasNext()) {
+                // Values were written with StringSerializer (UTF-8 by default), so decode with the same charset.
+                valuesWithPrefix.add(new String(keysWithPrefix.next().value, StandardCharsets.UTF_8));
+            }
+        }
+
+        assertThat(valuesWithPrefix.size(), is(3));
+        assertThat(valuesWithPrefix.get(0), is("f"));
+        assertThat(valuesWithPrefix.get(1), is("d"));
+        assertThat(valuesWithPrefix.get(2), is("b"));
+    }
+
+    /**
+     * Verifies that a scan for "abcd" returns that key only, excluding both the shorter key "abc"
+     * and the next larger key "abce".
+     */
+    @Test
+    public void shouldReturnKeysWithGivenPrefixExcludingNextKeyLargestKey() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "abc")),
+            stringSerializer.serialize(null, "f")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "abcd")),
+            stringSerializer.serialize(null, "f")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "abce")),
+            stringSerializer.serialize(null, "f")));
+
+        byteStore.putAll(entries);
+        byteStore.flush();
+
+        final List<String> keysReturned = new ArrayList<>();
+        try (final KeyValueIterator<Bytes, byte[]> keysWithPrefixAsabcd = byteStore.prefixScan("abcd", stringSerializer)) {
+            while (keysWithPrefixAsabcd.hasNext()) {
+                keysReturned.add(new String(keysWithPrefixAsabcd.next().key.get(), StandardCharsets.UTF_8));
+            }
+        }
+
+        assertThat(keysReturned.size(), is(1));
+        assertThat(keysReturned.get(0), is("abcd"));
+    }
+
+    /**
+     * Verifies that keys written with the UUID serializer can be found via a {@code String} prefix.
+     */
+    @Test
+    public void shouldReturnUUIDsWithStringPrefix() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        final Serializer<UUID> uuidSerializer = Serdes.UUID().serializer();
+        final UUID uuid1 = UUID.randomUUID();
+        final UUID uuid2 = UUID.randomUUID();
+        final String prefix = uuid1.toString().substring(0, 4);
+        // Both UUIDs are random, so uuid2 can (rarely) start with the same four characters as uuid1.
+        // Account for that instead of failing spuriously.
+        final int expectedMatches = uuid2.toString().startsWith(prefix) ? 2 : 1;
+        entries.add(new KeyValue<>(
+            new Bytes(uuidSerializer.serialize(null, uuid1)),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(uuidSerializer.serialize(null, uuid2)),
+            stringSerializer.serialize(null, "b")));
+
+        byteStore.putAll(entries);
+        byteStore.flush();
+
+        final List<String> valuesWithPrefix = new ArrayList<>();
+        try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix = byteStore.prefixScan(prefix, stringSerializer)) {
+            while (keysWithPrefix.hasNext()) {
+                valuesWithPrefix.add(new String(keysWithPrefix.next().value, StandardCharsets.UTF_8));
+            }
+        }
+
+        assertThat(valuesWithPrefix.size(), is(expectedMatches));
+        assertThat(valuesWithPrefix.contains("a"), is(true));
+    }
+
+    /**
+     * Verifies that a prefix matching none of the stored keys yields an empty iterator.
+     */
+    @Test
+    public void shouldReturnNoKeys() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "a")),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "b")),
+            stringSerializer.serialize(null, "c")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "c")),
+            stringSerializer.serialize(null, "e")));
+        byteStore.putAll(entries);
+        byteStore.flush();
+
+        try (final KeyValueIterator<Bytes, byte[]> keysWithPrefix = byteStore.prefixScan("bb", stringSerializer)) {
+            assertThat(keysWithPrefix.hasNext(), is(false));
+        }
+    }
 }