From d3dfe9b58b140207e55b45795d14dd7f698a1c1e Mon Sep 17 00:00:00 2001 From: Vadim Tsesko Date: Mon, 22 Apr 2019 11:38:52 +0300 Subject: [PATCH 1/2] Optimize TaskBatcher behavior in case of a datacenter failure. * Replace the global synchronized lock with ConcurrentMap facilities * Faster duplicate task check in the common case --- .../cluster/service/TaskBatcher.java | 145 ++++++++++++------ 1 file changed, 95 insertions(+), 50 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java b/server/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java index 867d4191f800f..39cc324d04ffd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java @@ -29,10 +29,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.IdentityHashMap; -import java.util.LinkedHashSet; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; @@ -46,7 +49,7 @@ public abstract class TaskBatcher { private final Logger logger; private final PrioritizedEsThreadPoolExecutor threadExecutor; // package visible for tests - final Map> tasksPerBatchingKey = new HashMap<>(); + final ConcurrentMap> tasksPerBatchingKey = new ConcurrentHashMap<>(); public TaskBatcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) { this.logger = logger; @@ -61,25 +64,33 @@ public void submitTasks(List tasks, @Nullable TimeValue t assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) : "tasks submitted in a batch should share the same batching key: " + tasks; // convert to an identity map to check for dups based on task identity - final Map tasksIdentity = tasks.stream().collect(Collectors.toMap( - BatchedTask::getTask, + final Map toAdd = tasks.stream().collect(Collectors.toMap( + t -> new IdentityWrapper(t.getTask()), Function.identity(), (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); }, - IdentityHashMap::new)); - - synchronized (tasksPerBatchingKey) { - LinkedHashSet existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey, - k -> new LinkedHashSet<>(tasks.size())); - for (BatchedTask existing : existingTasks) { - // check that there won't be two tasks with the same identity for the same batching key - BatchedTask duplicateTask = tasksIdentity.get(existing.getTask()); - if (duplicateTask != null) { - throw new IllegalStateException("task [" + duplicateTask.describeTasks( - Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued"); + LinkedHashMap::new)); + + tasksPerBatchingKey.merge( + firstTask.batchingKey, + toAdd, + (oldValue, newValue) -> { + final Map merged = + new LinkedHashMap<>(oldValue.size() + newValue.size()); + merged.putAll(oldValue); + merged.putAll(newValue); + + if (merged.size() != oldValue.size() + newValue.size()) { + // Find the duplicate + oldValue.forEach((k, existing) -> { + final BatchedTask duplicateTask = newValue.get(k); + if (duplicateTask != null) { + throw new IllegalStateException("task [" + duplicateTask.describeTasks( + Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued"); + } + }); } - } - existingTasks.addAll(tasks); - } + return merged; + }); if (timeout != null) { threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout)); @@ -89,29 +100,42 @@ public void submitTasks(List tasks, @Nullable TimeValue t } private void onTimeoutInternal(List tasks, TimeValue timeout) { - final ArrayList toRemove = new ArrayList<>(); + final Set ids = new HashSet<>(tasks.size()); + final List toRemove = new ArrayList<>(tasks.size()); for (BatchedTask task : tasks) { - if (task.processed.getAndSet(true) == false) { + if (!task.processed.getAndSet(true)) { logger.debug("task [{}] timed out after [{}]", task.source, timeout); + ids.add(new IdentityWrapper(task.getTask())); toRemove.add(task); } } - if (toRemove.isEmpty() == false) { - BatchedTask firstTask = toRemove.get(0); - Object batchingKey = firstTask.batchingKey; - assert tasks.stream().allMatch(t -> t.batchingKey == batchingKey) : - "tasks submitted in a batch should share the same batching key: " + tasks; - synchronized (tasksPerBatchingKey) { - LinkedHashSet existingTasks = tasksPerBatchingKey.get(batchingKey); - if (existingTasks != null) { - existingTasks.removeAll(toRemove); - if (existingTasks.isEmpty()) { - tasksPerBatchingKey.remove(batchingKey); - } - } - } - onTimeout(toRemove, timeout); + + if (toRemove.isEmpty()) { + return; } + + final BatchedTask firstTask = toRemove.get(0); + final Object batchingKey = firstTask.batchingKey; + assert tasks.stream().allMatch(t -> t.batchingKey == batchingKey) : + "tasks submitted in a batch should share the same batching key: " + tasks; + tasksPerBatchingKey.computeIfPresent( + batchingKey, + (k, v) -> { + if (v.size() == ids.size() && ids.containsAll(v.keySet())) { + // Special case when all the tasks timed out + return null; + } else { + final Map merged = new LinkedHashMap<>(v.size()); + v.forEach((id, task) -> { + if (!ids.contains(id)) { + merged.put(id, task); + } + }); + return merged; + } + }); + + onTimeout(toRemove, timeout); } /** @@ -120,28 +144,26 @@ private void onTimeoutInternal(List tasks, TimeValue time */ protected abstract void onTimeout(List tasks, TimeValue timeout); - void runIfNotProcessed(BatchedTask updateTask) { + private void runIfNotProcessed(BatchedTask updateTask) { // if this task is already processed, it shouldn't execute other tasks with same batching key that arrived later, // to give other tasks with different batching key a chance to execute. - if (updateTask.processed.get() == false) { + if (!updateTask.processed.get()) { final List toExecute = new ArrayList<>(); final Map> processTasksBySource = new HashMap<>(); - synchronized (tasksPerBatchingKey) { - LinkedHashSet pending = tasksPerBatchingKey.remove(updateTask.batchingKey); - if (pending != null) { - for (BatchedTask task : pending) { - if (task.processed.getAndSet(true) == false) { - logger.trace("will process {}", task); - toExecute.add(task); - processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task); - } else { - logger.trace("skipping {}, already processed", task); - } + final Map pending = tasksPerBatchingKey.remove(updateTask.batchingKey); + if (pending != null) { + for (BatchedTask task : pending.values()) { + if (!task.processed.getAndSet(true)) { + logger.trace("will process {}", task); + toExecute.add(task); + processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task); + } else { + logger.trace("skipping {}, already processed", task); } } } - if (toExecute.isEmpty() == false) { + if (!toExecute.isEmpty()) { final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> { String tasks = updateTask.describeTasks(entry.getValue()); return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]"; @@ -204,4 +226,27 @@ public Object getTask() { return task; } } + + /** + * Uses wrapped {@link Object} identity for {@link #equals(Object)} and {@link #hashCode()}. + */ + private static final class IdentityWrapper { + private final Object object; + + private IdentityWrapper(final Object object) { + this.object = object; + } + + @Override + public boolean equals(final Object o) { + assert o instanceof IdentityWrapper; + final IdentityWrapper that = (IdentityWrapper) o; + return object == that.object; + } + + @Override + public int hashCode() { + return System.identityHashCode(object); + } + } } From 7b932beb6b53202e8b10c9da6dda0a0a0822331e Mon Sep 17 00:00:00 2001 From: Vadim Tsesko Date: Mon, 22 Apr 2019 17:03:49 +0300 Subject: [PATCH 2/2] Use explicit comparison to false instead of if-not --- .../org/elasticsearch/cluster/service/TaskBatcher.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java b/server/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java index 39cc324d04ffd..ba02dccbfeb10 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java @@ -103,7 +103,7 @@ private void onTimeoutInternal(List tasks, TimeValue time final Set ids = new HashSet<>(tasks.size()); final List toRemove = new ArrayList<>(tasks.size()); for (BatchedTask task : tasks) { - if (!task.processed.getAndSet(true)) { + if (task.processed.getAndSet(true) == false) { logger.debug("task [{}] timed out after [{}]", task.source, timeout); ids.add(new IdentityWrapper(task.getTask())); toRemove.add(task); @@ -127,7 +127,7 @@ private void onTimeoutInternal(List tasks, TimeValue time } else { final Map merged = new LinkedHashMap<>(v.size()); v.forEach((id, task) -> { - if (!ids.contains(id)) { + if (ids.contains(id) == false) { merged.put(id, task); } }); @@ -147,13 +147,13 @@ private void onTimeoutInternal(List tasks, TimeValue time private void runIfNotProcessed(BatchedTask updateTask) { // if this task is already processed, it shouldn't execute other tasks with same batching key that arrived later, // to give other tasks with different batching key a chance to execute. - if (!updateTask.processed.get()) { + if (updateTask.processed.get() == false) { final List toExecute = new ArrayList<>(); final Map> processTasksBySource = new HashMap<>(); final Map pending = tasksPerBatchingKey.remove(updateTask.batchingKey); if (pending != null) { for (BatchedTask task : pending.values()) { - if (!task.processed.getAndSet(true)) { + if (task.processed.getAndSet(true) == false) { logger.trace("will process {}", task); toExecute.add(task); processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task); @@ -163,7 +163,7 @@ private void runIfNotProcessed(BatchedTask updateTask) { } } - if (!toExecute.isEmpty()) { + if (toExecute.isEmpty() == false) { final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> { String tasks = updateTask.describeTasks(entry.getValue()); return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";