Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10562: Properly invoke new StateStoreContext init #9388

Merged
merged 7 commits into from
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* Demonstrate the use of {@link MockProcessorContext} for testing the {@link Processor} in the {@link WordCountProcessorDemo}.
*/
public class WordCountProcessorTest {
@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ticket needs to go in to 2.7.0 also, but I split it out for reviewability.

@Test
public void test() {
final MockProcessorContext context = new MockProcessorContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* Demonstrate the use of {@link MockProcessorContext} for testing the {@link Transformer} in the {@link WordCountTransformerDemo}.
*/
public class WordCountTransformerTest {
@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
@Test
public void test() {
final MockProcessorContext context = new MockProcessorContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ public interface StateStore {
*
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
* @deprecated Since 2.7.0. Callers should invoke {@link this#init(StateStoreContext, StateStore)} instead.
* Implementers may choose to implement this method for backward compatibility or to throw an
* informative exception instead.
*/
@Deprecated
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the deprecation tag right now lets us be sure we encountered all places this method appears in the codebase.

void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
Expand All @@ -45,12 +46,19 @@ public void flush() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void init(final StateStoreContext context,
final StateStore root) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are going to be a lot of duplicated init methods. It's not great, but hopefully we can drop the old API before too long.

}

@Override
public void close() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
*/
package org.apache.kafka.streams.processor.internals;

import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
Expand All @@ -31,19 +31,28 @@
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

import java.util.List;

abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> {
static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";

private AbstractReadWriteDecorator(final T inner) {
super(inner);
}

@Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void init(final StateStoreContext context,
final StateStore root) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void close() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

/**
Expand Down Expand Up @@ -47,9 +48,42 @@ public static StreamsMetricsImpl getMetricsImpl(final ProcessorContext context)
return (StreamsMetricsImpl) context.metrics();
}

/**
* Should be removed as part of KAFKA-10217
*/
public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context) {
return (StreamsMetricsImpl) context.metrics();
}
Comment on lines +54 to +56
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a bunch of duplicated extractors here to help keep the implementation classes clean.


public static String changelogFor(final ProcessorContext context, final String storeName) {
return context instanceof InternalProcessorContext
? ((InternalProcessorContext) context).changelogFor(storeName)
: ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
}

public static String changelogFor(final StateStoreContext context, final String storeName) {
return context instanceof InternalProcessorContext
? ((InternalProcessorContext) context).changelogFor(storeName)
: ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
}

public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {
if (context instanceof InternalProcessorContext) {
return (InternalProcessorContext) context;
} else {
throw new IllegalArgumentException(
"This component requires internal features of Kafka Streams and must be disabled for unit tests."
);
}
}
Comment on lines +70 to +78
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replaced a lot of casts with this checked-cast method, which also lets us get rid of a lot of similar cast-checking blocks, which were inconsistently applied.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!


public static InternalProcessorContext asInternalProcessorContext(final StateStoreContext context) {
if (context instanceof InternalProcessorContext) {
return (InternalProcessorContext) context;
} else {
throw new IllegalArgumentException(
"This component requires internal features of Kafka Streams and must be disabled for unit tests."
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ public String name() {
return name;
}

@Deprecated
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a handful of these also, just passing the deprecation on to the callers.

@Override
public void init(final ProcessorContext context,
final StateStore root) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;

Expand All @@ -54,23 +56,39 @@ public class CachingKeyValueStore
super(underlying);
}

@Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
initInternal(context);
if (!(context instanceof InternalProcessorContext)) {
throw new IllegalArgumentException(
"Caching requires internal features of KafkaStreams and must be disabled for unit tests."
);
}
initInternal(asInternalProcessorContext(context));
super.init(context, root);
// save the stream thread as we only ever want to trigger a flush
// when the stream thread is the current thread.
streamThread = Thread.currentThread();
}

@Override
public void init(final StateStoreContext context,
final StateStore root) {
initInternal(asInternalProcessorContext(context));
super.init(context, root);
// save the stream thread as we only ever want to trigger a flush
// when the stream thread is the current thread.
streamThread = Thread.currentThread();
}

private void initInternal(final ProcessorContext context) {
this.context = (InternalProcessorContext) context;
private void initInternal(final InternalProcessorContext context) {
this.context = context;

this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), name());
this.context.registerCacheFlushListener(cacheName, entries -> {
for (final ThreadCache.DirtyEntry entry : entries) {
putAndMaybeForward(entry, (InternalProcessorContext) context);
putAndMaybeForward(entry, context);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordQueue;
Expand All @@ -34,6 +35,7 @@
import java.util.NoSuchElementException;
import java.util.Objects;

import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;

Expand Down Expand Up @@ -61,9 +63,16 @@ class CachingSessionStore
this.maxObservedTimestamp = RecordQueue.UNKNOWN;
}

@Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
initInternal((InternalProcessorContext) context);
initInternal(asInternalProcessorContext(context));
super.init(context, root);
}

@Override
public void init(final StateStoreContext context, final StateStore root) {
initInternal(asInternalProcessorContext(context));
super.init(context, root);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
Expand All @@ -37,6 +38,7 @@
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;

Expand Down Expand Up @@ -67,14 +69,16 @@ class CachingWindowStore
this.maxObservedTimestamp = new AtomicLong(RecordQueue.UNKNOWN);
}

@Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
if (!(context instanceof InternalProcessorContext)) {
throw new IllegalArgumentException(
"Caching requires internal features of KafkaStreams and must be disabled for unit tests."
);
}
initInternal((InternalProcessorContext) context);
initInternal(asInternalProcessorContext(context));
super.init(context, root);
}

@Override
public void init(final StateStoreContext context, final StateStore root) {
initInternal(asInternalProcessorContext(context));
super.init(context, root);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.List;

import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;

public class ChangeLoggingKeyValueBytesStore
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]>
implements KeyValueStore<Bytes, byte[]> {
Expand All @@ -36,12 +39,24 @@ public class ChangeLoggingKeyValueBytesStore
super(inner);
}

@Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
super.init(context, root);
this.context = (InternalProcessorContext) context;
this.context = asInternalProcessorContext(context);
maybeSetEvictionListener();
}

@Override
public void init(final StateStoreContext context,
final StateStore root) {
super.init(context, root);
this.context = asInternalProcessorContext(context);
maybeSetEvictionListener();
}

private void maybeSetEvictionListener() {
// if the inner store is an LRU cache, add the eviction listener to log removed record
if (wrapped() instanceof MemoryLRUCache) {
((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;

import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;

/**
* Simple wrapper around a {@link SessionStore} to support writing
* updates to a changelog
Expand All @@ -38,10 +41,17 @@ class ChangeLoggingSessionBytesStore
super(bytesStore);
}

@Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
super.init(context, root);
this.context = (InternalProcessorContext) context;
this.context = asInternalProcessorContext(context);
}

@Override
public void init(final StateStoreContext context, final StateStore root) {
super.init(context, root);
this.context = asInternalProcessorContext(context);
}

@Override
Expand Down
Loading