Skip to content

Commit

Permalink
inspectIT#1269: Slimmed down OpenTelemetryControllerImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
Heiko Holz committed Jan 17, 2022
1 parent 6e555bd commit 0099124
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import lombok.AccessLevel;
import lombok.Getter;
Expand All @@ -45,30 +44,13 @@
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* The implementation of {@link IOpenTelemetryController}. The {@link OpenTelemetryControllerImpl} configures {@link GlobalOpenTelemetry} (tracing) and {@link GlobalMeterProvider} (metrics).
* <p>
* The hierarchy of {@link OpenTelemetrySdk} is as follows. All fields are private final and thus can only be changed via reflection
* <p>
* The {@link OpenTelemetrySdk} contains the {@link SdkTracerProvider}, i.e., {@link OpenTelemetrySdk#getSdkTracerProvider()}.
* The {@link SdkTracerProvider} contains the {@link io.opentelemetry.sdk.trace.TracerSharedState}, i.e., {@link SdkTracerProvider#sharedState}.
* The {@link io.opentelemetry.sdk.trace.TracerSharedState} contains {@link io.opentelemetry.sdk.trace.TracerSharedState#activeSpanProcessor}, which can be a list of {@link SpanProcessor} (for example {@link SimpleSpanProcessor} or {@link io.opentelemetry.sdk.trace.export.BatchSpanProcessor}.
* <p>
* The {@link SimpleSpanProcessor} contains the {@link SimpleSpanProcessor#spanExporter}, i.e., {@link io.opentelemetry.sdk.trace.export.SpanExporte}.
* The {@link io.opentelemetry.sdk.trace.export.BatchSpanProcessor} contains the {@link io.opentelemetry.sdk.trace.export.BatchSpanProcessor#worker}, which then contains the {@link io.opentelemetry.sdk.trace.export.BatchSpanProcessor.Worker#spanExporter}
* <p>
* The hierarchy of {@link io.opentelemetry.sdk.metrics.SdkMeterProvider} is as follows. All fields are private final and thus can only be changed via reflection.
* <p>
* The hierarchy of {@link io.opentelemetry.sdk.metrics.SdkMeterProvider} is as ffollows. All fields are private final and thus can only be changed via reflection.
* <p>
* The {@link io.opentelemetry.sdk.metrics.SdkMeterProvider} contains {@link io.opentelemetry.sdk.metrics.SdkMeterProvider#sharedState} and {@link io.opentelemetry.sdk.metrics.SdkMeterProvider#collectionInfoMap}.
* The {@link io.opentelemetry.sdk.metrics.internal.state.AutoValue_MeterProviderSharedState} has nothing of interest.
* The {@link io.opentelemetry.sdk.metrics.internal.export.AutoValue_CollectionInfo} contains the {@link io.opentelemetry.sdk.metrics.internal.export.AutoValue_CollectionInfo#reader}, i.e., {@link io.opencensus.exporter.metrics.util.MetricReader}.
* The {@link io.opentelemetry.sdk.metrics.export.MetricReaderFactory} can be implemented as {@link io.opentelemetry.sdk.metrics.export.PeriodicMetricReader}.
* The {@link io.opentelemetry.sdk.metrics.export.PeriodicMetricReader} contains {@link io.opentelemetry.sdk.metrics.export.PeriodicMetricReader#exporter}, e.g., {@link io.opentelemetry.exporter.logging.LoggingMetricExporter}, and the {@link io.opentelemetry.sdk.metrics.export.PeriodicMetricReader#scheduledFuture}.
* The {@link java.util.concurrent.ScheduledFuture}
* The individual {@link rocks.inspectit.ocelot.core.service.DynamicallyActivatableService services}, i.e., {@link DynamicallyActivatableMetricsExporterService} and {@link DynamicallyActivatableTraceExporterService}, register to and unregister from {@link OpenTelemetryControllerImpl this}.
* <b>Important note:</b> {@link #shutdown() shutting down} the {@link OpenTelemetryControllerImpl} is final and cannot be revoked.
*/
@Slf4j
public class OpenTelemetryControllerImpl implements IOpenTelemetryController {
Expand Down Expand Up @@ -103,6 +85,11 @@ public class OpenTelemetryControllerImpl implements IOpenTelemetryController {
*/
private AtomicBoolean isConfiguring = new AtomicBoolean(false);

/**
* Whether the {@link OpenTelemetryImpl} is currently {@link #shutdown() shutting down}
*/
private AtomicBoolean isShuttingDown = new AtomicBoolean(false);

/**
* The registered {@link DynamicallyActivatableTraceExporterService}.
*/
Expand Down Expand Up @@ -160,10 +147,7 @@ void init() {
serviceNameResource = Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, env.getCurrentConfig()
.getServiceName()));

// create span processor, exporter, and sampler
spanExporter = new DynamicMultiSpanExporter();
spanProcessor = BatchSpanProcessor.builder(spanExporter).build();
sampler = new DynamicSampler(env.getCurrentConfig().getTracing().getSampleProbability());
initTracing(env.getCurrentConfig());

// close the tracer provider when the JVM is shutting down
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
Expand Down Expand Up @@ -201,7 +185,9 @@ public boolean isConfiguring() {
@VisibleForTesting
// make sure this is called after the individual services have (un)-registered
synchronized boolean configureOpenTelemetry() {
log.info("configureOpenTelemetry at timestamp {}", System.currentTimeMillis());
if (stopped) {
return false;
}
boolean success = false;
if (!isConfiguring.compareAndSet(false, true)) {
log.info("Multiple configure calls");
Expand All @@ -211,8 +197,6 @@ synchronized boolean configureOpenTelemetry() {

InspectitConfig configuration = env.getCurrentConfig();

// TODO: somehow compute whether anything has changed in tracing or metrics. If no changes happened, we do not need to reconfigure tracing and metrics!

// set serviceName
serviceNameResource = Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, configuration.getServiceName()));

Expand All @@ -230,7 +214,7 @@ synchronized boolean configureOpenTelemetry() {
success = successConfigureTracing && successConfigureMeterProvider;

if (success) {
log.info("Successfully configured OpenTelemetry with TracerProvider and MeterProvider");
log.info("Successfully configured OpenTelemetry with tracing and metrics");
} else {
log.error("Failed to configure OpenTelemetry. Please scan the logs for detailed failure messages.");
}
Expand All @@ -242,7 +226,6 @@ synchronized boolean configureOpenTelemetry() {
tracingSettingsChanged = false;
metricSettingsChanged = false;
return success;

}

@Override
Expand All @@ -264,9 +247,17 @@ public void flush() {
/**
* Shuts down the {@link OpenTelemetryControllerImpl} by calling {@link OpenTelemetryImpl#close()} and {@link MeterProviderImpl#close()} and waits for it to complete.
* The shutdown is final, i.e., once this {@link OpenTelemetryImpl} is shutdown, it cannot be re-enabled!
* <p>
* Only use this method for testing or when the JVM is shutting down.
*/
@Override
synchronized public void shutdown() {
if (isStopped()) {
return;
}
if (!isShuttingDown.compareAndSet(false, true)) {
log.info("Multiple shutdown calls");
}
long start = System.nanoTime();

// close tracing
Expand All @@ -280,13 +271,13 @@ synchronized public void shutdown() {
long startMeterProviderShutdown = System.nanoTime();
// note: close calls SdkMeterProvider#shutdown, which calls MetricReader#shutdown, which calls MetricExporter#shutdown
CompletableResultCode shutdownResult = meterProvider.close();
log.info("time to shut down {}: {} ms (success={})", meterProvider, (System.nanoTime() - startMeterProviderShutdown) * 1e-6, shutdownResult.isSuccess());
}
GlobalMeterProvider.set(null);
GlobalOpenTelemetry.resetForTest();
configured = false;
enabled = false;
stopped = true;
isShuttingDown.set(false);

// set all OTEL related fields to null
openTelemetry = null;
Expand All @@ -296,7 +287,7 @@ synchronized public void shutdown() {
spanExporter = null;
spanProcessor = null;

log.info("Shut down {}. The shutdown process took {} ms", getClass().getSimpleName(), (System.nanoTime() - start) * 1e-6);
log.info("Shut down {}. The shutdown process took {} ms", getClass().getSimpleName(), (System.nanoTime() - start) / 1000000);
}

@Override
Expand All @@ -309,6 +300,56 @@ synchronized public void notifyMetricsSettingsChanged() {
metricSettingsChanged = true;
}

/**
* Initializes tracing components, i.e., {@link #openTelemetry}, {@link #spanExporter}, {@link #spanProcessor}, {@link #sampler} and {@link SdkTracerProvider}
*
* @param configuration
*/
@VisibleForTesting
void initTracing(InspectitConfig configuration) {
double sampleProbability = configuration.getTracing().getSampleProbability();

// set up sampler
sampler = new DynamicSampler(sampleProbability);

// set up spanProcessor and spanExporter
spanExporter = new DynamicMultiSpanExporter();
spanProcessor = BatchSpanProcessor.builder(spanExporter)
.setMaxExportBatchSize(configuration.getTracing().getMaxExportBatchSize())
.setScheduleDelay(configuration.getTracing().getScheduleDelayMillis(), TimeUnit.MILLISECONDS)
.build();
// build TracerProvider
SdkTracerProviderBuilder builder = SdkTracerProvider.builder()
.setSampler(sampler)
.setResource(serviceNameResource)
.addSpanProcessor(spanProcessor);

// build the SdkTracerProvider
SdkTracerProvider tracerProvider = builder.build();

// build OTEL
OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder()
.setTracerProvider(tracerProvider)
.setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(), JaegerPropagator.getInstance(), W3CBaggagePropagator.getInstance(), B3Propagator.injectingMultiHeaders())))
.build();

// if the OpenTelemetryImpl has not been build, then build and register it
openTelemetry = OpenTelemetryImpl.builder().openTelemetry(openTelemetrySdk).build();

// check if any OpenTelemetry has been registered to GlobalOpenTelemetry.
// If so, reset it.
if (null != OpenTelemetryUtils.getGlobalOpenTelemetry()) {
log.info("reset {}", GlobalOpenTelemetry.get().getClass().getName());
GlobalOpenTelemetry.resetForTest();
}

// set GlobalOpenTelemetry
openTelemetry.registerGlobal();

// update the OTEL_TRACER field in OpenTelemetrySpanBuilderImpl in case that it was already set
OpenCensusShimUtils.updateOpenTelemetryTracerInOpenTelemetrySpanBuilderImpl();
}

/**
* Configures the tracing, i.e. {@link #openTelemetry} and the related {@link SdkTracerProvider}. A new {@link SdkTracerProvider} is only built once or after {@link #shutdown()} was called.
*
Expand All @@ -318,77 +359,12 @@ synchronized public void notifyMetricsSettingsChanged() {
*/
@VisibleForTesting
synchronized boolean configureTracing(InspectitConfig configuration) {
if (!enabled) {
if (!enabled || stopped) {
return true;
}
try {

boolean buildTracerProvider = null == openTelemetry;

// set up the sampler if necessary
double sampleProbability = configuration.getTracing().getSampleProbability();
if (null == sampler) {
sampler = new DynamicSampler(sampleProbability);
buildTracerProvider = true;
}
// update sample probability
sampler.setSampleProbability(sampleProbability);

// set up span processor and span exporter if necessary
if (null == spanProcessor || spanExporter == null) {
// create new span exporter if null
if (null == spanExporter) {
spanExporter = new DynamicMultiSpanExporter();
// register the span exporters for all registered services
for (DynamicallyActivatableTraceExporterService service : registeredTraceExportServices.values()) {
spanExporter.registerSpanExporter(service.getName(), service.getSpanExporter());
}
}
// build the span processor
spanProcessor = BatchSpanProcessor.builder(spanExporter).build();
buildTracerProvider = true;
}

// if the TracerProvider needs to be build, build it and register the OpenTelemetryImpl
if (buildTracerProvider) {
// build TracerProvider
SdkTracerProviderBuilder builder = SdkTracerProvider.builder()
.setSampler(sampler)
.setResource(serviceNameResource)
.addSpanProcessor(spanProcessor);

// build the SdkTracerProvider
SdkTracerProvider tracerProvider = builder.build();

// build OTEL
OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder()
.setTracerProvider(tracerProvider)
.setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(), JaegerPropagator.getInstance(), W3CBaggagePropagator.getInstance(), B3Propagator.injectingMultiHeaders())))
.build();

// if the OpenTelemetryImpl has not been build, then build and register it
if (null == openTelemetry) {
openTelemetry = OpenTelemetryImpl.builder().openTelemetry(openTelemetrySdk).build();

// check if any OpenTelemetry has been registered to GlobalOpenTelemetry.
// If so, reset it.
if (null != OpenTelemetryUtils.getGlobalOpenTelemetry()) {
log.info("reset {}", GlobalOpenTelemetry.get().getClass().getName());
GlobalOpenTelemetry.resetForTest();
}

// set GlobalOpenTelemetry
openTelemetry.registerGlobal();
}
// otherwise, just update the underlying OpenTelemetrySdk
else {
openTelemetry.set(openTelemetrySdk);
}

// update the OTEL_TRACER field in OpenTelemetrySpanBuilderImpl
OpenCensusShimUtils.updateOpenTelemetryTracerInOpenTelemetrySpanBuilderImpl();
}

sampler.setSampleProbability(configuration.getTracing().getSampleProbability());
return true;
} catch (Exception e) {
log.error("Failed to configure OpenTelemetry Tracing", e);
Expand All @@ -409,16 +385,18 @@ synchronized boolean configureMeterProvider(InspectitConfig configuration) {
return true;
}
try {

// build new SdkMeterProvider
SdkMeterProviderBuilder builder = SdkMeterProvider.builder().setResource(serviceNameResource);

// register metric reader for each service
for (DynamicallyActivatableMetricsExporterService metricsExportService : registeredMetricExporterServices.values()) {
log.info("add metricReader for {} ({})", metricsExportService, metricsExportService.getNewMetricReaderFactory());
builder.registerMetricReader(OpenCensusMetrics.attachTo(metricsExportService.getNewMetricReaderFactory()));
}

SdkMeterProvider sdkMeterProvider = builder.build();

// if the MeterProvider is null, build and register it
// if the MeterProviderImpl is null, build and register it
if (null == meterProvider) {
meterProvider = MeterProviderImpl.builder().meterProvider(sdkMeterProvider).build();
meterProvider.registerGlobal();
Expand Down
Loading

0 comments on commit 0099124

Please sign in to comment.