Skip to content

Commit

Permalink
inspectIT#1269: Changed OpenTelemetryControllerImpl to not rebuild th…
Browse files Browse the repository at this point in the history
…e SdkTracerProvider when individual trace exporter services change. For this, the DynamicMultiSpanExporter and DynamicSampler have been implemented that can be updated while OpenTelemetry is running. The DynamicallyActivatableTraceExporterService now expose the spanExporter instead of the spanProcessor.
  • Loading branch information
Heiko Holz committed Jan 13, 2022
1 parent 1ed44c5 commit 4b20a8b
Show file tree
Hide file tree
Showing 16 changed files with 570 additions and 327 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import org.awaitility.core.ConditionTimeoutException;
import rocks.inspectit.ocelot.bootstrap.AgentManager;
import rocks.inspectit.ocelot.bootstrap.Instances;
import rocks.inspectit.ocelot.bootstrap.opentelemetry.NoopOpenTelemetryController;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -313,6 +315,13 @@ public static TimeSeries getTimeseries(String metricName, Map<String, String> ta
*/
public static InMemorySpanExporter initializeOpenTelemetryForSystemTesting() {

if (NoopOpenTelemetryController.INSTANCE != Instances.openTelemetryController) {
System.out.println("shut down " + Instances.openTelemetryController.getClass().getSimpleName());

// shut down any previously configured OTELs
Instances.openTelemetryController.shutdown();
Instances.openTelemetryController = NoopOpenTelemetryController.INSTANCE;
}
// create an SdkTracerProvider with InMemorySpanExporter and LoggingSpanExporter
InMemorySpanExporter inMemSpanExporter = InMemorySpanExporter.create();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
public interface IOpenTelemetryController {

/**
* Shuts down the {@link IOpenTelemetryController}
*/
void shutdown();

/**
* Starts the {@link IOpenTelemetryController}
* Returns a completely noop {@link IOpenTelemetryController}
*
* @return Whether the {@link IOpenTelemetryController} was successfuly started
* @return
*/
boolean start();
static IOpenTelemetryController noop() {
return NoopOpenTelemetryController.INSTANCE;
}

/**
* Gets whether the {@link IOpenTelemetryController} is configured
Expand All @@ -32,12 +29,37 @@ public interface IOpenTelemetryController {
boolean isEnabled();

/**
* Notifies the {@link IOpenTelemetryController} that something in the {@link rocks.inspectit.ocelot.config.model.tracing.TracingSettings} changed
* Gets whether the {@link IOpenTelemetryController} has been stopped. Once stopped, it cannot be restarted
*
* @return Whether the {@link IOpenTelemetryController} has been stopped.
*/
boolean isStopped();

/**
* Starts the {@link IOpenTelemetryController}
*
* @return Whether the {@link IOpenTelemetryController} was successfully started
*/
boolean start();

/**
* Flushes all pending spans and metrics and waits for it to complete.
*/
void flush();

/**
* Shuts down the {@link IOpenTelemetryController}.
* The shutdown is final, i.e., once this {@link IOpenTelemetryController} is shutdown, it cannot be re-enabled!
*/
void shutdown();

/**
* Notifies the {@link IOpenTelemetryController} that something in the {@link rocks.inspectit.ocelot.config.model.tracing.TracingSettings} or in the {@link rocks.inspectit.ocelot.config.model.exporters.trace.TraceExportersSettings} changed
*/
void notifyTracingSettingsChanged();

/**
* Notifies the {@link IOpenTelemetryController} that something in the {@link rocks.inspectit.ocelot.config.model.metrics.MetricsSettings} changed
* Notifies the {@link IOpenTelemetryController} that something in the {@link rocks.inspectit.ocelot.config.model.metrics.MetricsSettings} or in the {@link rocks.inspectit.ocelot.config.model.exporters.metrics.MetricsExportersSettings} changed
*/
void notifyMetricsSettingsChanged();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ public class NoopOpenTelemetryController implements IOpenTelemetryController {
public static final NoopOpenTelemetryController INSTANCE = new NoopOpenTelemetryController();

@Override
public void shutdown() {
public boolean isConfigured() {
return false;
}

@Override
public boolean isEnabled() {
return true;
}

@Override
Expand All @@ -15,13 +20,18 @@ public boolean start() {
}

@Override
public boolean isConfigured() {
public boolean isStopped() {
return false;
}

@Override
public boolean isEnabled() {
return true;
public void shutdown() {

}

@Override
public void flush() {

}

@Override
Expand All @@ -31,6 +41,6 @@ public void notifyTracingSettingsChanged() {

@Override
public void notifyMetricsSettingsChanged() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package rocks.inspectit.ocelot.core.exporter;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
* {@link SpanExporter} that forwards all received spans to a list of {@link SpanExporter}, similar to {@link io.opentelemetry.sdk.trace.export.MultiSpanExporter}. In contrast to {@link io.opentelemetry.sdk.trace.export.MultiSpanExporter}, {@link SpanExporter}s can by dynamically registered and unregisted.
*
* <p>Can be used to export to multiple backends using the same {@link io.opentelemetry.sdk.trace.SpanProcessor} like {@link io.opentelemetry.sdk.trace.export.SimpleSpanProcessor} or {@link io.opentelemetry.sdk.trace.export.BatchSpanProcessor}
*/
@Slf4j
public class DynamicMultiSpanExporter implements SpanExporter {

/**
* The {@link SpanExporter}s that all received spans are forwarded to.
*/
private Map<String, SpanExporter> spanExporters = new ConcurrentHashMap<>();

private Object lock = new Object();

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
CompletableResultCode resultCode;
synchronized (lock) {
resultCode = execute(spanExporters.values(), (spanExporter) -> spanExporter.export(spans));
}
return resultCode;
}

@Override
public CompletableResultCode flush() {
CompletableResultCode resultCode;
synchronized (lock) {
resultCode = execute(spanExporters.values(), (spanExporter) -> spanExporter.flush());
}
return resultCode;
}

@Override
public CompletableResultCode shutdown() {
CompletableResultCode resultCode;
synchronized (lock) {
resultCode = execute(spanExporters.values(), (spanExporter) -> spanExporter.shutdown());
}
return resultCode;
}

/**
* Executes the given function on a {@link Collection<SpanExporter>}
*
* @param spanExporters The {@link SpanExporter}s
* @param spanExporterFun The {@link Function<SpanExporter, CompletableResultCode>} to apply on each {@link SpanExporter}
*
* @return The {@link CompletableResultCode} of all applications of the function
*/
private static CompletableResultCode execute(Collection<SpanExporter> spanExporters, Function<SpanExporter, CompletableResultCode> spanExporterFun) {
List<CompletableResultCode> resultCodes = new ArrayList<>(spanExporters.size());
for (SpanExporter spanExporter : spanExporters) {
CompletableResultCode resultCode;
try {
resultCode = spanExporterFun.apply(spanExporter);
} catch (RuntimeException e) {
log.error("Exception thrown in execute", e);
resultCodes.add(CompletableResultCode.ofFailure());
continue;
}
resultCodes.add(resultCode);
}
return CompletableResultCode.ofAll(resultCodes);
}

/**
* Registers the given {@link SpanExporter} to export {@link SpanData} for sampled spans.
*
* @param registerName The name of the span exporter service. Must be unique for each service.
* @param spanExporter The {@link SpanExporter} that is called for each {@link #export(Collection)}, {@link #flush()}, and {@link #shutdown()}
*
* @return Whether a {@link SpanProcessor} was **not** previously registered with the same name. Returns false if a {@link SpanExporter} with the given name was already registered.
*/
public boolean registerSpanExporter(String registerName, SpanExporter spanExporter) {
return null == spanExporters.put(registerName, spanExporter);
}

/**
* Unregisters the given {@link SpanExporter}
*
* @param registerName The name of the span exporter service.
*
* @return Whether a {@link SpanExporter} with the given name was successfully removed. Returns false if no {@link SpanExporter} with the given name was previously registered.
*/
public boolean unregisterSpanExporter(String registerName) {
return null != spanExporters.remove(registerName);
}

@Override
public String toString() {
return "SpanExporterImpl{" + "spanExporters=" + spanExporters + '}';
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rocks.inspectit.ocelot.core.exporter;

import io.opentelemetry.sdk.trace.SpanProcessor;
import lombok.Getter;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import rocks.inspectit.ocelot.config.model.InspectitConfig;
import rocks.inspectit.ocelot.config.model.exporters.metrics.PrometheusExporterSettings;
import rocks.inspectit.ocelot.core.service.DynamicallyActivatableService;
Expand All @@ -13,11 +12,11 @@
public abstract class DynamicallyActivatableTraceExporterService extends DynamicallyActivatableService {

/**
* Gets the {@link SpanProcessor} used to process {@link io.opentelemetry.api.trace.Span}
* Gets the {@link SpanExporter} of this service to export {@link io.opentelemetry.sdk.trace.data.SpanData} to
*
* @return The {@link SpanProcessor} used to process {@link io.opentelemetry.api.trace.Span}
* @return The {@link SpanExporter} of this service to export {@link io.opentelemetry.sdk.trace.data.SpanData} to
*/
public abstract SpanProcessor getSpanProcessor();
public abstract SpanExporter getSpanExporter();

/**
* Constructor.
Expand All @@ -31,5 +30,4 @@ public DynamicallyActivatableTraceExporterService(String... configDependencies)
super(configDependencies);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporter;
import io.opentelemetry.exporter.jaeger.thrift.JaegerThriftSpanExporter;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -23,10 +21,8 @@ public class JaegerExporterService extends DynamicallyActivatableTraceExporterSe

private JaegerGrpcSpanExporter grpcSpanExporter;

private JaegerThriftSpanExporter spanExporter;

@Getter
private SpanProcessor spanProcessor;
private JaegerThriftSpanExporter spanExporter;

public JaegerExporterService() {
super("exporters.tracing.jaeger", "tracing.enabled");
Expand Down Expand Up @@ -55,7 +51,6 @@ protected boolean checkEnabledForConfig(InspectitConfig conf) {
protected boolean doEnable(InspectitConfig configuration) {
try {
JaegerExporterSettings settings = configuration.getExporters().getTracing().getJaeger();

log.info("Starting Jaeger Exporter with url '{}' (grpc '{}')", settings.getUrl(), settings.getGrpc());

// TODO: use getUrl() or getGRPC()?
Expand All @@ -66,9 +61,6 @@ protected boolean doEnable(InspectitConfig configuration) {
// create span exporter
spanExporter = JaegerThriftSpanExporter.builder().setEndpoint(settings.getUrl()).build();

// create span processor
spanProcessor = BatchSpanProcessor.builder(spanExporter).build();

// register
openTelemetryController.registerTraceExporterService(this);

Expand All @@ -83,11 +75,10 @@ protected boolean doEnable(InspectitConfig configuration) {
protected boolean doDisable() {
log.info("Stopping Jaeger Exporter");
try {
if (null != spanProcessor) {
spanProcessor.shutdown();
spanProcessor = null;
}
openTelemetryController.unregisterTraceExporterService(this);
if (null != spanExporter) {
spanExporter.close();
}
} catch (Throwable t) {
log.error("Error disabling Jaeger exporter", t);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package rocks.inspectit.ocelot.core.exporter;

import io.opentelemetry.exporter.logging.LoggingSpanExporter;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rocks.inspectit.ocelot.config.model.InspectitConfig;
import rocks.inspectit.ocelot.config.model.exporters.trace.LoggingTraceExporterSettings;
import rocks.inspectit.ocelot.core.opentelemetry.OpenTelemetryControllerImpl;

import javax.validation.Valid;

Expand All @@ -23,11 +19,8 @@ public class LoggingTraceExporterService extends DynamicallyActivatableTraceExpo
/**
* The {@link LoggingSpanExporter} for exporting the spans to the log
*/
private LoggingSpanExporter spanExporter;

@Getter
private SpanProcessor spanProcessor;

private LoggingSpanExporter spanExporter;

public LoggingTraceExporterService() {
super("exporters.tracing.logging", "tracing.enabled");
Expand All @@ -53,10 +46,6 @@ protected boolean doEnable(InspectitConfig conf) {
LoggingTraceExporterSettings logging = conf.getExporters().getTracing().getLogging();
try {

// create span processors
// SpanProcessors are also shut down when the corresponding TracerProvider is shut down. Thus, we need to create the SpanProcessors each time
spanProcessor = SimpleSpanProcessor.create(spanExporter);

boolean success = openTelemetryController.registerTraceExporterService(this);
if (success) {
log.info("Starting {}", getClass().getSimpleName());
Expand All @@ -74,12 +63,10 @@ protected boolean doEnable(InspectitConfig conf) {
@Override
protected boolean doDisable() {
try {
// shut down the span processor
if (null != spanProcessor) {
spanProcessor.shutdown();
spanProcessor = null;
}
openTelemetryController.unregisterTraceExporterService(this);
if (null != spanExporter) {
spanExporter.flush();
}
log.info("Stopping TraceLoggingSpanExporter");
return true;
} catch (Exception e) {
Expand Down
Loading

0 comments on commit 4b20a8b

Please sign in to comment.