Revert "KAFKA-10867: Improved task idling (#9840)"
This reverts commit 4d28391.

Closes #10119

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
John Roesler committed Feb 12, 2021
1 parent f4e475c commit 91fa677
Showing 15 changed files with 177 additions and 517 deletions.
StreamsConfig.java
@@ -144,7 +144,6 @@ public class StreamsConfig extends AbstractConfig {
     private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
 
     public static final int DUMMY_THREAD_INDEX = 1;
-    public static final long MAX_TASK_IDLE_MS_DISABLED = -1;
 
     /**
      * Prefix used to provide default topic configs to be applied when creating internal topics.
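For reference, the constant removed above was the sentinel behind the user-facing max.task.idle.ms setting; after this revert, the config goes back to acting as a plain wall-clock idle timeout (see the restored StreamTask logic below). A minimal, hypothetical sketch of setting that config, with placeholder application id and broker address:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class IdleConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "idle-demo");         // placeholder app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // How long a task whose input partitions are only partially buffered
        // waits before processing is enforced (which may reorder records).
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 100L);
        System.out.println(props.get(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG));
    }
}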
ActiveTaskCreator.java
@@ -242,8 +242,7 @@ private StreamTask createActiveTask(final TaskId taskId,
             time,
             stateManager,
             recordCollector,
-            context,
-            logContext
+            context
         );
 
         log.trace("Created task {} with assigned partitions {}", taskId, inputPartitions);
PartitionGroup.java
@@ -17,21 +17,16 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsConfig;
-import org.slf4j.Logger;
 
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
+import java.util.Iterator;
+import java.util.HashSet;
 import java.util.function.Function;
 
 /**
@@ -58,18 +53,14 @@
  */
 public class PartitionGroup {
 
-    private final Logger logger;
     private final Map<TopicPartition, RecordQueue> partitionQueues;
-    private final Sensor enforcedProcessingSensor;
-    private final long maxTaskIdleMs;
     private final Sensor recordLatenessSensor;
     private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
 
     private long streamTime;
     private int totalBuffered;
     private boolean allBuffered;
-    private final Map<TopicPartition, Long> fetchedLags = new HashMap<>();
-    private final Map<TopicPartition, Long> idlePartitionDeadlines = new HashMap<>();
+
 
     static class RecordInfo {
         RecordQueue queue;
@@ -87,144 +78,15 @@ RecordQueue queue() {
         }
     }
 
-    PartitionGroup(final LogContext logContext,
-                   final Map<TopicPartition, RecordQueue> partitionQueues,
-                   final Sensor recordLatenessSensor,
-                   final Sensor enforcedProcessingSensor,
-                   final long maxTaskIdleMs) {
-        this.logger = logContext.logger(PartitionGroup.class);
+    PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
         this.partitionQueues = partitionQueues;
-        this.enforcedProcessingSensor = enforcedProcessingSensor;
-        this.maxTaskIdleMs = maxTaskIdleMs;
         this.recordLatenessSensor = recordLatenessSensor;
         totalBuffered = 0;
         allBuffered = false;
         streamTime = RecordQueue.UNKNOWN;
     }
 
-    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
-        final Long lag = metadata.lag();
-        if (lag != null) {
-            logger.trace("added fetched lag {}: {}", partition, lag);
-            fetchedLags.put(partition, lag);
-        }
-    }
-
-    public boolean readyToProcess(final long wallClockTime) {
-        if (logger.isTraceEnabled()) {
-            for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
-                logger.trace(
-                    "buffered/lag {}: {}/{}",
-                    entry.getKey(),
-                    entry.getValue().size(),
-                    fetchedLags.get(entry.getKey())
-                );
-            }
-        }
-        // Log-level strategy:
-        //  TRACE for messages that don't wait for fetches
-        //  TRACE when we waited for a fetch and decided to wait some more, as configured
-        //  TRACE when we are ready for processing and didn't have to enforce processing
-        //  INFO when we enforce processing, since this has to wait for fetches AND may result in disorder
-
-        if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
-            if (logger.isTraceEnabled() && !allBuffered && totalBuffered > 0) {
-                final Set<TopicPartition> bufferedPartitions = new HashSet<>();
-                final Set<TopicPartition> emptyPartitions = new HashSet<>();
-                for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
-                    if (entry.getValue().isEmpty()) {
-                        emptyPartitions.add(entry.getKey());
-                    } else {
-                        bufferedPartitions.add(entry.getKey());
-                    }
-                }
-                logger.trace("Ready for processing because max.task.idle.ms is disabled." +
-                                 "\n\tThere may be out-of-order processing for this task as a result." +
-                                 "\n\tBuffered partitions: {}" +
-                                 "\n\tNon-buffered partitions: {}",
-                             bufferedPartitions,
-                             emptyPartitions);
-            }
-            return true;
-        }
-
-        final Set<TopicPartition> queued = new HashSet<>();
-        Map<TopicPartition, Long> enforced = null;
-
-        for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
-            final TopicPartition partition = entry.getKey();
-            final RecordQueue queue = entry.getValue();
-
-            final Long nullableFetchedLag = fetchedLags.get(partition);
-
-            if (!queue.isEmpty()) {
-                // this partition is ready for processing
-                idlePartitionDeadlines.remove(partition);
-                queued.add(partition);
-            } else if (nullableFetchedLag == null) {
-                // must wait to fetch metadata for the partition
-                idlePartitionDeadlines.remove(partition);
-                logger.trace("Waiting to fetch data for {}", partition);
-                return false;
-            } else if (nullableFetchedLag > 0L) {
-                // must wait to poll the data we know to be on the broker
-                idlePartitionDeadlines.remove(partition);
-                logger.trace(
-                    "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
-                    partition,
-                    nullableFetchedLag
-                );
-                return false;
-            } else {
-                // p is known to have zero lag. wait for maxTaskIdleMs to see if more data shows up.
-                // One alternative would be to set the deadline to nullableMetadata.receivedTimestamp + maxTaskIdleMs
-                // instead. That way, we would start the idling timer as of the freshness of our knowledge about the zero
-                // lag instead of when we happen to run this method, but realistically it's probably a small difference
-                // and using wall clock time seems more intuitive for users,
-                // since the log message will be as of wallClockTime.
-                idlePartitionDeadlines.putIfAbsent(partition, wallClockTime + maxTaskIdleMs);
-                final long deadline = idlePartitionDeadlines.get(partition);
-                if (wallClockTime < deadline) {
-                    logger.trace(
-                        "Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).",
-                        partition,
-                        wallClockTime,
-                        maxTaskIdleMs,
-                        deadline
-                    );
-                    return false;
-                } else {
-                    // this partition is ready for processing due to the task idling deadline passing
-                    if (enforced == null) {
-                        enforced = new HashMap<>();
-                    }
-                    enforced.put(partition, deadline);
-                }
-            }
-        }
-        if (enforced == null) {
-            logger.trace("All partitions were buffered locally, so this task is ready for processing.");
-            return true;
-        } else if (queued.isEmpty()) {
-            logger.trace("No partitions were buffered locally, so this task is not ready for processing.");
-            return false;
-        } else {
-            enforcedProcessingSensor.record(1.0d, wallClockTime);
-            logger.info("Continuing to process although some partition timestamps were not buffered locally." +
-                            "\n\tThere may be out-of-order processing for this task as a result." +
-                            "\n\tPartitions with local data: {}." +
-                            "\n\tPartitions we gave up waiting for, with their corresponding deadlines: {}." +
-                            "\n\tConfigured max.task.idle.ms: {}." +
-                            "\n\tCurrent wall-clock time: {}.",
-                        queued,
-                        enforced,
-                        maxTaskIdleMs,
-                        wallClockTime);
-            return true;
-        }
-    }
-
     // visible for testing
     long partitionTimestamp(final TopicPartition partition) {
         final RecordQueue queue = partitionQueues.get(partition);
@@ -377,7 +239,7 @@ int numBuffered() {
         return totalBuffered;
     }
 
-    boolean allPartitionsBufferedLocally() {
+    boolean allPartitionsBuffered() {
         return allBuffered;
     }
 
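For orientation, the removed readyToProcess makes a per-partition decision: a buffered partition is ready; an empty partition with unknown or positive fetched lag forces the task to keep waiting; an empty partition with zero lag idles until a per-partition deadline, after which processing is enforced. A self-contained sketch of that decision under simplified assumptions (plain strings for partitions, integer counts for queues, no sensors or logging; this is not the Streams API):

import java.util.HashMap;
import java.util.Map;

// Simplified model of the reverted, lag-based readiness check.
public class ReadyToProcessSketch {

    private final Map<String, Long> idleDeadlines = new HashMap<>();
    private final long maxTaskIdleMs;

    ReadyToProcessSketch(final long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    // bufferedCounts: locally queued records per partition;
    // fetchedLags: last consumer-reported lag per partition (absent = unknown).
    boolean readyToProcess(final Map<String, Integer> bufferedCounts,
                           final Map<String, Long> fetchedLags,
                           final long wallClockTime) {
        boolean enforced = false;
        boolean anyBuffered = false;
        for (final Map.Entry<String, Integer> entry : bufferedCounts.entrySet()) {
            final String partition = entry.getKey();
            if (entry.getValue() > 0) {
                idleDeadlines.remove(partition);     // partition has data: ready
                anyBuffered = true;
            } else {
                final Long lag = fetchedLags.get(partition);
                if (lag == null || lag > 0L) {
                    idleDeadlines.remove(partition); // metadata or records still outstanding
                    return false;
                }
                // zero lag: idle until a per-partition deadline, then enforce
                idleDeadlines.putIfAbsent(partition, wallClockTime + maxTaskIdleMs);
                if (wallClockTime < idleDeadlines.get(partition)) {
                    return false;
                }
                enforced = true;
            }
        }
        // enforcing only makes sense if at least one partition has data
        return !enforced || anyBuffered;
    }

    public static void main(final String[] args) {
        final ReadyToProcessSketch sketch = new ReadyToProcessSketch(100L);
        final Map<String, Integer> buffered = Map.of("t-0", 5, "t-1", 0);
        final Map<String, Long> lags = Map.of("t-1", 0L);
        System.out.println(sketch.readyToProcess(buffered, lags, 0L));   // false: t-1 is idling
        System.out.println(sketch.readyToProcess(buffered, lags, 100L)); // true: deadline passed, enforced
    }
}

Note how any partition with data, or with unknown lag, clears its deadline, so only partitions that stay at zero lag can trigger enforced processing.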
StandbyTask.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
@@ -287,11 +286,6 @@ public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
         throw new IllegalStateException("Attempted to add records to task " + id() + " for invalid input partition " + partition);
     }
 
-    @Override
-    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
-        throw new IllegalStateException("Attempted to update metadata for standby task " + id());
-    }
-
     InternalProcessorContext processorContext() {
         return processorContext;
     }
StreamTask.java
@@ -19,7 +19,6 @@
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -80,6 +79,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     // leaked into this class, which is to checkpoint after committing if EOS is not enabled.
     private final boolean eosEnabled;
 
+    private final long maxTaskIdleMs;
     private final int maxBufferedSize;
     private final PartitionGroup partitionGroup;
     private final RecordCollector recordCollector;
@@ -96,11 +96,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private final Sensor processLatencySensor;
     private final Sensor punctuateLatencySensor;
     private final Sensor bufferedRecordsSensor;
+    private final Sensor enforcedProcessingSensor;
     private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();
     private final InternalProcessorContext processorContext;
     private final RecordQueueCreator recordQueueCreator;
 
     private StampedRecord record;
+    private long idleStartTimeMs;
     private boolean commitNeeded = false;
     private boolean commitRequested = false;
     private boolean hasPendingTxCommit = false;
@@ -116,8 +118,7 @@ public StreamTask(final TaskId id,
                       final Time time,
                       final ProcessorStateManager stateMgr,
                       final RecordCollector recordCollector,
-                      final InternalProcessorContext processorContext,
-                      final LogContext logContext) {
+                      final InternalProcessorContext processorContext) {
         super(
             id,
             topology,
@@ -141,6 +142,12 @@ public StreamTask(final TaskId id,
         this.streamsMetrics = streamsMetrics;
         closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
         final String taskId = id.toString();
+        if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
+            final Sensor parent = ThreadMetrics.commitOverTasksSensor(threadId, streamsMetrics);
+            enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics, parent);
+        } else {
+            enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
+        }
         processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
         processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
         punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
@@ -163,30 +170,17 @@ public StreamTask(final TaskId id,
 
         streamTimePunctuationQueue = new PunctuationQueue();
         systemTimePunctuationQueue = new PunctuationQueue();
+        maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
         maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 
         // initialize the consumed and committed offset cache
         consumedOffsets = new HashMap<>();
 
-        recordQueueCreator = new RecordQueueCreator(this.logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());
+        recordQueueCreator = new RecordQueueCreator(logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());
 
         recordInfo = new PartitionGroup.RecordInfo();
 
-        final Sensor enforcedProcessingSensor;
-        if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
-            final Sensor parent = ThreadMetrics.commitOverTasksSensor(threadId, streamsMetrics);
-            enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics, parent);
-        } else {
-            enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
-        }
-        final long maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
-        partitionGroup = new PartitionGroup(
-            logContext,
-            createPartitionQueues(),
-            TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics),
-            enforcedProcessingSensor,
-            maxTaskIdleMs
-        );
+        partitionGroup = new PartitionGroup(createPartitionQueues(),
+                                            TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics));
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
     }
@@ -242,6 +236,7 @@ public void completeRestoration() {
         initializeMetadata();
         initializeTopology();
         processorContext.initialize();
+        idleStartTimeMs = RecordQueue.UNKNOWN;
 
         transitionTo(State.RUNNING);
 
@@ -632,12 +627,7 @@ record = null;
 
     /**
      * An active task is processable if its buffer contains data for all of its input
-     * source topic partitions, or if it is enforced to be processable.
-     *
-     * Note that this method is _NOT_ idempotent, because the internal bookkeeping
-     * consumes the partition metadata. For example, unit tests may have to invoke
-     * {@link #addFetchedMetadata(TopicPartition, ConsumerRecords.Metadata)} again
-     * invoking this method.
+     * source topic partitions, or if it is enforced to be processable
      */
     public boolean isProcessable(final long wallClockTime) {
         if (state() == State.CLOSED) {
@@ -655,7 +645,26 @@ public boolean isProcessable(final long wallClockTime) {
             return false;
         }
 
-        return partitionGroup.readyToProcess(wallClockTime);
+        if (partitionGroup.allPartitionsBuffered()) {
+            idleStartTimeMs = RecordQueue.UNKNOWN;
+            return true;
+        } else if (partitionGroup.numBuffered() > 0) {
+            if (idleStartTimeMs == RecordQueue.UNKNOWN) {
+                idleStartTimeMs = wallClockTime;
+            }
+
+            if (wallClockTime - idleStartTimeMs >= maxTaskIdleMs) {
+                enforcedProcessingSensor.record(1.0d, wallClockTime);
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            // there's no data in any of the topics; we should reset the enforced
+            // processing timer
+            idleStartTimeMs = RecordQueue.UNKNOWN;
+            return false;
+        }
     }
 
     /**
@@ -922,11 +931,6 @@ public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
         }
     }
 
-    @Override
-    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
-        partitionGroup.addFetchedMetadata(partition, metadata);
-    }
-
     /**
      * Schedules a punctuation for the processor
      *
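The revert swaps those per-partition deadlines for the earlier design restored above in isProcessable: a single wall-clock idle timer per task. A self-contained sketch of the restored timer logic, where UNKNOWN stands in for RecordQueue.UNKNOWN (again, not the Streams API):

// Simplified model of the restored enforced-processing timer in StreamTask.
public class IdleTimerSketch {

    private static final long UNKNOWN = -1L;     // stand-in for RecordQueue.UNKNOWN

    private final long maxTaskIdleMs;
    private long idleStartTimeMs = UNKNOWN;

    IdleTimerSketch(final long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    boolean isProcessable(final boolean allPartitionsBuffered,
                          final int numBuffered,
                          final long wallClockTime) {
        if (allPartitionsBuffered) {
            idleStartTimeMs = UNKNOWN;           // every input has data: no idling needed
            return true;
        } else if (numBuffered > 0) {
            if (idleStartTimeMs == UNKNOWN) {
                idleStartTimeMs = wallClockTime; // start the idle timer
            }
            // enforce processing once the task has idled long enough
            return wallClockTime - idleStartTimeMs >= maxTaskIdleMs;
        } else {
            idleStartTimeMs = UNKNOWN;           // nothing buffered at all: reset and wait
            return false;
        }
    }

    public static void main(final String[] args) {
        final IdleTimerSketch sketch = new IdleTimerSketch(100L);
        System.out.println(sketch.isProcessable(false, 3, 0L));   // false: timer just started
        System.out.println(sketch.isProcessable(false, 3, 100L)); // true: idled past max.task.idle.ms
    }
}

Because this version never consults broker lag metadata, addFetchedMetadata disappears from PartitionGroup, StreamTask, and StandbyTask, as the diffs above show.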
(The remaining 10 changed files in this commit are not shown.)
