Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix custom metric on Function Consumption #2946

Merged
merged 20 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@

package com.microsoft.applicationinsights.agent.internal.init;

import static java.util.concurrent.TimeUnit.MINUTES;

import com.azure.monitor.opentelemetry.exporter.implementation.heartbeat.HeartbeatExporter;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.Strings;
import com.microsoft.applicationinsights.agent.bootstrap.diagnostics.DiagnosticsHelper;
import com.microsoft.applicationinsights.agent.internal.profiler.ProfilingInitializer;
import io.opentelemetry.javaagent.bootstrap.ClassFileTransformerHolder;
import io.opentelemetry.javaagent.bootstrap.InstrumentationHolder;
import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.Instrumentation;
import java.util.List;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -21,9 +28,16 @@ public class AzureFunctionsInitializer implements Runnable {
LoggerFactory.getLogger(DiagnosticsHelper.DIAGNOSTICS_LOGGER_NAME);

private final RuntimeConfigurator runtimeConfigurator;
private final Consumer<List<TelemetryItem>> telemetryItemsConsumer;
private final ProfilingInitializer profilingInitializer;

public AzureFunctionsInitializer(RuntimeConfigurator runtimeConfigurator) {
public AzureFunctionsInitializer(
RuntimeConfigurator runtimeConfigurator,
Consumer<List<TelemetryItem>> telemetryItemsConsumer,
heyams marked this conversation as resolved.
Show resolved Hide resolved
ProfilingInitializer profilingInitializer) {
this.runtimeConfigurator = runtimeConfigurator;
this.telemetryItemsConsumer = telemetryItemsConsumer;
this.profilingInitializer = profilingInitializer;
}

@Override
Expand Down Expand Up @@ -78,6 +92,20 @@ private void initialize() {
runtimeConfig.selfDiagnosticsLevel =
getAndLogAtDebug("APPLICATIONINSIGHTS_SELF_DIAGNOSTICS_LEVEL");

// initialize Profiler
if (runtimeConfig.preview.profiler.enabled) {
profilingInitializer.initialize();
}

// enable Heartbeat
long intervalSeconds = Math.min(runtimeConfig.heartbeat.intervalSeconds, MINUTES.toSeconds(15));
HeartbeatExporter.start(
intervalSeconds,
runtimeConfigurator.getTelemetryClient()::populateDefaults,
telemetryItemsConsumer);

// TODO (heya) enable Statsbeat and need to refactor RuntimeConfiguration

runtimeConfigurator.apply(runtimeConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,10 @@ public class RuntimeConfiguration {
public String instrumentationLoggingLevel;

public String selfDiagnosticsLevel;

public Configuration.PreviewConfiguration preview = new Configuration.PreviewConfiguration();

public Configuration.ProfilerConfiguration profiler = new Configuration.ProfilerConfiguration();
heyams marked this conversation as resolved.
Show resolved Hide resolved

public Configuration.Heartbeat heartbeat = new Configuration.Heartbeat();
heyams marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ private static RuntimeConfiguration copy(RuntimeConfiguration config) {

copy.instrumentationLoggingLevel = config.instrumentationLoggingLevel;
copy.selfDiagnosticsLevel = config.selfDiagnosticsLevel;

copy.preview.profiler.enabled = config.preview.profiler.enabled;

copy.heartbeat.intervalSeconds = config.heartbeat.intervalSeconds;
return copy;
}

Expand Down Expand Up @@ -150,6 +154,10 @@ static void updateSampling(
}
}

TelemetryClient getTelemetryClient() {
return telemetryClient;
}

private void updateConnectionString(@Nullable String connectionString) {
telemetryClient.updateConnectionStrings(connectionString, null, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
.setDiskPersistenceMaxSizeMb(configuration.preview.diskPersistenceMaxSizeMb)
.build();

// interval longer than 15 minutes is not allowed since we use this data for usage telemetry
long intervalSeconds = Math.min(configuration.heartbeat.intervalSeconds, MINUTES.toSeconds(15));
Consumer<List<TelemetryItem>> telemetryItemsConsumer =
Consumer<List<TelemetryItem>> heartbeatTelemetryItemConsumer =
telemetryItems -> {
for (TelemetryItem telemetryItem : telemetryItems) {
TelemetryObservers.INSTANCE
Expand All @@ -146,8 +144,14 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
telemetryClient.getMetricsBatchItemProcessor().trackAsync(telemetryItem);
}
};
HeartbeatExporter.start(
intervalSeconds, telemetryClient::populateDefaults, telemetryItemsConsumer);

// interval longer than 15 minutes is not allowed since we use this data for usage telemetry
if (telemetryClient.getConnectionString() != null) {
long intervalSeconds =
Math.min(configuration.heartbeat.intervalSeconds, MINUTES.toSeconds(15));
HeartbeatExporter.start(
intervalSeconds, telemetryClient::populateDefaults, heartbeatTelemetryItemConsumer);
}

TelemetryClient.setActive(telemetryClient);

Expand All @@ -164,9 +168,11 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
BytecodeUtilImpl.connectionStringConfiguredAtRuntime =
configuration.connectionStringConfiguredAtRuntime;

ProfilingInitializer profilingInitializer = null;
if (configuration.preview.profiler.enabled) {
try {
ProfilingInitializer.initialize(tempDir, configuration, telemetryClient);
profilingInitializer =
ProfilingInitializer.initialize(tempDir, configuration, telemetryClient);
} catch (RuntimeException e) {
startupLogger.warning("Failed to initialize profiler", e);
}
Expand All @@ -175,7 +181,8 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
if (ConfigurationBuilder.inAzureFunctionsConsumptionWorker()) {
AzureFunctions.setup(
() -> telemetryClient.getConnectionString() != null,
new AzureFunctionsInitializer(runtimeConfigurator));
new AzureFunctionsInitializer(
runtimeConfigurator, heartbeatTelemetryItemConsumer, profilingInitializer));
}

RpConfiguration rpConfiguration = FirstEntryPoint.getRpConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public synchronized void initialize() {
"disable profiler or use a writable file system");
}

if (configuration.preview.profiler.enabled) {
if (configuration.preview.profiler.enabled && telemetryClient.getConnectionString() != null) {
performInit();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import static com.azure.monitor.opentelemetry.exporter.implementation.utils.AzureMonitorMsgId.BATCH_ITEM_PROCESSOR_ERROR;

import com.azure.core.util.logging.ClientLogger;
import com.azure.monitor.opentelemetry.exporter.implementation.logging.OperationLogger;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.pipeline.TelemetryItemExporter;
Expand All @@ -26,6 +27,8 @@
// copied from io.opentelemetry.sdk.trace.export.BatchSpanProcessor
public final class BatchItemProcessor {

private static final ClientLogger logger = new ClientLogger(BatchItemProcessor.class);

private static final String WORKER_THREAD_NAME =
BatchItemProcessor.class.getSimpleName() + "_WorkerThread";

Expand Down Expand Up @@ -62,8 +65,14 @@ public static BatchItemProcessorBuilder builder(TelemetryItemExporter exporter)
queue,
queue.capacity(),
queueName);
Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
workerThread.start();

try {
Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
workerThread.setUncaughtExceptionHandler((t, e) -> logger.error(e.getMessage(), e));
workerThread.start();
} catch (RuntimeException ex) {
logger.error("An error occurs when running the batch worker thread", ex);
}
trask marked this conversation as resolved.
Show resolved Hide resolved
}

public void trackAsync(TelemetryItem item) {
Expand Down