diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index f7f26aadfd52a..b40a257d19043 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -144,7 +144,6 @@ public class StreamsConfig extends AbstractConfig { private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L; public static final int DUMMY_THREAD_INDEX = 1; - public static final long MAX_TASK_IDLE_MS_DISABLED = -1; /** * Prefix used to provide default topic configs to be applied when creating internal topics. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 86b76e3d84ed7..482a2c5d9d175 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -242,8 +242,7 @@ private StreamTask createActiveTask(final TaskId taskId, time, stateManager, recordCollector, - context, - logContext + context ); log.trace("Created task {} with assigned partitions {}", taskId, inputPartitions); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 46b3429f226e6..559e3b1838486 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -17,21 +17,16 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.streams.StreamsConfig; -import org.slf4j.Logger; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.Map; import java.util.PriorityQueue; import java.util.Set; +import java.util.Iterator; +import java.util.HashSet; import java.util.function.Function; /** @@ -58,18 +53,14 @@ */ public class PartitionGroup { - private final Logger logger; private final Map partitionQueues; - private final Sensor enforcedProcessingSensor; - private final long maxTaskIdleMs; private final Sensor recordLatenessSensor; private final PriorityQueue nonEmptyQueuesByTime; private long streamTime; private int totalBuffered; private boolean allBuffered; - private final Map fetchedLags = new HashMap<>(); - private final Map idlePartitionDeadlines = new HashMap<>(); + static class RecordInfo { RecordQueue queue; @@ -87,144 +78,15 @@ RecordQueue queue() { } } - PartitionGroup(final LogContext logContext, - final Map partitionQueues, - final Sensor recordLatenessSensor, - final Sensor enforcedProcessingSensor, - final long maxTaskIdleMs) { - this.logger = logContext.logger(PartitionGroup.class); + PartitionGroup(final Map partitionQueues, final Sensor recordLatenessSensor) { nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp)); this.partitionQueues = partitionQueues; - 
this.enforcedProcessingSensor = enforcedProcessingSensor; - this.maxTaskIdleMs = maxTaskIdleMs; this.recordLatenessSensor = recordLatenessSensor; totalBuffered = 0; allBuffered = false; streamTime = RecordQueue.UNKNOWN; } - public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) { - final Long lag = metadata.lag(); - if (lag != null) { - logger.trace("added fetched lag {}: {}", partition, lag); - fetchedLags.put(partition, lag); - } - } - - public boolean readyToProcess(final long wallClockTime) { - if (logger.isTraceEnabled()) { - for (final Map.Entry entry : partitionQueues.entrySet()) { - logger.trace( - "buffered/lag {}: {}/{}", - entry.getKey(), - entry.getValue().size(), - fetchedLags.get(entry.getKey()) - ); - } - } - // Log-level strategy: - // TRACE for messages that don't wait for fetches - // TRACE when we waited for a fetch and decided to wait some more, as configured - // TRACE when we are ready for processing and didn't have to enforce processing - // INFO when we enforce processing, since this has to wait for fetches AND may result in disorder - - if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) { - if (logger.isTraceEnabled() && !allBuffered && totalBuffered > 0) { - final Set bufferedPartitions = new HashSet<>(); - final Set emptyPartitions = new HashSet<>(); - for (final Map.Entry entry : partitionQueues.entrySet()) { - if (entry.getValue().isEmpty()) { - emptyPartitions.add(entry.getKey()); - } else { - bufferedPartitions.add(entry.getKey()); - } - } - logger.trace("Ready for processing because max.task.idle.ms is disabled." + - "\n\tThere may be out-of-order processing for this task as a result." + - "\n\tBuffered partitions: {}" + - "\n\tNon-buffered partitions: {}", - bufferedPartitions, - emptyPartitions); - } - return true; - } - - final Set queued = new HashSet<>(); - Map enforced = null; - - for (final Map.Entry entry : partitionQueues.entrySet()) { - final TopicPartition partition = entry.getKey(); - final RecordQueue queue = entry.getValue(); - - final Long nullableFetchedLag = fetchedLags.get(partition); - - if (!queue.isEmpty()) { - // this partition is ready for processing - idlePartitionDeadlines.remove(partition); - queued.add(partition); - } else if (nullableFetchedLag == null) { - // must wait to fetch metadata for the partition - idlePartitionDeadlines.remove(partition); - logger.trace("Waiting to fetch data for {}", partition); - return false; - } else if (nullableFetchedLag > 0L) { - // must wait to poll the data we know to be on the broker - idlePartitionDeadlines.remove(partition); - logger.trace( - "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.", - partition, - nullableFetchedLag - ); - return false; - } else { - // p is known to have zero lag. wait for maxTaskIdleMs to see if more data shows up. - // One alternative would be to set the deadline to nullableMetadata.receivedTimestamp + maxTaskIdleMs - // instead. That way, we would start the idling timer as of the freshness of our knowledge about the zero - // lag instead of when we happen to run this method, but realistically it's probably a small difference - // and using wall clock time seems more intuitive for users, - // since the log message will be as of wallClockTime. 
- idlePartitionDeadlines.putIfAbsent(partition, wallClockTime + maxTaskIdleMs); - final long deadline = idlePartitionDeadlines.get(partition); - if (wallClockTime < deadline) { - logger.trace( - "Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).", - partition, - wallClockTime, - maxTaskIdleMs, - deadline - ); - return false; - } else { - // this partition is ready for processing due to the task idling deadline passing - if (enforced == null) { - enforced = new HashMap<>(); - } - enforced.put(partition, deadline); - } - } - } - if (enforced == null) { - logger.trace("All partitions were buffered locally, so this task is ready for processing."); - return true; - } else if (queued.isEmpty()) { - logger.trace("No partitions were buffered locally, so this task is not ready for processing."); - return false; - } else { - enforcedProcessingSensor.record(1.0d, wallClockTime); - logger.info("Continuing to process although some partition timestamps were not buffered locally." + - "\n\tThere may be out-of-order processing for this task as a result." + - "\n\tPartitions with local data: {}." + - "\n\tPartitions we gave up waiting for, with their corresponding deadlines: {}." + - "\n\tConfigured max.task.idle.ms: {}." + - "\n\tCurrent wall-clock time: {}.", - queued, - enforced, - maxTaskIdleMs, - wallClockTime); - return true; - } - } - // visible for testing long partitionTimestamp(final TopicPartition partition) { final RecordQueue queue = partitionQueues.get(partition); @@ -377,7 +239,7 @@ int numBuffered() { return totalBuffered; } - boolean allPartitionsBufferedLocally() { + boolean allPartitionsBuffered() { return allBuffered; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index d866954668618..4efb10eb566cf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; @@ -287,11 +286,6 @@ public void addRecords(final TopicPartition partition, final Iterable e2eLatencySensors = new HashMap<>(); private final InternalProcessorContext processorContext; private final RecordQueueCreator recordQueueCreator; private StampedRecord record; + private long idleStartTimeMs; private boolean commitNeeded = false; private boolean commitRequested = false; private boolean hasPendingTxCommit = false; @@ -116,8 +118,7 @@ public StreamTask(final TaskId id, final Time time, final ProcessorStateManager stateMgr, final RecordCollector recordCollector, - final InternalProcessorContext processorContext, - final LogContext logContext) { + final InternalProcessorContext processorContext) { super( id, topology, @@ -141,6 +142,12 @@ public StreamTask(final TaskId id, this.streamsMetrics = streamsMetrics; closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics); final String taskId = id.toString(); + if (streamsMetrics.version() == Version.FROM_0100_TO_24) { + final Sensor parent = ThreadMetrics.commitOverTasksSensor(threadId, 
streamsMetrics); + enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics, parent); + } else { + enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics); + } processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics); processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics); punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics); @@ -163,30 +170,17 @@ public StreamTask(final TaskId id, streamTimePunctuationQueue = new PunctuationQueue(); systemTimePunctuationQueue = new PunctuationQueue(); + maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG); maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); // initialize the consumed and committed offset cache consumedOffsets = new HashMap<>(); - recordQueueCreator = new RecordQueueCreator(this.logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler()); + recordQueueCreator = new RecordQueueCreator(logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler()); recordInfo = new PartitionGroup.RecordInfo(); - - final Sensor enforcedProcessingSensor; - if (streamsMetrics.version() == Version.FROM_0100_TO_24) { - final Sensor parent = ThreadMetrics.commitOverTasksSensor(threadId, streamsMetrics); - enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics, parent); - } else { - enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics); - } - final long maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG); - partitionGroup = new PartitionGroup( - logContext, - createPartitionQueues(), - TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics), - enforcedProcessingSensor, - maxTaskIdleMs - ); + partitionGroup = new PartitionGroup(createPartitionQueues(), + TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics)); stateMgr.registerGlobalStateStores(topology.globalStateStores()); } @@ -242,6 +236,7 @@ public void completeRestoration() { initializeMetadata(); initializeTopology(); processorContext.initialize(); + idleStartTimeMs = RecordQueue.UNKNOWN; transitionTo(State.RUNNING); @@ -632,12 +627,7 @@ record = null; /** * An active task is processable if its buffer contains data for all of its input - * source topic partitions, or if it is enforced to be processable. - * - * Note that this method is _NOT_ idempotent, because the internal bookkeeping - * consumes the partition metadata. For example, unit tests may have to invoke - * {@link #addFetchedMetadata(TopicPartition, ConsumerRecords.Metadata)} again - * invoking this method. 
+ * source topic partitions, or if it is enforced to be processable */ public boolean isProcessable(final long wallClockTime) { if (state() == State.CLOSED) { @@ -655,7 +645,26 @@ public boolean isProcessable(final long wallClockTime) { return false; } - return partitionGroup.readyToProcess(wallClockTime); + if (partitionGroup.allPartitionsBuffered()) { + idleStartTimeMs = RecordQueue.UNKNOWN; + return true; + } else if (partitionGroup.numBuffered() > 0) { + if (idleStartTimeMs == RecordQueue.UNKNOWN) { + idleStartTimeMs = wallClockTime; + } + + if (wallClockTime - idleStartTimeMs >= maxTaskIdleMs) { + enforcedProcessingSensor.record(1.0d, wallClockTime); + return true; + } else { + return false; + } + } else { + // there's no data in any of the topics; we should reset the enforced + // processing timer + idleStartTimeMs = RecordQueue.UNKNOWN; + return false; + } } /** @@ -922,11 +931,6 @@ public void addRecords(final TopicPartition partition, final Iterable 0) { + log.debug("Main Consumer poll completed in {} ms and fetched {} records", pollLatency, numRecords); + } pollSensor.record(pollLatency, now); - if (!records.isEmpty() || !records.metadata().isEmpty()) { + if (!records.isEmpty()) { pollRecordsSensor.record(numRecords, now); taskManager.addRecordsToTasks(records); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index 8e3aa0b41e854..206a5efd6a7de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; @@ -158,11 +157,6 @@ enum TaskType { void addRecords(TopicPartition partition, Iterable> records); - /** - * Add to this task any metadata returned from the poll. 
- */ - void addFetchedMetadata(TopicPartition partition, ConsumerRecords.Metadata metadata); - default boolean process(final long wallClockTime) { return false; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 19e91e2be5372..3ca6876a36e85 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -970,7 +970,7 @@ int commitAll() { * @param records Records, can be null */ void addRecordsToTasks(final ConsumerRecords records) { - for (final TopicPartition partition : union(HashSet::new, records.partitions(), records.metadata().keySet())) { + for (final TopicPartition partition : records.partitions()) { final Task activeTask = tasks.activeTasksForInputPartition(partition); if (activeTask == null) { @@ -980,7 +980,6 @@ void addRecordsToTasks(final ConsumerRecords records) { } activeTask.addRecords(partition, records.records(partition)); - activeTask.addFetchedMetadata(partition, records.metadata().get(partition)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 58b9c19cd4aec..e3999982cfcd5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -128,7 +128,6 @@ public void setUp() throws InterruptedException { properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - properties.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L); streamsConfiguration = StreamsTestUtils.getStreamsConfig( IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, new TestName()), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 09558d23854e8..d8793265f5f37 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; @@ -30,39 +29,32 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.processor.TimestampExtractor; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockTimestampExtractor; -import org.hamcrest.Matchers; import org.junit.Test; import java.util.Arrays; import java.util.List; 
-import java.util.UUID; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.common.utils.Utils.mkSet; import static org.hamcrest.CoreMatchers.is; +import static org.apache.kafka.common.utils.Utils.mkSet; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class PartitionGroupTest { - - private final long maxTaskIdleMs = StreamsConfig.MAX_TASK_IDLE_MS_DISABLED; private final LogContext logContext = new LogContext("[test] "); private final Time time = new MockTime(); private final Serializer intSerializer = new IntegerSerializer(); @@ -80,7 +72,6 @@ public class PartitionGroupTest { private final byte[] recordKey = intSerializer.serialize(null, 1); private final Metrics metrics = new Metrics(); - private final Sensor enforcedProcessingSensor = metrics.sensor(UUID.randomUUID().toString()); private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap()); @@ -459,7 +450,7 @@ public void shouldUpdatePartitionQueuesShrink() { new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue)); group.addRawRecords(partition2, list2); assertEquals(list1.size() + list2.size(), group.numBuffered()); - assertTrue(group.allPartitionsBufferedLocally()); + assertTrue(group.allPartitionsBuffered()); group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()); // shrink list of queues @@ -468,7 +459,7 @@ public void shouldUpdatePartitionQueuesShrink() { return null; }); - assertTrue(group.allPartitionsBufferedLocally()); // because didn't add any new partitions + assertTrue(group.allPartitionsBuffered()); // because didn't add any new partitions assertEquals(list2.size(), group.numBuffered()); assertEquals(1, group.streamTime()); assertThrows(IllegalStateException.class, () -> group.partitionTimestamp(partition1)); @@ -479,11 +470,8 @@ public void shouldUpdatePartitionQueuesShrink() { @Test public void shouldUpdatePartitionQueuesExpand() { final PartitionGroup group = new PartitionGroup( - logContext, mkMap(mkEntry(partition1, queue1)), - getValueSensor(metrics, lastLatenessValue), - enforcedProcessingSensor, - maxTaskIdleMs + getValueSensor(metrics, lastLatenessValue) ); final List> list1 = Arrays.asList( new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), @@ -491,7 +479,7 @@ public void shouldUpdatePartitionQueuesExpand() { group.addRawRecords(partition1, list1); assertEquals(list1.size(), group.numBuffered()); - assertTrue(group.allPartitionsBufferedLocally()); + assertTrue(group.allPartitionsBuffered()); group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()); // expand list of queues @@ -500,7 +488,7 @@ public void shouldUpdatePartitionQueuesExpand() { return createQueue2(); }); - assertFalse(group.allPartitionsBufferedLocally()); // because added new partition + assertFalse(group.allPartitionsBuffered()); // because added new partition assertEquals(1, group.numBuffered()); assertEquals(1, 
group.streamTime()); assertThat(group.partitionTimestamp(partition1), equalTo(1L)); @@ -511,18 +499,15 @@ public void shouldUpdatePartitionQueuesExpand() { @Test public void shouldUpdatePartitionQueuesShrinkAndExpand() { final PartitionGroup group = new PartitionGroup( - logContext, mkMap(mkEntry(partition1, queue1)), - getValueSensor(metrics, lastLatenessValue), - enforcedProcessingSensor, - maxTaskIdleMs + getValueSensor(metrics, lastLatenessValue) ); final List> list1 = Arrays.asList( new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue)); group.addRawRecords(partition1, list1); assertEquals(list1.size(), group.numBuffered()); - assertTrue(group.allPartitionsBufferedLocally()); + assertTrue(group.allPartitionsBuffered()); group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()); // expand and shrink list of queues @@ -531,7 +516,7 @@ public void shouldUpdatePartitionQueuesShrinkAndExpand() { return createQueue2(); }); - assertFalse(group.allPartitionsBufferedLocally()); // because added new partition + assertFalse(group.allPartitionsBuffered()); // because added new partition assertEquals(0, group.numBuffered()); assertEquals(1, group.streamTime()); assertThrows(IllegalStateException.class, () -> group.partitionTimestamp(partition1)); @@ -539,232 +524,13 @@ public void shouldUpdatePartitionQueuesShrinkAndExpand() { assertThat(group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()), nullValue()); // all available records removed } - @Test - public void shouldNeverWaitIfIdlingIsDisabled() { - final PartitionGroup group = new PartitionGroup( - logContext, - mkMap( - mkEntry(partition1, queue1), - mkEntry(partition2, queue2) - ), - getValueSensor(metrics, lastLatenessValue), - enforcedProcessingSensor, - StreamsConfig.MAX_TASK_IDLE_MS_DISABLED - ); - - final List> list1 = Arrays.asList( - new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue)); - group.addRawRecords(partition1, list1); - - assertThat(group.allPartitionsBufferedLocally(), is(false)); - try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { - LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); - assertThat(group.readyToProcess(0L), is(true)); - assertThat( - appender.getEvents(), - hasItem(Matchers.allOf( - Matchers.hasProperty("level", equalTo("TRACE")), - Matchers.hasProperty("message", equalTo( - "[test] Ready for processing because max.task.idle.ms is disabled.\n" + - "\tThere may be out-of-order processing for this task as a result.\n" + - "\tBuffered partitions: [topic-1]\n" + - "\tNon-buffered partitions: [topic-2]" - )) - )) - ); - } - } - - @Test - public void shouldBeReadyIfAllPartitionsAreBuffered() { - final PartitionGroup group = new PartitionGroup( - logContext, - mkMap( - mkEntry(partition1, queue1), - mkEntry(partition2, queue2) - ), - getValueSensor(metrics, lastLatenessValue), - enforcedProcessingSensor, - 0L - ); - - final List> list1 = Arrays.asList( - new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue)); - group.addRawRecords(partition1, list1); - - final List> list2 = Arrays.asList( - new ConsumerRecord<>("topic", 2, 1L, recordKey, recordValue), - new ConsumerRecord<>("topic", 2, 5L, recordKey, recordValue)); - group.addRawRecords(partition2, list2); - - assertThat(group.allPartitionsBufferedLocally(), 
is(true)); - try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { - LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); - assertThat(group.readyToProcess(0L), is(true)); - assertThat( - appender.getEvents(), - hasItem(Matchers.allOf( - Matchers.hasProperty("level", equalTo("TRACE")), - Matchers.hasProperty("message", equalTo("[test] All partitions were buffered locally, so this task is ready for processing.")) - )) - ); - } - } - - @Test - public void shouldWaitForFetchesWhenMetadataIsIncomplete() { - final PartitionGroup group = new PartitionGroup( - logContext, - mkMap( - mkEntry(partition1, queue1), - mkEntry(partition2, queue2) - ), - getValueSensor(metrics, lastLatenessValue), - enforcedProcessingSensor, - 0L - ); - - final List> list1 = Arrays.asList( - new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue)); - group.addRawRecords(partition1, list1); - - assertThat(group.allPartitionsBufferedLocally(), is(false)); - try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { - LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); - assertThat(group.readyToProcess(0L), is(false)); - assertThat( - appender.getEvents(), - hasItem(Matchers.allOf( - Matchers.hasProperty("level", equalTo("TRACE")), - Matchers.hasProperty("message", equalTo("[test] Waiting to fetch data for topic-2")) - )) - ); - } - group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 0L)); - assertThat(group.readyToProcess(0L), is(true)); - } - - @Test - public void shouldWaitForPollWhenLagIsNonzero() { - final PartitionGroup group = new PartitionGroup( - logContext, - mkMap( - mkEntry(partition1, queue1), - mkEntry(partition2, queue2) - ), - getValueSensor(metrics, lastLatenessValue), - enforcedProcessingSensor, - 0L - ); - - final List> list1 = Arrays.asList( - new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue)); - group.addRawRecords(partition1, list1); - group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 1L)); - - assertThat(group.allPartitionsBufferedLocally(), is(false)); - - try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { - LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); - assertThat(group.readyToProcess(0L), is(false)); - assertThat( - appender.getEvents(), - hasItem(Matchers.allOf( - Matchers.hasProperty("level", equalTo("TRACE")), - Matchers.hasProperty("message", equalTo("[test] Lag for topic-2 is currently 1, but no data is buffered locally. 
Waiting to buffer some records.")) - )) - ); - } - } - - @Test - public void shouldIdleAsSpecifiedWhenLagIsZero() { - final PartitionGroup group = new PartitionGroup( - logContext, - mkMap( - mkEntry(partition1, queue1), - mkEntry(partition2, queue2) - ), - getValueSensor(metrics, lastLatenessValue), - enforcedProcessingSensor, - 1L - ); - - final List> list1 = Arrays.asList( - new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), - new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue)); - group.addRawRecords(partition1, list1); - group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 0L)); - - assertThat(group.allPartitionsBufferedLocally(), is(false)); - - try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { - LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); - assertThat(group.readyToProcess(0L), is(false)); - assertThat( - appender.getEvents(), - hasItem(Matchers.allOf( - Matchers.hasProperty("level", equalTo("TRACE")), - Matchers.hasProperty("message", equalTo("[test] Lag for topic-2 is currently 0 and current time is 0. Waiting for new data to be produced for configured idle time 1 (deadline is 1).")) - )) - ); - } - - group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 0L)); - try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { - LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); - assertThat(group.readyToProcess(1L), is(true)); - assertThat( - appender.getEvents(), - hasItem(Matchers.allOf( - Matchers.hasProperty("level", equalTo("INFO")), - Matchers.hasProperty("message", equalTo( - "[test] Continuing to process although some partition timestamps were not buffered locally.\n" + - "\tThere may be out-of-order processing for this task as a result.\n" + - "\tPartitions with local data: [topic-1].\n" + - "\tPartitions we gave up waiting for, with their corresponding deadlines: {topic-2=1}.\n" + - "\tConfigured max.task.idle.ms: 1.\n" + - "\tCurrent wall-clock time: 1." - )) - )) - ); - } - - group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 0L)); - try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { - LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); - assertThat(group.readyToProcess(2L), is(true)); - assertThat( - appender.getEvents(), - hasItem(Matchers.allOf( - Matchers.hasProperty("level", equalTo("INFO")), - Matchers.hasProperty("message", equalTo( - "[test] Continuing to process although some partition timestamps were not buffered locally.\n" + - "\tThere may be out-of-order processing for this task as a result.\n" + - "\tPartitions with local data: [topic-1].\n" + - "\tPartitions we gave up waiting for, with their corresponding deadlines: {topic-2=1}.\n" + - "\tConfigured max.task.idle.ms: 1.\n" + - "\tCurrent wall-clock time: 2." 
- )) - )) - ); - } - } - private PartitionGroup getBasicGroup() { return new PartitionGroup( - logContext, mkMap( mkEntry(partition1, queue1), mkEntry(partition2, queue2) ), - getValueSensor(metrics, lastLatenessValue), - enforcedProcessingSensor, - maxTaskIdleMs + getValueSensor(metrics, lastLatenessValue) ); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index ea3fbdd91e1cf..cc871a1541b4a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -18,7 +18,6 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -40,7 +39,6 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -126,7 +124,6 @@ public class StreamTaskTest { private static final File BASE_DIR = TestUtils.tempDirectory(); private static final long DEFAULT_TIMESTAMP = 1000; - private final LogContext logContext = new LogContext("[test] "); private final String topic1 = "topic1"; private final String topic2 = "topic2"; private final TopicPartition partition1 = new TopicPartition(topic1, 1); @@ -424,9 +421,6 @@ public void shouldProcessInOrder() { assertEquals(asList(101, 102, 103), source1.values); assertEquals(singletonList(201), source2.values); - // tell the task that it doesn't need to wait for more records on partition1 - task.addFetchedMetadata(partition1, new ConsumerRecords.Metadata(0L, 30L, 30L)); - assertTrue(task.process(0L)); assertEquals(1, task.numBuffered()); assertEquals(3, source1.numReceived); @@ -434,8 +428,6 @@ public void shouldProcessInOrder() { assertEquals(asList(101, 102, 103), source1.values); assertEquals(asList(201, 202), source2.values); - // tell the task that it doesn't need to wait for more records on partition1 - task.addFetchedMetadata(partition1, new ConsumerRecords.Metadata(0L, 30L, 30L)); assertTrue(task.process(0L)); assertEquals(0, task.numBuffered()); assertEquals(3, source1.numReceived); @@ -967,9 +959,6 @@ public void shouldPunctuateOnceStreamTimeAfterGap() { assertEquals(3, source2.numReceived); assertTrue(task.maybePunctuateStreamTime()); - // tell the task that it doesn't need to wait for new data on partition1 - task.addFetchedMetadata(partition1, new ConsumerRecords.Metadata(0L, 160L, 160L)); - // st: 161 assertTrue(task.process(0L)); assertEquals(0, task.numBuffered()); @@ -1093,18 +1082,16 @@ public void shouldCommitNextOffsetFromQueueIfAvailable() { @Test public void shouldCommitConsumerPositionIfRecordQueueIsEmpty() { - task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST); + task = createSingleSourceStateless(createConfig(), StreamsConfig.METRICS_LATEST); task.initializeIfNeeded(); task.completeRestoration(); consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(partition1, 
0L)); consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(partition1, 1L)); consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(partition1, 2L)); - consumer.updateEndOffsets(mkMap(mkEntry(partition2, 0L))); consumer.poll(Duration.ZERO); task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0L))); - task.addRecords(partition2, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition2, 0L))); task.process(0L); final Map offsetsAndMetadata = task.prepareCommit(); @@ -1176,6 +1163,107 @@ public void shouldBeProcessableIfAllPartitionsBuffered() { assertTrue(task.process(0L)); } + @Test + public void shouldBeProcessableIfWaitedForTooLong() { + // max idle time is 100ms + task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST); + task.initializeIfNeeded(); + task.completeRestoration(); + + final MetricName enforcedProcessMetric = metrics.metricName( + "enforced-processing-total", + "stream-task-metrics", + mkMap(mkEntry("thread-id", Thread.currentThread().getName()), mkEntry("task-id", taskId.toString())) + ); + + assertFalse(task.process(time.milliseconds())); + assertEquals(0.0, metrics.metric(enforcedProcessMetric).metricValue()); + + final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array(); + + task.addRecords(partition1, + asList( + new ConsumerRecord<>(topic1, 1, 0, bytes, bytes), + new ConsumerRecord<>(topic1, 1, 1, bytes, bytes), + new ConsumerRecord<>(topic1, 1, 2, bytes, bytes) + ) + ); + + assertFalse(task.process(time.milliseconds())); + + assertFalse(task.process(time.milliseconds() + 99L)); + + assertTrue(task.process(time.milliseconds() + 100L)); + assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue()); + + // once decided to enforce, continue doing that + assertTrue(task.process(time.milliseconds() + 101L)); + assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue()); + + task.addRecords(partition2, Collections.singleton(new ConsumerRecord<>(topic2, 1, 0, bytes, bytes))); + + assertTrue(task.process(time.milliseconds() + 130L)); + assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue()); + + // one resumed to normal processing, the timer should be reset + + assertFalse(task.process(time.milliseconds() + 150L)); + assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue()); + + assertFalse(task.process(time.milliseconds() + 249L)); + assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue()); + + assertTrue(task.process(time.milliseconds() + 250L)); + assertEquals(3.0, metrics.metric(enforcedProcessMetric).metricValue()); + } + + @Test + public void shouldNotBeProcessableIfNoDataAvailable() { + task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST); + task.initializeIfNeeded(); + task.completeRestoration(); + + final MetricName enforcedProcessMetric = metrics.metricName( + "enforced-processing-total", + "stream-task-metrics", + mkMap(mkEntry("thread-id", Thread.currentThread().getName()), mkEntry("task-id", taskId.toString())) + ); + + assertFalse(task.process(0L)); + assertEquals(0.0, metrics.metric(enforcedProcessMetric).metricValue()); + + final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array(); + + task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes))); + + assertFalse(task.process(time.milliseconds())); + + assertFalse(task.process(time.milliseconds() + 99L)); + + assertTrue(task.process(time.milliseconds() + 100L)); + assertEquals(1.0, 
metrics.metric(enforcedProcessMetric).metricValue()); + + // once the buffer is drained and no new records coming, the timer should be reset + + assertFalse(task.process(time.milliseconds() + 110L)); + assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue()); + + // check that after time is reset, we only falls into enforced processing after the + // whole timeout has elapsed again + task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes))); + + assertFalse(task.process(time.milliseconds() + 150L)); + assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue()); + + assertFalse(task.process(time.milliseconds() + 249L)); + assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue()); + + assertTrue(task.process(time.milliseconds() + 250L)); + assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue()); + } + + + @Test public void shouldPunctuateSystemTimeWhenIntervalElapsed() { task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST); task.initializeIfNeeded(); @@ -1550,8 +1638,7 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() { time, stateManager, recordCollector, - context, - logContext); + context); task.initializeIfNeeded(); task.completeRestoration(); @@ -1560,7 +1647,6 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() { task.addRecords(repartition, singletonList(getConsumerRecordWithOffsetAsTimestamp(repartition, 10L))); assertTrue(task.process(0L)); - task.addFetchedMetadata(partition1, new ConsumerRecords.Metadata(0L, 5L, 5L)); assertTrue(task.process(0L)); task.prepareCommit(); @@ -1691,7 +1777,6 @@ public void shouldCheckpointWhileUpdateSnapshotWithTheConsumedOffsetsForSuspende task.initializeIfNeeded(); task.completeRestoration(); task.addRecords(partition1, singleton(getConsumerRecordWithOffsetAsTimestamp(partition1, 10))); - task.addRecords(partition2, singleton(getConsumerRecordWithOffsetAsTimestamp(partition2, 10))); task.process(100L); assertTrue(task.commitNeeded()); @@ -2021,12 +2106,11 @@ public void shouldThrowIfCleanClosingDirtyTask() { @Test public void shouldThrowIfRecyclingDirtyTask() { - task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST); + task = createSingleSourceStateless(createConfig(), StreamsConfig.METRICS_LATEST); task.initializeIfNeeded(); task.completeRestoration(); task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0))); - task.addRecords(partition2, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition2, 0))); task.process(0L); assertTrue(task.commitNeeded()); @@ -2117,8 +2201,8 @@ public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() { time, stateManager, recordCollector, - context, - logContext) + context + ) ); assertThat(exception.getMessage(), equalTo("Invalid topology: " + @@ -2182,8 +2266,7 @@ private StreamTask createOptimizedStatefulTask(final StreamsConfig config, final time, stateManager, recordCollector, - context, - logContext + context ); } @@ -2223,8 +2306,7 @@ public Map committed(final Set clazz) { Logger.getLogger(clazz).setLevel(Level.DEBUG); } - public static void setClassLoggerToTrace(final Class clazz) { - Logger.getLogger(clazz).setLevel(Level.TRACE); - } - public static void unregister(final LogCaptureAppender logCaptureAppender) { Logger.getRootLogger().removeAppender(logCaptureAppender); } diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 5f85803812974..20cccf8b2ebf6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -447,7 +447,7 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig, new MockTime(), stateManager, recordCollector, - context, logContext); + context); } private void mockThread(final boolean initialized) { diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 5349d353ad253..8a9007651b8a8 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -19,7 +19,6 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -526,21 +525,11 @@ private void setupTask(final StreamsConfig streamsConfig, mockWallClockTime, stateManager, recordCollector, - context, - logContext); + context + ); task.initializeIfNeeded(); task.completeRestoration(); task.processorContext().setRecordContext(null); - - // initialize the task metadata so that all topics have zero lag - for (final Map.Entry entry : startOffsets.entrySet()) { - final ConsumerRecords.Metadata metadata = new ConsumerRecords.Metadata( - mockWallClockTime.milliseconds(), - 0L, - 0L - ); - task.addFetchedMetadata(entry.getKey(), metadata); - } } else { task = null; } @@ -602,11 +591,10 @@ private void enqueueTaskRecord(final String inputTopic, final byte[] key, final byte[] value, final Headers headers) { - final long offset = offsetsByTopicOrPatternPartition.get(topicOrPatternPartition).incrementAndGet() - 1; task.addRecords(topicOrPatternPartition, Collections.singleton(new ConsumerRecord<>( inputTopic, topicOrPatternPartition.partition(), - offset, + offsetsByTopicOrPatternPartition.get(topicOrPatternPartition).incrementAndGet() - 1, timestamp, TimestampType.CREATE_TIME, (long) ConsumerRecord.NULL_CHECKSUM, @@ -616,12 +604,6 @@ private void enqueueTaskRecord(final String inputTopic, value, headers)) ); - final ConsumerRecords.Metadata metadata = new ConsumerRecords.Metadata( - mockWallClockTime.milliseconds(), - offset, - offset - ); - task.addFetchedMetadata(topicOrPatternPartition, metadata); } private void completeAllProcessableWork() {
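
Note on what this revert restores: the hunks above remove the KIP-695 lag-based `readyToProcess` gate (and the `addFetchedMetadata` plumbing) from `PartitionGroup`, and bring back the older wall-clock idle timer inside `StreamTask.isProcessable`: a task processes immediately when every input partition has buffered data, idles for up to `max.task.idle.ms` when only some partitions have data, and resets the timer when nothing is buffered. The following is a minimal standalone sketch of that restored decision, not code from this patch; the class name `IdleGate` and its parameters are hypothetical, introduced only to make the timer logic runnable in isolation.

```java
/**
 * Minimal sketch of the wall-clock idle timer that the restored
 * StreamTask#isProcessable logic uses. Hypothetical class, for illustration only.
 */
final class IdleGate {
    static final long UNKNOWN = -1L;   // mirrors the RecordQueue.UNKNOWN sentinel

    private final long maxTaskIdleMs;
    private long idleStartTimeMs = UNKNOWN;

    IdleGate(final long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    /**
     * @param allPartitionsBuffered true if every input partition has buffered records
     * @param numBuffered           total records buffered across all partitions
     * @param wallClockTime         current wall-clock time in milliseconds
     * @return true if the task should process now
     */
    boolean isProcessable(final boolean allPartitionsBuffered,
                          final int numBuffered,
                          final long wallClockTime) {
        if (allPartitionsBuffered) {
            // normal case: data on every partition, so reset the idle timer and process
            idleStartTimeMs = UNKNOWN;
            return true;
        } else if (numBuffered > 0) {
            // only some partitions have data: start (or continue) the idle timer
            if (idleStartTimeMs == UNKNOWN) {
                idleStartTimeMs = wallClockTime;
            }
            // once max.task.idle.ms has elapsed, enforce processing even though some
            // partitions are empty; this may process records out of timestamp order
            return wallClockTime - idleStartTimeMs >= maxTaskIdleMs;
        } else {
            // nothing buffered anywhere: reset the timer and keep waiting for data
            idleStartTimeMs = UNKNOWN;
            return false;
        }
    }

    public static void main(final String[] args) {
        final IdleGate gate = new IdleGate(100L);
        System.out.println(gate.isProcessable(false, 3, 0L));    // false: idling starts
        System.out.println(gate.isProcessable(false, 3, 99L));   // false: still within the idle window
        System.out.println(gate.isProcessable(false, 3, 100L));  // true: enforced processing
        System.out.println(gate.isProcessable(true, 5, 130L));   // true: all buffered, timer resets
    }
}
```

This matches the behavior exercised by `shouldBeProcessableIfWaitedForTooLong` and `shouldNotBeProcessableIfNoDataAvailable` in the test hunks above: enforced processing only kicks in after the full timeout, and the timer resets whenever all partitions are buffered or the buffers drain completely.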