Skip to content

Commit

Permalink
KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueSto…
Browse files Browse the repository at this point in the history
…re (#10052)

Co-authored-by: Bruno Cadonna <bruno@confluent.io>

Reviewers: Bruno Cadonna <cadonna@confluent.io>, Guozhang Wang <guozhang@confluent.io>
  • Loading branch information
vamossagar12 authored Mar 2, 2021
1 parent a92b986 commit b2075a0
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ default KeyValueIterator<K, V> reverseAll() {
* prefix into the format in which the keys are stored in the stores needs to be passed to this method.
* The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
* and must not return null values.
* Since {@code prefixScan()} relies on byte lexicographical ordering and not on the ordering of the key type, results for some types might be unexpected.
* For example, if the key type is {@code Integer}, and the store contains keys [1, 2, 11, 13],
* then running {@code store.prefixScan(1, new IntegerSerializer())} will return [1] and not [1,11,13].
* In contrast, if the key type is {@code String} the keys will be sorted [1, 11, 13, 2] in the store and {@code store.prefixScan(1, new StringSerializer())} will return [1,11,13].
* In both cases {@code prefixScan()} starts the scan at 1 and stops at 2.
*
* @param prefix The prefix.
* @param prefixKeySerializer Serializer for the Prefix key type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,55 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;

public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {

private KeyValueStore<Bytes, byte[]> byteStore;
private final Serializer<String> stringSerializer = new StringSerializer();
private final KeyValueStoreTestDriver<Bytes, byte[]> byteStoreDriver = KeyValueStoreTestDriver.create(Bytes.class, byte[].class);

@Before
public void createStringKeyValueStore() {
super.before();
final StateStoreContext byteStoreContext = byteStoreDriver.context();
final StoreBuilder<KeyValueStore<Bytes, byte[]>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("in-memory-byte-store"),
new Serdes.BytesSerde(),
new Serdes.ByteArraySerde());
byteStore = storeBuilder.build();
byteStore.init(byteStoreContext, byteStore);
}

@After
public void after() {
super.after();
byteStore.close();
byteStoreDriver.clear();
}

@SuppressWarnings("unchecked")
@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context) {
Expand Down Expand Up @@ -60,4 +97,131 @@ public void shouldRemoveKeysWithNullValues() {

assertThat(store.get(0), nullValue());
}


@Test
public void shouldReturnKeysWithGivenPrefix() {

final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "k1")),
stringSerializer.serialize(null, "a")));
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "prefix_3")),
stringSerializer.serialize(null, "b")));
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "k2")),
stringSerializer.serialize(null, "c")));
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "prefix_2")),
stringSerializer.serialize(null, "d")));
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "k3")),
stringSerializer.serialize(null, "e")));
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "prefix_1")),
stringSerializer.serialize(null, "f")));

byteStore.putAll(entries);
byteStore.flush();

final KeyValueIterator<Bytes, byte[]> keysWithPrefix = byteStore.prefixScan("prefix", stringSerializer);
final List<String> valuesWithPrefix = new ArrayList<>();
int numberOfKeysReturned = 0;

while (keysWithPrefix.hasNext()) {
final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
valuesWithPrefix.add(new String(next.value));
numberOfKeysReturned++;
}
assertThat(numberOfKeysReturned, is(3));
assertThat(valuesWithPrefix.get(0), is("f"));
assertThat(valuesWithPrefix.get(1), is("d"));
assertThat(valuesWithPrefix.get(2), is("b"));
}

@Test
public void shouldReturnKeysWithGivenPrefixExcludingNextKeyLargestKey() {
final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "abc")),
stringSerializer.serialize(null, "f")));

entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "abcd")),
stringSerializer.serialize(null, "f")));

entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "abce")),
stringSerializer.serialize(null, "f")));

byteStore.putAll(entries);
byteStore.flush();

final KeyValueIterator<Bytes, byte[]> keysWithPrefixAsabcd = byteStore.prefixScan("abcd", stringSerializer);
int numberOfKeysReturned = 0;

while (keysWithPrefixAsabcd.hasNext()) {
keysWithPrefixAsabcd.next().key.get();
numberOfKeysReturned++;
}

assertThat(numberOfKeysReturned, is(1));
}

@Test
public void shouldReturnUUIDsWithStringPrefix() {
final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
final Serializer<UUID> uuidSerializer = Serdes.UUID().serializer();
final UUID uuid1 = UUID.randomUUID();
final UUID uuid2 = UUID.randomUUID();
final String prefix = uuid1.toString().substring(0, 4);
entries.add(new KeyValue<>(
new Bytes(uuidSerializer.serialize(null, uuid1)),
stringSerializer.serialize(null, "a")));
entries.add(new KeyValue<>(
new Bytes(uuidSerializer.serialize(null, uuid2)),
stringSerializer.serialize(null, "b")));

byteStore.putAll(entries);
byteStore.flush();

final KeyValueIterator<Bytes, byte[]> keysWithPrefix = byteStore.prefixScan(prefix, stringSerializer);
final List<String> valuesWithPrefix = new ArrayList<>();
int numberOfKeysReturned = 0;

while (keysWithPrefix.hasNext()) {
final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
valuesWithPrefix.add(new String(next.value));
numberOfKeysReturned++;
}

assertThat(numberOfKeysReturned, is(1));
assertThat(valuesWithPrefix.get(0), is("a"));
}

@Test
public void shouldReturnNoKeys() {
final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "a")),
stringSerializer.serialize(null, "a")));
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "b")),
stringSerializer.serialize(null, "c")));
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "c")),
stringSerializer.serialize(null, "e")));
byteStore.putAll(entries);
byteStore.flush();

final KeyValueIterator<Bytes, byte[]> keysWithPrefix = byteStore.prefixScan("bb", stringSerializer);
int numberOfKeysReturned = 0;

while (keysWithPrefix.hasNext()) {
keysWithPrefix.next();
numberOfKeysReturned++;
}
assertThat(numberOfKeysReturned, is(0));
}
}

0 comments on commit b2075a0

Please sign in to comment.