Changes for Adding a new Prefix iterator #242

Closed
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
@@ -98,7 +98,7 @@ versions += [
  owaspDepCheckPlugin: "5.3.2.1",
  powermock: "2.0.7",
  reflections: "0.9.12",
  rocksDB: "5.18.4",
  rocksDB: "5.18.3",
  scalaCollectionCompat: "2.1.6",
  scalafmt: "1.5.1",
  scalaJava8Compat : "0.9.1",
@@ -0,0 +1,108 @@
package org.apache.kafka.jmh.streams.processor.internals;

import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.*;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.openjdk.jmh.annotations.*;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;

@State(Scope.Benchmark)
@Warmup(iterations = 5)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class RocksDbPrefixStateStoreBenchmark {

    private int counter;

    private static final int DISTINCT_KEYS = 10_000;

    private static final String KEY = "the_key_to_use";

    private static final String VALUE = "the quick brown fox jumped over the lazy dog the olympics are about to start";

    private final Bytes[] keys = new Bytes[DISTINCT_KEYS];

    private final byte[][] values = new byte[DISTINCT_KEYS][];

    private RocksDBStore prefixedStateStore;

    @Setup(Level.Trial)
    public void setUp() {
        for (int i = 0; i < DISTINCT_KEYS; ++i) {
            keys[i] = Bytes.wrap((KEY + i).getBytes(StandardCharsets.UTF_8));
            values[i] = (VALUE + i).getBytes(StandardCharsets.UTF_8);
        }

        prefixedStateStore = (RocksDBStore) Stores.persistentKeyValueStore("prefix-state-store").get();

        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(), "prefix-test-metrics", StreamsConfig.METRICS_LATEST);
        final ThreadCache cache = new ThreadCache(new LogContext("prefixTestCache "), 1_000_000_000, metrics);
        final StreamsConfig config = new StreamsConfig(mkMap(
            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "prefix-test"),
            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "prefix-test")
        ));

        final TaskId id = new TaskId(0, 0);
        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            id,
            Task.TaskType.ACTIVE,
            false,
            new LogContext("jmh"),
            new StateDirectory(config, Time.SYSTEM, true),
            null,
            Collections.emptyMap(),
            Collections.emptyList()
        );

        final ProcessorContextImpl context = new ProcessorContextImpl(
            id,
            null,
            config,
            null,
            stateMgr,
            metrics,
            cache
        );

        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
        prefixedStateStore.init(context, prefixedStateStore);
    }

    @Benchmark
    public KeyValueIterator<Bytes, byte[]> testCachePerformance() {
        counter++;
        final int index = counter % DISTINCT_KEYS;
        // Write a batch of keys that share the common prefix, so the prefix scan
        // below always has a populated range to iterate over.
        final Bytes key = keys[index];
        prefixedStateStore.put(key, values[index]);
        int numIterations = 10;
        int nextIndex = index + 1;
        while (numIterations > 0) {
            final Bytes newKey = keys[nextIndex % DISTINCT_KEYS];
            prefixedStateStore.put(newKey, values[nextIndex % DISTINCT_KEYS]);
            numIterations--;
            nextIndex++;
        }

        // Every generated key starts with KEY, so this scan matches all written entries.
        return prefixedStateStore.prefixScan(Bytes.wrap(KEY.getBytes(StandardCharsets.UTF_8)), new BytesSerializer());
    }
}
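If this benchmark is added under Kafka's jmh-benchmarks module, as its package name suggests, it should be runnable through that module's wrapper script, for example:

./jmh-benchmarks/jmh.sh RocksDbPrefixStateStoreBenchmark

The exact script name and flags follow the jmh-benchmarks README; treat this invocation as an assumption, not part of this diff.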
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor.internals;

import java.util.List;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -121,6 +123,11 @@ public void putAll(final List<KeyValue<K, V>> entries) {
        public V delete(final K key) {
            throw new UnsupportedOperationException(ERROR_MESSAGE);
        }

        @Override
        public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
            // prefixScan is a read operation, so the read-only decorator can delegate it
            // like get/range/all instead of returning null.
            return wrapped().prefixScan(prefix, prefixKeySerializer);
        }
    }

static class TimestampedKeyValueStoreReadOnlyDecorator<K, V>
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor.internals;

import java.util.List;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -115,6 +117,11 @@ public void putAll(final List<KeyValue<K, V>> entries) {
        public V delete(final K key) {
            return wrapped().delete(key);
        }

        @Override
        public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
            // Delegate like the other read/write operations instead of returning null.
            return wrapped().prefixScan(prefix, prefixKeySerializer);
        }
    }

static class TimestampedKeyValueStoreReadWriteDecorator<K, V>
@@ -67,4 +67,5 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
     * @throws NullPointerException If {@code null} is used for key.
     */
    V delete(K key);

}
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.InvalidStateStoreException;

/**
@@ -74,4 +75,16 @@ public interface ReadOnlyKeyValueStore<K, V> {
     * @throws InvalidStateStoreException if the store is not initialized
     */
    long approximateNumEntries();

    /**
     * Get an iterator over keys which have the specified prefix. The prefix may be of a different type than the
     * key, so callers must also pass a serializer that converts the prefix into the binary format in which the
     * store's keys are represented.
     *
     * @param prefix              the prefix to scan for
     * @param prefixKeySerializer serializer for the prefix key type
     * @param <PS>                the prefix serializer type
     * @param <P>                 the prefix type
     * @return an iterator over all keys having the specified prefix
     */
    <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(P prefix, PS prefixKeySerializer);
}
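For illustration only, not part of this diff: a minimal sketch of how a caller might use the new method against a store with String keys, e.g. one obtained through Interactive Queries. The class and method names below are hypothetical.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public final class PrefixScanExample {

    // Counts the entries whose String key starts with the given prefix.
    // The String serializer converts the prefix into the same binary
    // representation the store uses for its keys.
    static long countKeysWithPrefix(final ReadOnlyKeyValueStore<String, Long> store,
                                    final String prefix) {
        long count = 0;
        try (final KeyValueIterator<String, Long> iterator =
                 store.prefixScan(prefix, Serdes.String().serializer())) {
            while (iterator.hasNext()) {
                iterator.next();
                count++;
            }
        }
        return count;
    }
}

Passing Serdes.String().serializer() satisfies the PS extends Serializer<P> bound with P = String; the store serializes the prefix with it and matches the result against the stored key bytes.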
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -190,6 +191,11 @@ public byte[] delete(final Bytes key) {
        }
    }

    @Override
    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
        // Still a stub in this change: a complete implementation would have to
        // merge dirty cache entries with the underlying store's prefix scan.
        return null;
    }

    private byte[] deleteInternal(final Bytes key) {
        final byte[] v = getInternal(key);
        putInternal(key, null);
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -89,6 +90,11 @@ public byte[] delete(final Bytes key) {
        return oldValue;
    }

    @Override
    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
        return wrapped().prefixScan(prefix, prefixKeySerializer);
    }

    @Override
    public byte[] get(final Bytes key) {
        return wrapped().get(key);
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreType;
@@ -111,6 +112,11 @@ public long approximateNumEntries() {
        return total;
    }

    @Override
    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
        throw new UnsupportedOperationException();
    }
}

@@ -16,19 +16,16 @@
*/
package org.apache.kafka.streams.state.internals;

import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.*;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -108,6 +105,19 @@ public synchronized byte[] delete(final Bytes key) {
        return oldValue;
    }

    @Override
    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
        Objects.requireNonNull(prefix, "prefix cannot be null");
        Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");

        // Serialize the prefix into the stored key format, then return everything in
        // the half-open key range [prefix, increment(prefix)): every key that starts
        // with the prefix sorts at or after `from` and strictly before `to`.
        final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
        final Bytes to = Bytes.increment(from);

        return new DelegatingPeekingKeyValueIterator<>(
            name,
            new InMemoryKeyValueIterator(map.subMap(from, true, to, false).keySet()));
    }
    @Override
    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {

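The exclusive upper bound above comes from Bytes.increment, which treats the serialized prefix as an unsigned big-endian number and adds one, carrying through 0xFF bytes; every key that starts with the prefix then falls inside the half-open range [from, to). A self-contained sketch of that bound computation, re-implemented here purely for illustration (the real logic lives in org.apache.kafka.common.utils.Bytes):

import java.util.Arrays;

public final class PrefixBoundExample {

    // Computes an exclusive upper bound for all byte arrays that start with
    // `prefix`: add one to the last byte, carrying over any 0xFF bytes.
    static byte[] increment(final byte[] input) {
        final byte[] result = Arrays.copyOf(input, input.length);
        for (int i = result.length - 1; i >= 0; i--) {
            if (result[i] != (byte) 0xFF) {
                result[i]++;          // no further carry needed, done
                return result;
            }
            result[i] = 0x00;         // 0xFF rolls over to 0x00, keep carrying
        }
        // All bytes were 0xFF: no same-length exclusive upper bound exists.
        throw new IndexOutOfBoundsException("prefix consists only of 0xFF bytes");
    }

    public static void main(final String[] args) {
        // The prefix {0x01, 0xFF} yields the bound {0x02, 0x00}: every key
        // beginning with {0x01, 0xFF} compares below it at the first byte.
        final byte[] from = {0x01, (byte) 0xFF};
        final byte[] to = increment(from);
        System.out.println(Arrays.toString(from) + " -> " + Arrays.toString(to));
    }
}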
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -76,6 +77,11 @@ public byte[] delete(final Bytes key) {
        return convertToTimestampedFormat(store.delete(key));
    }

    @Override
    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
        // Still a stub in this change: the returned iterator would need the same
        // conversion to the timestamped value format that the other read methods apply.
        return null;
    }

    @Override
    public String name() {
        return store.name();
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -134,6 +135,11 @@ public synchronized byte[] delete(final Bytes key) {
        return this.map.remove(key);
    }

    @Override
    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
        // This store keeps its entries in an access-ordered LRU map, which cannot
        // serve ordered scans, so reject prefix scans rather than returning null.
        throw new UnsupportedOperationException("prefixScan is not supported by this store");
    }

    /**
     * @throws UnsupportedOperationException at every invocation
     */
@@ -18,6 +18,7 @@

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
@@ -173,6 +174,15 @@ public V delete(final K key) {
        }
    }

    @Override
    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
        return new MeteredKeyValueIterator(
            wrapped().prefixScan(prefix, prefixKeySerializer),
            rangeSensor
        );
    }

    @Override
    public KeyValueIterator<K, V> range(final K from,
                                        final K to) {
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@@ -49,4 +50,9 @@ public KeyValueIterator<K, V> all() {
    public long approximateNumEntries() {
        return inner.approximateNumEntries();
    }

    @Override
    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
        throw new UnsupportedOperationException();
    }
}