Skip to content
This repository has been archived by the owner on Jul 1, 2022. It is now read-only.

Remove all remaining synchronized implementations of Sampler.sample #809

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
public final class GuaranteedThroughputSampler implements Sampler {
public static final String TYPE = "lowerbound";

private ProbabilisticSampler probabilisticSampler;
private RateLimitingSampler lowerBoundSampler;
private volatile ProbabilisticSampler probabilisticSampler;
private volatile RateLimitingSampler lowerBoundSampler;
private Map<String, Object> tags;

public GuaranteedThroughputSampler(double samplingRate, double lowerBound) {
Expand Down Expand Up @@ -75,7 +75,7 @@ public synchronized boolean update(double samplingRate, double lowerBound) {
* @param id The traceId on the span
*/
@Override
public synchronized SamplingStatus sample(String operation, long id) {
public SamplingStatus sample(String operation, long id) {
SamplingStatus probabilisticSamplingStatus = probabilisticSampler.sample(operation, id);
SamplingStatus lowerBoundSamplingStatus = lowerBoundSampler.sample(operation, id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
import io.jaegertracing.internal.samplers.http.OperationSamplingParameters;
import io.jaegertracing.internal.samplers.http.PerOperationSamplingParameters;
import io.jaegertracing.spi.Sampler;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
Expand All @@ -38,13 +41,13 @@
@Getter(AccessLevel.PACKAGE) //Visible for testing
public class PerOperationSampler implements Sampler {
private final int maxOperations;
private Map<String, GuaranteedThroughputSampler> operationNameToSampler;
private ProbabilisticSampler defaultSampler;
private double lowerBound;
private final ConcurrentHashMap<String, GuaranteedThroughputSampler> operationNameToSampler;
private volatile ProbabilisticSampler defaultSampler;
private volatile double lowerBound;

public PerOperationSampler(int maxOperations, OperationSamplingParameters strategies) {
this(maxOperations,
new HashMap<String, GuaranteedThroughputSampler>(),
new ConcurrentHashMap<String, GuaranteedThroughputSampler>(),
new ProbabilisticSampler(strategies.getDefaultSamplingProbability()),
strategies.getDefaultLowerBoundTracesPerSecond());
update(strategies);
Expand All @@ -56,55 +59,62 @@ public PerOperationSampler(int maxOperations, OperationSamplingParameters strate
* @return true if any samplers were updated
*/
public synchronized boolean update(final OperationSamplingParameters strategies) {
boolean isUpdated = false;
AtomicBoolean isUpdated = new AtomicBoolean(false);

lowerBound = strategies.getDefaultLowerBoundTracesPerSecond();
ProbabilisticSampler defaultSampler = new ProbabilisticSampler(strategies.getDefaultSamplingProbability());
if (lowerBound != strategies.getDefaultLowerBoundTracesPerSecond()) {
lowerBound = strategies.getDefaultLowerBoundTracesPerSecond();
isUpdated.set(true);
}
ProbabilisticSampler defaultSampler = new ProbabilisticSampler(
strategies.getDefaultSamplingProbability());

if (!defaultSampler.equals(this.defaultSampler)) {
this.defaultSampler = defaultSampler;
isUpdated = true;
isUpdated.set(true);
}

Map<String, GuaranteedThroughputSampler> newOpsSamplers = new HashMap<String, GuaranteedThroughputSampler>();
//add or update operation samples using given strategies
Set<String> configuredOperations = strategies.getPerOperationStrategies().stream()
.map(PerOperationSamplingParameters::getOperation).collect(Collectors.toSet());
for (Entry<String, GuaranteedThroughputSampler> entry : operationNameToSampler.entrySet()) {
if (!configuredOperations.contains(entry.getKey())
&& entry.getValue().update(defaultSampler.getSamplingRate(), lowerBound)) {
isUpdated.set(true);
}
}

// add or update operation samples using given strategies
for (PerOperationSamplingParameters strategy : strategies.getPerOperationStrategies()) {
String operation = strategy.getOperation();
double samplingRate = strategy.getProbabilisticSampling().getSamplingRate();
GuaranteedThroughputSampler sampler = operationNameToSampler.get(operation);
if (sampler != null) {
isUpdated = sampler.update(samplingRate, lowerBound) || isUpdated;
newOpsSamplers.put(operation, sampler);
} else {
if (newOpsSamplers.size() < maxOperations) {
sampler = new GuaranteedThroughputSampler(samplingRate, lowerBound);
newOpsSamplers.put(operation, sampler);
isUpdated = true;
} else {
log.info("Exceeded the maximum number of operations({}) for per operations sampling",
maxOperations);
}
GuaranteedThroughputSampler sampler = operationNameToSampler.computeIfAbsent(operation,
op -> {
if (operationNameToSampler.size() >= maxOperations) {
log.info("Exceeded the maximum number of operations({}) for per operations sampling",
maxOperations);
return null;
}
isUpdated.set(true);
return new GuaranteedThroughputSampler(samplingRate, lowerBound);
});
if (sampler != null && sampler.update(samplingRate, lowerBound)) {
isUpdated.set(true);
}
}

operationNameToSampler = newOpsSamplers;
return isUpdated;
return isUpdated.get();
}

@Override
public synchronized SamplingStatus sample(String operation, long id) {
GuaranteedThroughputSampler sampler = operationNameToSampler.get(operation);
if (sampler != null) {
return sampler.sample(operation, id);
}

if (operationNameToSampler.size() < maxOperations) {
sampler = new GuaranteedThroughputSampler(defaultSampler.getSamplingRate(), lowerBound);
operationNameToSampler.put(operation, sampler);
return sampler.sample(operation, id);
public SamplingStatus sample(String operation, long id) {
Sampler sampler = operationNameToSampler.computeIfAbsent(operation, op -> {
if (operationNameToSampler.size() >= maxOperations) {
return null;
}
return new GuaranteedThroughputSampler(defaultSampler.getSamplingRate(), lowerBound);
});
if (sampler == null) {
sampler = defaultSampler;
}

return defaultSampler.sample(operation, id);
return sampler.sample(operation, id);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
Expand All @@ -29,6 +30,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -48,7 +50,7 @@ public class PerOperationSamplerTest {
private static final String OPERATION = "some OPERATION";

@Mock private ProbabilisticSampler defaultProbabilisticSampler;
private HashMap<String, GuaranteedThroughputSampler> operationToSamplers = new HashMap<>();
private ConcurrentHashMap<String, GuaranteedThroughputSampler> operationToSamplers = new ConcurrentHashMap<>();
private PerOperationSampler undertest;

@Before
Expand Down Expand Up @@ -174,24 +176,46 @@ public void testUpdateAddOperation() {
DEFAULT_LOWER_BOUND_TRACES_PER_SECOND),
undertest.getOperationNameToSampler().get(OPERATION));
}

@Test
public void testAbsentOperationIsRemoved() {
String absentOp = "ShouldBeRemoved";
operationToSamplers.put(absentOp, mock(GuaranteedThroughputSampler.class));
public void testPreviouslyKnownOperationRevertsToDefaultProbabilityAfterUpdate() {
String previouslyKnownOp = "previouslyKnownOp";
operationToSamplers.put(previouslyKnownOp, mock(GuaranteedThroughputSampler.class));

PerOperationSamplingParameters perOperationSamplingParameters1 =
new PerOperationSamplingParameters(OPERATION, new ProbabilisticSamplingStrategy(SAMPLING_RATE));
List<PerOperationSamplingParameters> parametersList = new ArrayList<>();
parametersList.add(perOperationSamplingParameters1);

undertest.update(new OperationSamplingParameters(DEFAULT_SAMPLING_PROBABILITY,
DEFAULT_LOWER_BOUND_TRACES_PER_SECOND,
parametersList));
DEFAULT_LOWER_BOUND_TRACES_PER_SECOND,
parametersList));
verify(operationToSamplers.get(previouslyKnownOp)).update(DEFAULT_SAMPLING_PROBABILITY,
DEFAULT_LOWER_BOUND_TRACES_PER_SECOND);
}

@Test
public void testUpdateRetainsSamplerInstances() {
String previouslyKnownOp = "previouslyKnownOp";
operationToSamplers.put(previouslyKnownOp, mock(GuaranteedThroughputSampler.class));
operationToSamplers.put(OPERATION, mock(GuaranteedThroughputSampler.class));
undertest.sample(previouslyKnownOp, TRACE_ID);
undertest.sample(OPERATION, TRACE_ID);
verify(operationToSamplers.get(previouslyKnownOp), times(1)).sample(previouslyKnownOp, TRACE_ID);
verify(operationToSamplers.get(OPERATION), times(1)).sample(OPERATION, TRACE_ID);


PerOperationSamplingParameters perOperationSamplingParameters1 =
new PerOperationSamplingParameters(OPERATION, new ProbabilisticSamplingStrategy(SAMPLING_RATE));
List<PerOperationSamplingParameters> parametersList = new ArrayList<>();
parametersList.add(perOperationSamplingParameters1);

assertEquals(1, undertest.getOperationNameToSampler().size());
assertEquals(new GuaranteedThroughputSampler(SAMPLING_RATE,
DEFAULT_LOWER_BOUND_TRACES_PER_SECOND),
undertest.getOperationNameToSampler().get(OPERATION));
assertFalse(undertest.getOperationNameToSampler().containsKey(absentOp));
undertest.update(new OperationSamplingParameters(DEFAULT_SAMPLING_PROBABILITY,
DEFAULT_LOWER_BOUND_TRACES_PER_SECOND,
parametersList));
undertest.sample(previouslyKnownOp, TRACE_ID);
undertest.sample(OPERATION, TRACE_ID);
verify(operationToSamplers.get(previouslyKnownOp), times(2)).sample(previouslyKnownOp, TRACE_ID);
verify(operationToSamplers.get(OPERATION), times(2)).sample(OPERATION, TRACE_ID);
}
}