KAFKA-10648: Add Prefix Scan support to State Stores #9508

Merged: 8 commits, Feb 3, 2021
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.InvalidStateStoreException;

/**
@@ -97,6 +98,23 @@ default KeyValueIterator<K, V> reverseAll() {
throw new UnsupportedOperationException();
}

/**
* Return an iterator over all keys with the specified prefix.
* Since the type of the prefix can be different from that of the key, a serializer that converts the
* prefix into the format in which the keys are stored must be passed to this method.
* The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
* and must not return null values.
*
* @param prefix The prefix.
* @param prefixKeySerializer Serializer for the prefix key type
* @param <PS> Prefix serializer type
* @param <P> Prefix type
* @return The iterator for keys having the specified prefix.
*/
default <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(P prefix, PS prefixKeySerializer) {
throw new UnsupportedOperationException();
}

/**
* Return an approximate count of key-value mappings in this store.
* <p>
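For context, a minimal usage sketch of the new API (not part of the PR; it assumes the interface changed above is ReadOnlyKeyValueStore, and the store, key type, and prefix value are made up for illustration):

// Hypothetical usage of the new prefixScan API: iterate over all entries of a
// String-keyed store whose keys start with "user-". Store, types, and prefix
// value are assumptions for illustration only.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public final class PrefixScanUsageSketch {
    public static void printUserEntries(final ReadOnlyKeyValueStore<String, Long> store) {
        // The prefix type may differ from the key type; here both are String, and the
        // String serializer converts the prefix into the stored key format.
        try (final KeyValueIterator<String, Long> iterator =
                 store.prefixScan("user-", Serdes.String().serializer())) {
            while (iterator.hasNext()) {
                final KeyValue<String, Long> entry = iterator.next();
                System.out.println(entry.key + " -> " + entry.value);
            }
        }
    }
}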
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -291,6 +292,16 @@ public KeyValueIterator<Bytes, byte[]> all() {
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true);
}

@Override
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
Review thread on this method:

Contributor: This method needs unit testing. Try to use a mock for the cache in the test.

Contributor (Author): Similar comment as below. Unit tests are in CachingInMemoryKeyValueStoreTest, which already extends AbstractKeyValueStoreTest and creates an in-memory cache store.

Contributor: Oh, I see. I missed those. Sorry! That is fine then, although I think unit tests with mocks would be better.

Contributor (Author): Actually, I had created another ticket to streamline tests for CachingKVStore: https://issues.apache.org/jira/browse/KAFKA-10788. @rohitrmd had volunteered to take this up.

validateStoreOpen();
final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().prefixScan(prefix, prefixKeySerializer);
final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
final Bytes to = Bytes.increment(from);
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().range(cacheName, from, to, false);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true);
}

@Override
public KeyValueIterator<Bytes, byte[]> reverseAll() {
validateStoreOpen();
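The caching layer above turns the prefix into a half-open key range: from the serialized prefix (inclusive) up to the incremented prefix (exclusive), which is why the cache range query passes toInclusive = false. A standalone sketch of that idea follows; the increment below is an illustrative re-implementation, not the Bytes.increment added by the PR:

// Sketch of the half-open range behind a prefix scan: every key that starts with
// `prefix` satisfies prefix <= key < increment(prefix) in unsigned lexicographic
// byte order. Illustrative only, not the PR's Bytes.increment implementation.
import java.util.Arrays;

public final class PrefixUpperBoundSketch {

    // Smallest byte sequence strictly greater than every sequence starting with `prefix`.
    static byte[] increment(final byte[] prefix) {
        final byte[] upper = Arrays.copyOf(prefix, prefix.length);
        for (int i = upper.length - 1; i >= 0; i--) {
            if (upper[i] != (byte) 0xFF) {
                upper[i]++;                          // bump the last non-0xFF byte ...
                return Arrays.copyOf(upper, i + 1);  // ... and drop everything after it
            }
        }
        throw new IllegalArgumentException("a prefix of only 0xFF bytes has no finite upper bound");
    }

    public static void main(final String[] args) {
        final byte[] prefix = "user-".getBytes();
        final byte[] upperBound = increment(prefix); // "user." ('-' + 1 == '.')
        System.out.println(new String(upperBound));
    }
}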
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -97,6 +98,12 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
}
}

@Override
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
final PS prefixKeySerializer) {
return wrapped().prefixScan(prefix, prefixKeySerializer);
}

@Override
public byte[] delete(final Bytes key) {
final byte[] oldValue = wrapped().delete(key);
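This layer only forwards the call, which reflects how the Streams state-store wrappers are stacked. A simplified decorator sketch (illustrative only; the types and class names below are assumptions, not Kafka's actual class hierarchy):

// Simplified decorator sketch: read operations such as prefixScan are delegated
// unchanged to the inner store, while writes add the layer's own behaviour
// (here, appending to a changelog).
import java.util.Iterator;
import java.util.Map;

interface SimpleStore {
    Iterator<Map.Entry<String, String>> prefixScan(String prefix);
    void put(String key, String value);
}

final class ChangeLoggingDecorator implements SimpleStore {
    private final SimpleStore wrapped;

    ChangeLoggingDecorator(final SimpleStore wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public Iterator<Map.Entry<String, String>> prefixScan(final String prefix) {
        return wrapped.prefixScan(prefix); // reads: pure delegation, nothing to log
    }

    @Override
    public void put(final String key, final String value) {
        wrapped.put(key, value);
        // a real change-logging store would also send (key, value) to the changelog topic here
    }
}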
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -31,6 +32,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Objects;

public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {

@@ -103,6 +105,20 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
}
}

@Override
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
Review thread on this method:

Contributor: @vamossagar12 I can still not find the unit test for this method.

Contributor (Author): For this, do you want me to add the test cases here? https://github.com/apache/kafka/blob/17be91a37214bf77430c65d9300a5120e4348df9/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
There are tests in CachingInMemoryKeyValueStoreTest, which is where the tests for other methods like range etc. have been added.

Contributor: I think those tests never call prefixScan() on the underlying in-memory state store, because all entries fit into the cache. You would need to add another test that flushes the cache before you call prefixScan(). I would prefer a test that directly tests the in-memory store without any cache in between.

Contributor (Author): Here is the new ticket: https://issues.apache.org/jira/browse/KAFKA-12289 and the PR for the ticket:
#10052

Objects.requireNonNull(prefix, "prefix cannot be null");
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");

final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
final Bytes to = Bytes.increment(from);

return new DelegatingPeekingKeyValueIterator<>(
name,
new InMemoryKeyValueIterator(map.subMap(from, true, to, false).keySet(), true)
);
}

@Override
public synchronized byte[] delete(final Bytes key) {
final byte[] oldValue = map.remove(key);
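InMemoryKeyValueStore resolves the prefix to a half-open subMap view over its sorted map. A self-contained sketch of the same lookup on a plain TreeMap (String keys are an assumption for readability; the "aq" bound stands in for the byte-wise increment used above):

// Sketch of the subMap-based prefix scan: the range is [from, to) with from = the
// prefix and to = the prefix with its last character incremented, mirroring
// subMap(from, true, to, false) in the store above.
import java.util.NavigableMap;
import java.util.TreeMap;

public final class SubMapPrefixScanSketch {
    public static void main(final String[] args) {
        final NavigableMap<String, Integer> map = new TreeMap<>();
        map.put("apple", 1);
        map.put("apricot", 2);
        map.put("banana", 3);

        final String from = "ap";
        final String to = "aq"; // exclusive upper bound: "ap" with 'p' incremented to 'q'
        System.out.println(map.subMap(from, true, to, false)); // {apple=1, apricot=2}
    }
}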
@@ -18,6 +18,7 @@

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
@@ -67,6 +68,7 @@ public class MeteredKeyValueStore<K, V>
private Sensor putAllSensor;
private Sensor allSensor;
private Sensor rangeSensor;
private Sensor prefixScanSensor;
private Sensor flushSensor;
private Sensor e2eLatencySensor;
private InternalProcessorContext context;
@@ -127,6 +129,7 @@ private void registerMetrics() {
getSensor = StateStoreMetrics.getSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
allSensor = StateStoreMetrics.allSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
rangeSensor = StateStoreMetrics.rangeSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
prefixScanSensor = StateStoreMetrics.prefixScanSensor(taskId, metricsScope, name(), streamsMetrics);
flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
deleteSensor = StateStoreMetrics.deleteSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
@@ -229,6 +232,12 @@ public V delete(final K key) {
}
}

@Override
public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
return new MeteredKeyValueIterator(wrapped().prefixScan(prefix, prefixKeySerializer), prefixScanSensor);
}

@Override
public KeyValueIterator<K, V> range(final K from,
final K to) {
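MeteredKeyValueStore wraps the returned iterator so the new prefix-scan sensor is updated when the iteration finishes. A simplified sketch of that wrapping idea (not Kafka's MeteredKeyValueIterator; the recorder callback stands in for the sensor):

// Sketch of an iterator wrapper that measures how long a scan was held open and
// reports the elapsed time on close(), which is roughly when a latency sensor
// such as prefixScanSensor would be recorded. Illustrative only.
import java.util.Iterator;
import java.util.function.LongConsumer;

final class TimedIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> inner;
    private final LongConsumer latencyRecorderNanos;
    private final long startNanos = System.nanoTime();

    TimedIterator(final Iterator<T> inner, final LongConsumer latencyRecorderNanos) {
        this.inner = inner;
        this.latencyRecorderNanos = latencyRecorderNanos;
    }

    @Override
    public boolean hasNext() {
        return inner.hasNext();
    }

    @Override
    public T next() {
        return inner.next();
    }

    @Override
    public void close() {
        latencyRecorderNanos.accept(System.nanoTime() - startNanos); // feed the sensor here
    }
}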
@@ -280,8 +280,8 @@ public boolean isEmpty() {
return cache.isEmpty();
}

- synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) {
- return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), true);
+ synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to, final boolean toInclusive) {
+ return keySetIterator(cache.navigableKeySet().subSet(from, true, to, toInclusive), true);
}

synchronized Iterator<Bytes> reverseKeyRange(final Bytes from, final Bytes to) {
@@ -31,15 +31,18 @@ class RocksDBRangeIterator extends RocksDbIterator {
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
private final byte[] rawLastKey;
private final boolean forward;
private final boolean toInclusive;

RocksDBRangeIterator(final String storeName,
final RocksIterator iter,
final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
final Bytes from,
final Bytes to,
- final boolean forward) {
+ final boolean forward,
+ final boolean toInclusive) {
super(storeName, iter, openIterators, forward);
this.forward = forward;
this.toInclusive = toInclusive;
if (forward) {
iter.seek(from.get());
rawLastKey = to.get();
@@ -62,8 +65,10 @@ public KeyValue<Bytes, byte[]> makeNext() {
return allDone();
} else {
if (forward) {
- if (comparator.compare(next.key.get(), rawLastKey) <= 0) {
+ if (comparator.compare(next.key.get(), rawLastKey) < 0) {
return next;
} else if (comparator.compare(next.key.get(), rawLastKey) == 0) {
return toInclusive ? next : allDone();
} else {
return allDone();
}
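The change to RocksDBRangeIterator splits the old inclusive upper-bound check into three cases: below the last key, equal to it, and beyond it. A small standalone sketch of that decision (detached from RocksDB; the helper names are assumptions), noting that range scans in this PR pass toInclusive = true while prefix scans pass false so that increment(prefix) itself is never returned:

// Sketch of the upper-bound decision introduced above: keys strictly below the last
// key are emitted, the last key itself is emitted only for an inclusive range, and
// anything beyond it ends the iteration.
import java.util.Comparator;

final class UpperBoundCheckSketch {
    enum Decision { EMIT, EMIT_ONLY_IF_INCLUSIVE, DONE }

    static Decision decide(final byte[] key, final byte[] rawLastKey, final Comparator<byte[]> comparator) {
        final int cmp = comparator.compare(key, rawLastKey);
        if (cmp < 0) {
            return Decision.EMIT;
        } else if (cmp == 0) {
            return Decision.EMIT_ONLY_IF_INCLUSIVE;
        } else {
            return Decision.DONE;
        }
    }
}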
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
@@ -308,6 +309,21 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
}
}

@Override
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
final PS prefixKeySerializer) {
Objects.requireNonNull(prefix, "prefix cannot be null");
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");

validateStoreOpen();
final Bytes prefixBytes = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));

final KeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = dbAccessor.prefixScan(prefixBytes);
openIterators.add(rocksDbPrefixSeekIterator);

return rocksDbPrefixSeekIterator;
}

@Override
public synchronized byte[] get(final Bytes key) {
validateStoreOpen();
@@ -510,6 +526,8 @@ KeyValueIterator<Bytes, byte[]> range(final Bytes from,

KeyValueIterator<Bytes, byte[]> all(final boolean forward);

KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix);

long approximateNumEntries() throws RocksDBException;

void flush() throws RocksDBException;
@@ -580,7 +598,9 @@ public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
openIterators,
from,
to,
- forward);
+ forward,
+ true
+ );
}

@Override
@@ -594,6 +614,20 @@ public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
return new RocksDbIterator(name, innerIterWithTimestamp, openIterators, forward);
}

@Override
public KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
final Bytes to = Bytes.increment(prefix);
return new RocksDBRangeIterator(
name,
db.newIterator(columnFamily),
openIterators,
prefix,
to,
true,
false
);
}

@Override
public long approximateNumEntries() throws RocksDBException {
return db.getLongProperty(columnFamily, "rocksdb.estimate-num-keys");
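Besides computing the [prefix, increment(prefix)) bounds, the RocksDB-backed store above registers the new iterator in openIterators. A simplified sketch of why that bookkeeping matters (illustrative only; class and method names are assumptions, not the PR's resource-management code):

// Sketch of open-iterator tracking for a store that owns native resources: every
// iterator handed out is remembered so that closing the store can also close any
// iterators the caller forgot, releasing the underlying resources.
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class OpenIteratorRegistry {
    private final Set<AutoCloseable> openIterators = Collections.synchronizedSet(new HashSet<>());

    <T extends AutoCloseable> T register(final T iterator) {
        openIterators.add(iterator);
        return iterator;
    }

    void closeAll() {
        synchronized (openIterators) {
            for (final AutoCloseable iterator : openIterators) {
                try {
                    iterator.close();
                } catch (final Exception e) {
                    // a real store would log and continue
                }
            }
            openIterators.clear();
        }
    }
}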
@@ -201,7 +201,8 @@ public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
db.newIterator(oldColumnFamily),
from,
to,
- forward);
+ forward,
+ true);
}

@Override
@@ -218,6 +219,20 @@ public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
return new RocksDBDualCFIterator(name, innerIterWithTimestamp, innerIterNoTimestamp, forward);
}

@Override
public KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
final Bytes to = Bytes.increment(prefix);
return new RocksDBDualCFRangeIterator(
name,
db.newIterator(newColumnFamily),
db.newIterator(oldColumnFamily),
prefix,
to,
true,
false
);
}

@Override
public long approximateNumEntries() throws RocksDBException {
return db.getLongProperty(oldColumnFamily, "rocksdb.estimate-num-keys")
@@ -382,15 +397,18 @@ private class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator {
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
private final byte[] rawLastKey;
private final boolean forward;
private final boolean toInclusive;

RocksDBDualCFRangeIterator(final String storeName,
final RocksIterator iterWithTimestamp,
final RocksIterator iterNoTimestamp,
final Bytes from,
final Bytes to,
- final boolean forward) {
+ final boolean forward,
+ final boolean toInclusive) {
super(storeName, iterWithTimestamp, iterNoTimestamp, forward);
this.forward = forward;
this.toInclusive = toInclusive;
if (forward) {
iterWithTimestamp.seek(from.get());
iterNoTimestamp.seek(from.get());
@@ -416,8 +434,11 @@ public KeyValue<Bytes, byte[]> makeNext() {
return allDone();
} else {
if (forward) {
- if (comparator.compare(next.key.get(), rawLastKey) <= 0) {

+ if (comparator.compare(next.key.get(), rawLastKey) < 0) {
return next;
} else if (comparator.compare(next.key.get(), rawLastKey) == 0) {
return toInclusive ? next : allDone();
} else {
return allDone();
}
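The timestamped store's dual-column-family iterator walks two RocksDB iterators at once. A simplified sketch of merging two sorted sequences by always emitting the smaller current key (illustrative only, with plain String keys; it ignores the duplicate-key and timestamp handling the real RocksDBDualCFRangeIterator performs before applying its bounds):

// Sketch of a two-way sorted merge: the next result is always taken from whichever
// input currently has the smaller key.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class TwoWayMergeSketch {
    static List<String> merge(final Iterator<String> left, final Iterator<String> right) {
        final List<String> out = new ArrayList<>();
        String l = left.hasNext() ? left.next() : null;
        String r = right.hasNext() ? right.next() : null;
        while (l != null || r != null) {
            if (r == null || (l != null && l.compareTo(r) <= 0)) {
                out.add(l);
                l = left.hasNext() ? left.next() : null;
            } else {
                out.add(r);
                r = right.hasNext() ? right.next() : null;
            }
        }
        return out;
    }

    public static void main(final String[] args) {
        final List<String> withTimestamp = List.of("a", "c", "e");
        final List<String> noTimestamp = List.of("b", "d");
        System.out.println(merge(withTimestamp.iterator(), noTimestamp.iterator())); // [a, b, c, d, e]
    }
}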
@@ -187,11 +187,15 @@ public LRUCacheEntry delete(final String namespace, final Bytes key) {
}

public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) {
return range(namespace, from, to, true);
}

public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to, final boolean toInclusive) {
final NamedCache cache = getCache(namespace);
if (cache == null) {
return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics));
}
- return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache);
+ return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to, toInclusive), cache);
}

public MemoryLRUCacheBytesIterator reverseRange(final String namespace, final Bytes from, final Bytes to) {
@@ -110,6 +110,13 @@ private StateStoreMetrics() {}
private static final String RANGE_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + RANGE_DESCRIPTION;
private static final String RANGE_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + RANGE_DESCRIPTION;

private static final String PREFIX_SCAN = "prefix-scan";
private static final String PREFIX_SCAN_DESCRIPTION = "calls to prefix-scan";
private static final String PREFIX_SCAN_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + PREFIX_SCAN_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String PREFIX_SCAN_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + PREFIX_SCAN_DESCRIPTION;
private static final String PREFIX_SCAN_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + PREFIX_SCAN_DESCRIPTION;

private static final String FLUSH = "flush";
private static final String FLUSH_DESCRIPTION = "calls to flush";
private static final String FLUSH_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + FLUSH_DESCRIPTION;
@@ -307,6 +314,33 @@ public static Sensor rangeSensor(final String threadId,
);
}

public static Sensor prefixScanSensor(final String taskId,
final String storeType,
final String storeName,
final StreamsMetricsImpl streamsMetrics) {

final String latencyMetricName = PREFIX_SCAN + LATENCY_SUFFIX;
final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(taskId, storeType, storeName);

final Sensor sensor = streamsMetrics.storeLevelSensor(taskId, storeName, PREFIX_SCAN, RecordingLevel.DEBUG);
addInvocationRateToSensor(
sensor,
STATE_STORE_LEVEL_GROUP,
tagMap,
PREFIX_SCAN,
PREFIX_SCAN_RATE_DESCRIPTION
);
addAvgAndMaxToSensor(
sensor,
STATE_STORE_LEVEL_GROUP,
tagMap,
latencyMetricName,
PREFIX_SCAN_AVG_LATENCY_DESCRIPTION,
PREFIX_SCAN_MAX_LATENCY_DESCRIPTION
);
return sensor;
}

public static Sensor flushSensor(final String threadId,
final String taskId,
final String storeType,
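Once the sensor is registered, the store should expose store-level prefix-scan metrics (an invocation rate plus average and max latency, at DEBUG recording level). A hedged sketch of listing them from a running application; the metric names are inferred from the constants above, not taken from documentation:

// List the prefix-scan metrics of a running Kafka Streams application. Assumes the
// metric names derived from the constants above (e.g. "prefix-scan-rate",
// "prefix-scan-latency-avg", "prefix-scan-latency-max") and that store-level DEBUG
// metrics are enabled via metrics.recording.level=DEBUG.
import org.apache.kafka.streams.KafkaStreams;

public final class PrefixScanMetricsSketch {
    public static void printPrefixScanMetrics(final KafkaStreams streams) {
        streams.metrics().forEach((name, metric) -> {
            if (name.name().startsWith("prefix-scan")) {
                System.out.println(name.group() + " / " + name.name() + " = " + metric.metricValue());
            }
        });
    }
}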