diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasRegistry.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasRegistry.java index 207b556dd..6e19a1060 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasRegistry.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasRegistry.java @@ -48,6 +48,8 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -93,6 +95,8 @@ public final class AtlasRegistry extends AbstractRegistry implements AutoCloseab private long lastFlushTimestamp = -1L; private final ConcurrentHashMap atlasMeasurements = new ConcurrentHashMap<>(); + private final ConcurrentHashMap publishTaskLocks = new ConcurrentHashMap<>(); + /** Create a new instance. */ @Inject public AtlasRegistry(Clock clock, AtlasConfig config) { @@ -244,8 +248,24 @@ private Timer publishTaskTimer(String id) { return debugRegistry.timer(PUBLISH_TASK_TIMER, "id", id); } - synchronized void sendToAtlas() { - publishTaskTimer("sendToAtlas").record(() -> { + private void timePublishTask(String id, Runnable task) { + timePublishTask(id, id, task); + } + + private void timePublishTask(String id, String lockName, Runnable task) { + publishTaskTimer(id).record(() -> { + Lock lock = publishTaskLocks.computeIfAbsent(lockName, n -> new ReentrantLock()); + lock.lock(); + try { + task.run(); + } finally { + lock.unlock(); + } + }); + } + + void sendToAtlas() { + timePublishTask("sendToAtlas", () -> { if (config.enabled()) { long t = lastCompletedTimestamp(stepMillis); if (t > lastFlushTimestamp) { @@ -272,8 +292,8 @@ synchronized void sendToAtlas() { }); } - synchronized void sendToLWC() { - publishTaskTimer("sendToLWC").record(() -> { + void sendToLWC() { + timePublishTask("sendToLWC", () -> { long t = lastCompletedTimestamp(lwcStepMillis); //if (config.enabled() || config.lwcEnabled()) { // If either are enabled we poll the meters for each step interval to flush the @@ -306,8 +326,8 @@ synchronized void sendToLWC() { } /** Collect measurements from all the meters in the registry. */ - synchronized void pollMeters(long t) { - publishTaskTimer("pollMeters").record(() -> { + void pollMeters(long t) { + timePublishTask("pollMeters", "atlasMeasurements", () -> { if (t > lastPollTimestamp) { MeasurementConsumer consumer = (id, timestamp, value) -> { // Update the map for data to go to the Atlas storage layer @@ -365,10 +385,10 @@ private void fetchSubscriptions() { * Get a list of all consolidated measurements intended to be sent to Atlas and break them * into batches. */ - synchronized List getBatches(long t) { + List getBatches(long t) { final int n = atlasMeasurements.size(); final List batches = new ArrayList<>(n / batchSize + 1); - publishTaskTimer("getBatches").record(() -> { + timePublishTask("getBatches", "atlasMeasurements", () -> { debugRegistry.distributionSummary("spectator.registrySize").record(n); List input = new ArrayList<>(n); Iterator> it = atlasMeasurements.entrySet().iterator();