KAFKA-10648: Add Prefix Scan support to State Stores
vamossagar12 committed Jan 20, 2021
1 parent 474ad1e commit 17be91a
Showing 18 changed files with 467 additions and 9 deletions.
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.InvalidStateStoreException;

/**
@@ -97,6 +98,23 @@ default KeyValueIterator<K, V> reverseAll() {
throw new UnsupportedOperationException();
}

/**
* Return an iterator over all keys with the specified prefix.
* Since the type of the prefix may differ from that of the key, a serializer must be passed to this
* method to convert the prefix into the format in which the keys are stored.
* The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
* and must not return null values.
*
* @param prefix The prefix.
* @param prefixKeySerializer serializer for the prefix key type
* @param <PS> the prefix serializer type
* @param <P> the prefix type
* @return The iterator for keys having the specified prefix.
*/
default <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(P prefix, PS prefixKeySerializer) {
throw new UnsupportedOperationException();
}

/**
* Return an approximate count of key-value mappings in this store.
* <p>
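The new default method gives every ReadOnlyKeyValueStore a common entry point for prefix queries. A minimal usage sketch, assuming a String-keyed store obtained elsewhere (the helper name and types here are illustrative, not part of this commit):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class PrefixScanUsage {
    // Hypothetical helper: count the entries whose key starts with 'prefix'.
    // In a running application the store handle would come from
    // KafkaStreams#store(...).
    static long countWithPrefix(final ReadOnlyKeyValueStore<String, Long> store,
                                final String prefix) {
        long count = 0;
        // KeyValueIterator is Closeable, so scans belong in try-with-resources.
        try (final KeyValueIterator<String, Long> iterator =
                 store.prefixScan(prefix, Serdes.String().serializer())) {
            while (iterator.hasNext()) {
                iterator.next();
                count++;
            }
        }
        return count;
    }
}

The serializer must produce bytes in the same format as the store's key serde; otherwise the lexicographic range the implementations below compute will not line up with the stored keys.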
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -291,6 +292,16 @@ public KeyValueIterator<Bytes, byte[]> all() {
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true);
}

@Override
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
validateStoreOpen();
final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().prefixScan(prefix, prefixKeySerializer);
final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
final Bytes to = Bytes.increment(from);
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().range(cacheName, from, to, false);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true);
}

@Override
public KeyValueIterator<Bytes, byte[]> reverseAll() {
validateStoreOpen();
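The caching layer reduces the prefix scan to the half-open key range [prefix, increment(prefix)) and merges the cache's slice of that range with the underlying store's iterator. A standalone sketch of the upper-bound idea behind Bytes.increment (illustrative only, with the all-0xFF edge case simplified to an error):

import java.util.Arrays;

public class PrefixUpperBound {
    // Smallest byte string strictly greater than every key that starts
    // with 'prefix': add one to the last byte, carrying left on overflow.
    static byte[] upperBound(final byte[] prefix) {
        final byte[] bound = Arrays.copyOf(prefix, prefix.length);
        for (int i = bound.length - 1; i >= 0; i--) {
            if (bound[i] != (byte) 0xFF) {
                bound[i]++;          // e.g. "abc" -> "abd"
                return bound;
            }
            bound[i] = 0x00;         // 0xFF overflows; carry into the next byte
        }
        // Every byte was 0xFF: no same-length bound exists.
        throw new IllegalArgumentException("prefix is all 0xFF bytes");
    }
}

Passing toInclusive = false to context.cache().range(...) then keeps a key exactly equal to increment(prefix), which does not carry the prefix, out of the results.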
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -97,6 +98,12 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
}
}

@Override
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
final PS prefixKeySerializer) {
return wrapped().prefixScan(prefix, prefixKeySerializer);
}

@Override
public byte[] delete(final Bytes key) {
final byte[] oldValue = wrapped().delete(key);
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -31,6 +32,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Objects;

public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {

@@ -103,6 +105,20 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
}
}

@Override
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
Objects.requireNonNull(prefix, "prefix cannot be null");
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");

final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
final Bytes to = Bytes.increment(from);

return new DelegatingPeekingKeyValueIterator<>(
name,
new InMemoryKeyValueIterator(map.subMap(from, true, to, false).keySet(), true)
);
}

@Override
public synchronized byte[] delete(final Bytes key) {
final byte[] oldValue = map.remove(key);
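The in-memory store gets prefix scans almost for free from its sorted map: subMap(from, true, to, false) is exactly the half-open range [prefix, increment(prefix)). A self-contained demonstration of the same NavigableMap call, with made-up String keys standing in for the store's Bytes keys:

import java.util.NavigableMap;
import java.util.TreeMap;

public class SubMapDemo {
    public static void main(final String[] args) {
        final NavigableMap<String, Integer> map = new TreeMap<>();
        map.put("app", 1);
        map.put("apple", 2);
        map.put("applesauce", 3);
        map.put("apq", 4);
        // Half-open range ["appl", "appm"), where "appm" plays the role
        // of Bytes.increment("appl"): only the "appl"-prefixed keys match.
        System.out.println(map.subMap("appl", true, "appm", false).keySet());
        // prints [apple, applesauce]
    }
}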
@@ -18,6 +18,7 @@

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
@@ -67,6 +68,7 @@ public class MeteredKeyValueStore<K, V>
private Sensor putAllSensor;
private Sensor allSensor;
private Sensor rangeSensor;
private Sensor prefixScanSensor;
private Sensor flushSensor;
private Sensor e2eLatencySensor;
private InternalProcessorContext context;
@@ -127,6 +129,7 @@ private void registerMetrics() {
getSensor = StateStoreMetrics.getSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
allSensor = StateStoreMetrics.allSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
rangeSensor = StateStoreMetrics.rangeSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
prefixScanSensor = StateStoreMetrics.prefixScanSensor(taskId, metricsScope, name(), streamsMetrics);
flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
deleteSensor = StateStoreMetrics.deleteSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
@@ -229,6 +232,12 @@ public V delete(final K key) {
}
}

@Override
public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
return new MeteredKeyValueIterator(wrapped().prefixScan(prefix, prefixKeySerializer), prefixScanSensor);
}

@Override
public KeyValueIterator<K, V> range(final K from,
final K to) {
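Rather than timing each next() call, the metered layer hands the inner iterator and the new prefixScanSensor to MeteredKeyValueIterator, so the scan can be recorded as a single measurement when the iterator is closed. A minimal sketch of that wrap-and-time pattern (the class below is an invented stand-in, reporting to a plain callback instead of a Kafka Sensor):

import java.util.function.LongConsumer;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;

// Illustrative stand-in for MeteredKeyValueIterator: measure from
// construction to close() and report the elapsed time once.
class TimedKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
    private final KeyValueIterator<K, V> inner;
    private final LongConsumer reportLatencyNs;
    private final long startNs = System.nanoTime();

    TimedKeyValueIterator(final KeyValueIterator<K, V> inner,
                          final LongConsumer reportLatencyNs) {
        this.inner = inner;
        this.reportLatencyNs = reportLatencyNs;
    }

    @Override
    public boolean hasNext() {
        return inner.hasNext();
    }

    @Override
    public KeyValue<K, V> next() {
        return inner.next();
    }

    @Override
    public K peekNextKey() {
        return inner.peekNextKey();
    }

    @Override
    public void close() {
        inner.close();
        reportLatencyNs.accept(System.nanoTime() - startNs);
    }
}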
@@ -280,8 +280,8 @@ public boolean isEmpty() {
return cache.isEmpty();
}

- synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) {
-     return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), true);
+ synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to, final boolean toInclusive) {
+     return keySetIterator(cache.navigableKeySet().subSet(from, true, to, toInclusive), true);
}

synchronized Iterator<Bytes> reverseKeyRange(final Bytes from, final Bytes to) {
@@ -31,15 +31,18 @@ class RocksDBRangeIterator extends RocksDbIterator {
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
private final byte[] rawLastKey;
private final boolean forward;
private final boolean toInclusive;

RocksDBRangeIterator(final String storeName,
final RocksIterator iter,
final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
final Bytes from,
final Bytes to,
- final boolean forward) {
+ final boolean forward,
+ final boolean toInclusive) {
super(storeName, iter, openIterators, forward);
this.forward = forward;
this.toInclusive = toInclusive;
if (forward) {
iter.seek(from.get());
rawLastKey = to.get();
@@ -62,8 +65,10 @@ public KeyValue<Bytes, byte[]> makeNext() {
return allDone();
} else {
if (forward) {
- if (comparator.compare(next.key.get(), rawLastKey) <= 0) {
+ if (comparator.compare(next.key.get(), rawLastKey) < 0) {
return next;
+ } else if (comparator.compare(next.key.get(), rawLastKey) == 0) {
+     return toInclusive ? next : allDone();
} else {
return allDone();
}
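The forward path now distinguishes strictly-below from exactly-at the upper bound: plain range() calls keep their inclusive behavior by passing toInclusive = true, while prefix scans pass false because their bound, increment(prefix), is not itself a prefix match. A small check with the same comparator (the byte values are made up):

import org.apache.kafka.common.utils.Bytes;

public class BoundCheckDemo {
    public static void main(final String[] args) {
        final byte[] prefix = {'a', 'b', 'c'};
        final byte[] bound = {'a', 'b', 'd'};      // increment(prefix)
        final byte[] match = {'a', 'b', 'c', 'x'}; // carries the prefix
        // Strictly below the bound: kept by the iterator.
        System.out.println(Bytes.BYTES_LEXICO_COMPARATOR.compare(match, bound) < 0);  // true
        // Equal to the bound: kept only when toInclusive is true.
        System.out.println(Bytes.BYTES_LEXICO_COMPARATOR.compare(bound, bound) == 0); // true
    }
}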
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
@@ -308,6 +309,21 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
}
}

@Override
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
final PS prefixKeySerializer) {
Objects.requireNonNull(prefix, "prefix cannot be null");
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");

validateStoreOpen();
final Bytes prefixBytes = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));

final KeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = dbAccessor.prefixScan(prefixBytes);
openIterators.add(rocksDbPrefixSeekIterator);

return rocksDbPrefixSeekIterator;
}

@Override
public synchronized byte[] get(final Bytes key) {
validateStoreOpen();
@@ -510,6 +526,8 @@ KeyValueIterator<Bytes, byte[]> range(final Bytes from,

KeyValueIterator<Bytes, byte[]> all(final boolean forward);

KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix);

long approximateNumEntries() throws RocksDBException;

void flush() throws RocksDBException;
@@ -580,7 +598,9 @@ public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
openIterators,
from,
to,
- forward);
+ forward,
+ true
+ );
}

@Override
@@ -594,6 +614,20 @@ public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
return new RocksDbIterator(name, innerIterWithTimestamp, openIterators, forward);
}

@Override
public KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
final Bytes to = Bytes.increment(prefix);
return new RocksDBRangeIterator(
name,
db.newIterator(columnFamily),
openIterators,
prefix,
to,
true,
false
);
}

@Override
public long approximateNumEntries() throws RocksDBException {
return db.getLongProperty(columnFamily, "rocksdb.estimate-num-keys");
@@ -201,7 +201,8 @@ public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
db.newIterator(oldColumnFamily),
from,
to,
- forward);
+ forward,
+ true);
}

@Override
@@ -218,6 +219,20 @@ public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
return new RocksDBDualCFIterator(name, innerIterWithTimestamp, innerIterNoTimestamp, forward);
}

@Override
public KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
final Bytes to = Bytes.increment(prefix);
return new RocksDBDualCFRangeIterator(
name,
db.newIterator(newColumnFamily),
db.newIterator(oldColumnFamily),
prefix,
to,
true,
false
);
}

@Override
public long approximateNumEntries() throws RocksDBException {
return db.getLongProperty(oldColumnFamily, "rocksdb.estimate-num-keys")
@@ -382,15 +397,18 @@ private class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator {
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
private final byte[] rawLastKey;
private final boolean forward;
private final boolean toInclusive;

RocksDBDualCFRangeIterator(final String storeName,
final RocksIterator iterWithTimestamp,
final RocksIterator iterNoTimestamp,
final Bytes from,
final Bytes to,
- final boolean forward) {
+ final boolean forward,
+ final boolean toInclusive) {
super(storeName, iterWithTimestamp, iterNoTimestamp, forward);
this.forward = forward;
this.toInclusive = toInclusive;
if (forward) {
iterWithTimestamp.seek(from.get());
iterNoTimestamp.seek(from.get());
@@ -416,8 +434,11 @@ public KeyValue<Bytes, byte[]> makeNext() {
return allDone();
} else {
if (forward) {
- if (comparator.compare(next.key.get(), rawLastKey) <= 0) {
+ if (comparator.compare(next.key.get(), rawLastKey) < 0) {
return next;
+ } else if (comparator.compare(next.key.get(), rawLastKey) == 0) {
+     return toInclusive ? next : allDone();
} else {
return allDone();
}
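The timestamped store runs the same bounded scan over both of its column families, the old format without timestamps and the new one with them, and the dual-CF iterator interleaves the two key-sorted streams. A generic sketch of that merge shape (simplified: the real iterator also carries values and normalizes the two record formats):

import java.util.Arrays;
import java.util.Iterator;

public class SortedMergeDemo {
    // Emit the smaller head of two already-sorted iterators first,
    // the shape of the dual-column-family merge (values omitted).
    static void merge(final Iterator<String> left, final Iterator<String> right) {
        String l = left.hasNext() ? left.next() : null;
        String r = right.hasNext() ? right.next() : null;
        while (l != null || r != null) {
            if (r == null || (l != null && l.compareTo(r) <= 0)) {
                System.out.println(l);
                l = left.hasNext() ? left.next() : null;
            } else {
                System.out.println(r);
                r = right.hasNext() ? right.next() : null;
            }
        }
    }

    public static void main(final String[] args) {
        merge(Arrays.asList("a", "d", "e").iterator(),
              Arrays.asList("b", "c", "f").iterator());
        // prints a, b, c, d, e, f: one key-ordered stream from two
    }
}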
@@ -187,11 +187,15 @@ public LRUCacheEntry delete(final String namespace, final Bytes key) {
}

public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) {
return range(namespace, from, to, true);
}

public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to, final boolean toInclusive) {
final NamedCache cache = getCache(namespace);
if (cache == null) {
return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics));
}
- return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache);
+ return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to, toInclusive), cache);
}

public MemoryLRUCacheBytesIterator reverseRange(final String namespace, final Bytes from, final Bytes to) {
@@ -110,6 +110,13 @@ private StateStoreMetrics() {}
private static final String RANGE_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + RANGE_DESCRIPTION;
private static final String RANGE_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + RANGE_DESCRIPTION;

private static final String PREFIX_SCAN = "prefix-scan";
private static final String PREFIX_SCAN_DESCRIPTION = "calls to prefix-scan";
private static final String PREFIX_SCAN_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + PREFIX_SCAN_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String PREFIX_SCAN_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + PREFIX_SCAN_DESCRIPTION;
private static final String PREFIX_SCAN_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + PREFIX_SCAN_DESCRIPTION;

private static final String FLUSH = "flush";
private static final String FLUSH_DESCRIPTION = "calls to flush";
private static final String FLUSH_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + FLUSH_DESCRIPTION;
@@ -307,6 +314,33 @@ public static Sensor rangeSensor(final String threadId,
);
}

public static Sensor prefixScanSensor(final String taskId,
final String storeType,
final String storeName,
final StreamsMetricsImpl streamsMetrics) {

final String latencyMetricName = PREFIX_SCAN + LATENCY_SUFFIX;
final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(taskId, storeType, storeName);

final Sensor sensor = streamsMetrics.storeLevelSensor(taskId, storeName, PREFIX_SCAN, RecordingLevel.DEBUG);
addInvocationRateToSensor(
sensor,
STATE_STORE_LEVEL_GROUP,
tagMap,
PREFIX_SCAN,
PREFIX_SCAN_RATE_DESCRIPTION
);
addAvgAndMaxToSensor(
sensor,
STATE_STORE_LEVEL_GROUP,
tagMap,
latencyMetricName,
PREFIX_SCAN_AVG_LATENCY_DESCRIPTION,
PREFIX_SCAN_MAX_LATENCY_DESCRIPTION
);
return sensor;
}

public static Sensor flushSensor(final String threadId,
final String taskId,
final String storeType,
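Unlike the sensors above it, prefixScanSensor is created without a threadId and is registered at RecordingLevel.DEBUG, so the new rate and latency metrics are only recorded when the application opts in to debug-level metrics. A configuration sketch (only the relevant property; application.id, bootstrap.servers, and the topology are omitted):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class MetricsLevelConfig {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Store-level prefix-scan metrics are DEBUG-level sensors.
        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
        // ... remaining configuration before constructing KafkaStreams.
    }
}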
(Remaining files in this commit are not shown.)
