From 50437549bc3f92812557248ec35f5a905e504543 Mon Sep 17 00:00:00 2001 From: Han Zhang <41025882+hanwavefront@users.noreply.github.com> Date: Wed, 12 Feb 2020 17:21:40 -0800 Subject: [PATCH] =?UTF-8?q?Update=20HashMaps=20to=20ConcurrentHashMaps=20t?= =?UTF-8?q?o=20resolve=20ConcurrentModificati=E2=80=A6=20(#496)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/com/wavefront/agent/PushAgent.java | 7 +++++-- .../handlers/ReportableEntityHandlerFactoryImpl.java | 6 +++--- .../agent/handlers/SenderTaskFactoryImpl.java | 12 ++++++------ .../agent/queueing/QueueingFactoryImpl.java | 9 +++++---- .../agent/queueing/TaskQueueFactoryImpl.java | 4 ++-- .../wavefront/common/SharedRateLimitingLogger.java | 4 ++-- 6 files changed, 23 insertions(+), 19 deletions(-) diff --git a/proxy/src/main/java/com/wavefront/agent/PushAgent.java b/proxy/src/main/java/com/wavefront/agent/PushAgent.java index 13d7a1e85..c955cec39 100644 --- a/proxy/src/main/java/com/wavefront/agent/PushAgent.java +++ b/proxy/src/main/java/com/wavefront/agent/PushAgent.java @@ -114,6 +114,7 @@ import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -678,7 +679,8 @@ protected void startDeltaCounterListener(String strPort, if (this.deltaCounterHandlerFactory == null) { this.deltaCounterHandlerFactory = new ReportableEntityHandlerFactory() { - private final Map> handlers = new HashMap<>(); + private final Map> handlers = + new ConcurrentHashMap<>(); @Override public ReportableEntityHandler getHandler(HandlerKey handlerKey) { @@ -935,7 +937,8 @@ protected void startHistogramListeners(List ports, }); ReportableEntityHandlerFactory histogramHandlerFactory = new ReportableEntityHandlerFactory() { - private final Map> handlers = new HashMap<>(); + private final Map> handlers = + new ConcurrentHashMap<>(); @SuppressWarnings("unchecked") @Override public ReportableEntityHandler getHandler(HandlerKey handlerKey) { diff --git a/proxy/src/main/java/com/wavefront/agent/handlers/ReportableEntityHandlerFactoryImpl.java b/proxy/src/main/java/com/wavefront/agent/handlers/ReportableEntityHandlerFactoryImpl.java index 19fc6ac62..93a4e2313 100644 --- a/proxy/src/main/java/com/wavefront/agent/handlers/ReportableEntityHandlerFactoryImpl.java +++ b/proxy/src/main/java/com/wavefront/agent/handlers/ReportableEntityHandlerFactoryImpl.java @@ -6,8 +6,8 @@ import org.apache.commons.lang.math.NumberUtils; import wavefront.report.Histogram; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.logging.Logger; @@ -43,7 +43,7 @@ public class ReportableEntityHandlerFactoryImpl implements ReportableEntityHandl getSystemPropertyAsDouble("wavefront.proxy.logevents.sample-rate"), false, logger::info); protected final Map>> handlers = - new HashMap<>(); + new ConcurrentHashMap<>(); private final SenderTaskFactory senderTaskFactory; private final int blockedItemsPerBatch; @@ -80,7 +80,7 @@ public ReportableEntityHandlerFactoryImpl( @Override public ReportableEntityHandler getHandler(HandlerKey handlerKey) { return (ReportableEntityHandler) handlers.computeIfAbsent(handlerKey.getHandle(), - h -> new HashMap<>()).computeIfAbsent(handlerKey.getEntityType(), k -> { + h -> new ConcurrentHashMap<>()).computeIfAbsent(handlerKey.getEntityType(), k -> { switch (handlerKey.getEntityType()) { case POINT: return new ReportPointHandlerImpl(handlerKey, blockedItemsPerBatch, diff --git a/proxy/src/main/java/com/wavefront/agent/handlers/SenderTaskFactoryImpl.java b/proxy/src/main/java/com/wavefront/agent/handlers/SenderTaskFactoryImpl.java index 03e62af7a..604c0d7a2 100644 --- a/proxy/src/main/java/com/wavefront/agent/handlers/SenderTaskFactoryImpl.java +++ b/proxy/src/main/java/com/wavefront/agent/handlers/SenderTaskFactoryImpl.java @@ -17,11 +17,11 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -42,15 +42,15 @@ */ public class SenderTaskFactoryImpl implements SenderTaskFactory { - private final Map> entityTypes = new HashMap<>(); - private final Map executors = new HashMap<>(); - private final Map>> managedTasks = new HashMap<>(); - private final Map managedServices = new HashMap<>(); + private final Map> entityTypes = new ConcurrentHashMap<>(); + private final Map executors = new ConcurrentHashMap<>(); + private final Map>> managedTasks = new ConcurrentHashMap<>(); + private final Map managedServices = new ConcurrentHashMap<>(); /** * Keep track of all {@link TaskSizeEstimator} instances to calculate global buffer fill rate. */ - private final Map taskSizeEstimators = new HashMap<>(); + private final Map taskSizeEstimators = new ConcurrentHashMap<>(); private final APIContainer apiContainer; private final UUID proxyId; diff --git a/proxy/src/main/java/com/wavefront/agent/queueing/QueueingFactoryImpl.java b/proxy/src/main/java/com/wavefront/agent/queueing/QueueingFactoryImpl.java index efc23875e..6c2a0c679 100644 --- a/proxy/src/main/java/com/wavefront/agent/queueing/QueueingFactoryImpl.java +++ b/proxy/src/main/java/com/wavefront/agent/queueing/QueueingFactoryImpl.java @@ -13,11 +13,11 @@ import com.wavefront.data.ReportableEntityType; import javax.annotation.Nonnull; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; @@ -30,9 +30,10 @@ */ public class QueueingFactoryImpl implements QueueingFactory { - private final Map executors = new HashMap<>(); - private final Map>> queueProcessors = new HashMap<>(); - private final Map> queueControllers = new HashMap<>(); + private final Map executors = new ConcurrentHashMap<>(); + private final Map>> queueProcessors = + new ConcurrentHashMap<>(); + private final Map> queueControllers = new ConcurrentHashMap<>(); private final TaskQueueFactory taskQueueFactory; private final APIContainer apiContainer; private final UUID proxyId; diff --git a/proxy/src/main/java/com/wavefront/agent/queueing/TaskQueueFactoryImpl.java b/proxy/src/main/java/com/wavefront/agent/queueing/TaskQueueFactoryImpl.java index 4582cab69..7d62288b0 100644 --- a/proxy/src/main/java/com/wavefront/agent/queueing/TaskQueueFactoryImpl.java +++ b/proxy/src/main/java/com/wavefront/agent/queueing/TaskQueueFactoryImpl.java @@ -13,10 +13,10 @@ import java.io.File; import java.io.RandomAccessFile; import java.nio.channels.FileChannel; -import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Logger; /** @@ -27,7 +27,7 @@ public class TaskQueueFactoryImpl implements TaskQueueFactory { private static final Logger logger = Logger.getLogger(TaskQueueFactoryImpl.class.getCanonicalName()); - private final Map>> taskQueues = new HashMap<>(); + private final Map>> taskQueues = new ConcurrentHashMap<>(); private final String bufferFile; private final boolean purgeBuffer; diff --git a/proxy/src/main/java/com/wavefront/common/SharedRateLimitingLogger.java b/proxy/src/main/java/com/wavefront/common/SharedRateLimitingLogger.java index 797bf22f1..ee41bb4fe 100644 --- a/proxy/src/main/java/com/wavefront/common/SharedRateLimitingLogger.java +++ b/proxy/src/main/java/com/wavefront/common/SharedRateLimitingLogger.java @@ -2,8 +2,8 @@ import com.google.common.util.concurrent.RateLimiter; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.LogRecord; @@ -15,7 +15,7 @@ */ @SuppressWarnings("UnstableApiUsage") public class SharedRateLimitingLogger extends DelegatingLogger { - private static final Map SHARED_CACHE = new HashMap<>(); + private static final Map SHARED_CACHE = new ConcurrentHashMap<>(); private final RateLimiter rateLimiter;