From 47f05dee61af2b912489932096dcaea12c054f15 Mon Sep 17 00:00:00 2001 From: Sam DeHaan Date: Wed, 26 Jul 2023 14:17:03 -0400 Subject: [PATCH] [jmx-metrics] Collect in callback (#949) Co-authored-by: Ryan Fitzpatrick <10867373+rmfitzpatrick@users.noreply.github.com> Co-authored-by: jason plumb <75337021+breedx-splk@users.noreply.github.com> --- jmx-metrics/README.md | 5 + .../integrationTest/resources/script.groovy | 16 +- .../jmxmetrics/GroovyMetricEnvironment.java | 291 +++++++++++++----- .../jmxmetrics/InstrumentHelper.groovy | 227 +++++++++----- .../contrib/jmxmetrics/MBeanHelper.groovy | 27 +- .../contrib/jmxmetrics/OtelHelper.groovy | 82 ++--- .../jmxmetrics/InstrumenterHelperTest.java | 17 +- 7 files changed, 433 insertions(+), 232 deletions(-) diff --git a/jmx-metrics/README.md b/jmx-metrics/README.md index 30e7a7f91..29e2db023 100644 --- a/jmx-metrics/README.md +++ b/jmx-metrics/README.md @@ -128,6 +128,11 @@ mutually exclusive with `otel.jmx.groovy.script`. The currently supported target [`CompositeData`](https://docs.oracle.com/javase/7/docs/api/javax/management/openmbean/CompositeData.html) instances, each key of their `CompositeType` `keySet` will be `.`-appended to the specified `instrumentName`, whose resulting instrument will be updated for each respective value. + - If the underlying MBean(s) held by the provided MBeanHelper are a mixed set of + [`CompositeData`](https://docs.oracle.com/javase/7/docs/api/javax/management/openmbean/CompositeData.html) instances + and simple values, the InstrumentHelper will not attempt to collect the metric. This is to prevent generating + metrics identified with the `instrumentName` and also the `instrumentName` with the `keySet` `.`-appended, + which breaks OpenTelemetry metric conventions. `otel.instrument()` provides additional signatures to obtain and update the returned `InstrumentHelper`: diff --git a/jmx-metrics/src/integrationTest/resources/script.groovy b/jmx-metrics/src/integrationTest/resources/script.groovy index 0592ad3e5..0ed6c493f 100644 --- a/jmx-metrics/src/integrationTest/resources/script.groovy +++ b/jmx-metrics/src/integrationTest/resources/script.groovy @@ -17,11 +17,13 @@ import io.opentelemetry.api.common.Attributes def loadMatches = otel.queryJmx("org.apache.cassandra.metrics:type=Storage,name=Load") -def load = loadMatches.first() +if (!loadMatches.isEmpty()) { + def load = loadMatches.first() -def lvr = otel.longHistogram( - "cassandra.storage.load", - "Size, in bytes, of the on disk data size this node manages", - "By" - ) -lvr.record(load.Count, Attributes.builder().put("myKey", "myVal").build()) + def lvr = otel.longHistogram( + "cassandra.storage.load", + "Size, in bytes, of the on disk data size this node manages", + "By" + ) + lvr.record(load.Count, Attributes.builder().put("myKey", "myVal").build()) +} diff --git a/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/GroovyMetricEnvironment.java b/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/GroovyMetricEnvironment.java index 523599fb9..a34a14ba9 100644 --- a/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/GroovyMetricEnvironment.java +++ b/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/GroovyMetricEnvironment.java @@ -5,8 +5,11 @@ package io.opentelemetry.contrib.jmxmetrics; +import groovy.lang.Closure; +import groovy.lang.Tuple2; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.BatchCallback; import io.opentelemetry.api.metrics.DoubleCounter; import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.DoubleUpDownCounter; @@ -16,20 +19,25 @@ import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import io.opentelemetry.api.metrics.ObservableMeasurement; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.InstrumentValueType; import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; import javax.annotation.Nullable; public class GroovyMetricEnvironment { - private final SdkMeterProvider meterProvider; private final Meter meter; @@ -42,6 +50,12 @@ public class GroovyMetricEnvironment { longUpdaterRegistry = new ConcurrentHashMap<>(); private final Map>> doubleUpdaterRegistry = new ConcurrentHashMap<>(); + private final Map>> batchUpdaterRegistry = + new ConcurrentHashMap<>(); + private final Map>> + batchCallbackRegistry = new ConcurrentHashMap<>(); + private final Map instrumentOnceRegistry = + new ConcurrentHashMap<>(); /** * A central context for creating and exporting metrics, to be used by groovy scripts via {@link @@ -218,19 +232,27 @@ public LongHistogram getLongHistogram( * @param description metric description * @param unit - metric unit * @param updater - the value updater + * @return the ObservableDoubleMeasurement for the gauge */ - public void registerDoubleValueCallback( + public ObservableDoubleMeasurement registerDoubleValueCallback( final String name, final String description, final String unit, final Consumer updater) { - meter - .gaugeBuilder(name) - .setDescription(description) - .setUnit(unit) - .buildWithCallback( - proxiedDoubleObserver( - name, description, unit, InstrumentType.OBSERVABLE_GAUGE, updater)); + int descriptorHash = + InstrumentDescriptor.create( + name, + description, + unit, + InstrumentType.OBSERVABLE_GAUGE, + InstrumentValueType.DOUBLE) + .hashCode(); + + return registerCallback( + doubleUpdaterRegistry, + () -> meter.gaugeBuilder(name).setDescription(description).setUnit(unit).buildObserver(), + descriptorHash, + updater); } /** @@ -240,19 +262,29 @@ public void registerDoubleValueCallback( * @param description metric description * @param unit - metric unit * @param updater - the value updater + * @return the ObservableLongMeasurement for the gauge */ - public void registerLongValueCallback( + public ObservableLongMeasurement registerLongValueCallback( final String name, final String description, final String unit, final Consumer updater) { - meter - .gaugeBuilder(name) - .ofLongs() - .setDescription(description) - .setUnit(unit) - .buildWithCallback( - proxiedLongObserver(name, description, unit, InstrumentType.OBSERVABLE_GAUGE, updater)); + int descriptorHash = + InstrumentDescriptor.create( + name, description, unit, InstrumentType.OBSERVABLE_GAUGE, InstrumentValueType.LONG) + .hashCode(); + + return registerCallback( + longUpdaterRegistry, + () -> + meter + .gaugeBuilder(name) + .ofLongs() + .setDescription(description) + .setUnit(unit) + .buildObserver(), + descriptorHash, + updater); } /** @@ -262,20 +294,33 @@ public void registerLongValueCallback( * @param description metric description * @param unit - metric unit * @param updater - the value updater + * @return the ObservableDoubleMeasurement for the counter */ - public void registerDoubleCounterCallback( + public ObservableDoubleMeasurement registerDoubleCounterCallback( final String name, final String description, final String unit, final Consumer updater) { - meter - .counterBuilder(name) - .ofDoubles() - .setDescription(description) - .setUnit(unit) - .buildWithCallback( - proxiedDoubleObserver( - name, description, unit, InstrumentType.OBSERVABLE_COUNTER, updater)); + int descriptorHash = + InstrumentDescriptor.create( + name, + description, + unit, + InstrumentType.OBSERVABLE_COUNTER, + InstrumentValueType.DOUBLE) + .hashCode(); + + return registerCallback( + doubleUpdaterRegistry, + () -> + meter + .counterBuilder(name) + .setDescription(description) + .setUnit(unit) + .ofDoubles() + .buildObserver(), + descriptorHash, + updater); } /** @@ -285,19 +330,27 @@ public void registerDoubleCounterCallback( * @param description metric description * @param unit - metric unit * @param updater - the value updater + * @return the ObservableLongMeasurement for the counter */ - public void registerLongCounterCallback( + public ObservableLongMeasurement registerLongCounterCallback( final String name, final String description, final String unit, final Consumer updater) { - meter - .counterBuilder(name) - .setDescription(description) - .setUnit(unit) - .buildWithCallback( - proxiedLongObserver( - name, description, unit, InstrumentType.OBSERVABLE_COUNTER, updater)); + int descriptorHash = + InstrumentDescriptor.create( + name, + description, + unit, + InstrumentType.OBSERVABLE_COUNTER, + InstrumentValueType.LONG) + .hashCode(); + + return registerCallback( + longUpdaterRegistry, + () -> meter.counterBuilder(name).setDescription(description).setUnit(unit).buildObserver(), + descriptorHash, + updater); } /** @@ -307,20 +360,33 @@ public void registerLongCounterCallback( * @param description metric description * @param unit - metric unit * @param updater - the value updater + * @return the ObservableDoubleMeasurement for the counter */ - public void registerDoubleUpDownCounterCallback( + public ObservableDoubleMeasurement registerDoubleUpDownCounterCallback( final String name, final String description, final String unit, final Consumer updater) { - meter - .upDownCounterBuilder(name) - .ofDoubles() - .setDescription(description) - .setUnit(unit) - .buildWithCallback( - proxiedDoubleObserver( - name, description, unit, InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, updater)); + int descriptorHash = + InstrumentDescriptor.create( + name, + description, + unit, + InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, + InstrumentValueType.DOUBLE) + .hashCode(); + + return registerCallback( + doubleUpdaterRegistry, + () -> + meter + .upDownCounterBuilder(name) + .setDescription(description) + .setUnit(unit) + .ofDoubles() + .buildObserver(), + descriptorHash, + updater); } /** @@ -330,56 +396,111 @@ public void registerDoubleUpDownCounterCallback( * @param description metric description * @param unit - metric unit * @param updater - the value updater + * @return the ObservableLongMeasurement for the counter */ - public void registerLongUpDownCounterCallback( + public ObservableLongMeasurement registerLongUpDownCounterCallback( final String name, final String description, final String unit, final Consumer updater) { - meter - .upDownCounterBuilder(name) - .setDescription(description) - .setUnit(unit) - .buildWithCallback( - proxiedLongObserver( - name, description, unit, InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, updater)); + int descriptorHash = + InstrumentDescriptor.create( + name, + description, + unit, + InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, + InstrumentValueType.LONG) + .hashCode(); + + return registerCallback( + longUpdaterRegistry, + () -> + meter + .upDownCounterBuilder(name) + .setDescription(description) + .setUnit(unit) + .buildObserver(), + descriptorHash, + updater); } - private Consumer proxiedDoubleObserver( - final String name, - final String description, - final String unit, - final InstrumentType instrumentType, - final Consumer updater) { - InstrumentDescriptor descriptor = - InstrumentDescriptor.create( - name, description, unit, instrumentType, InstrumentValueType.DOUBLE); - doubleUpdaterRegistry.putIfAbsent(descriptor.hashCode(), new AtomicReference<>()); - AtomicReference> existingUpdater = - doubleUpdaterRegistry.get(descriptor.hashCode()); - existingUpdater.set(updater); - return doubleResult -> { - Consumer existing = existingUpdater.get(); - existing.accept(doubleResult); - }; + private T registerCallback( + final Map>> registry, + final Supplier observerBuilder, + final int descriptorHash, + final Consumer updater) { + + // Only build the instrument if it isn't already in the registry + ObservableMeasurement obs = instrumentOnceRegistry.get(descriptorHash); + if (obs == null) { + T observer = observerBuilder.get(); + instrumentOnceRegistry.put(descriptorHash, observer); + // If an updater was not provided, the measurement is expected to be added + // to a group batchcallback using the registerBatchCallback function + if (updater != null) { + Consumer cb = proxiedObserver(descriptorHash, registry, updater); + meter.batchCallback(() -> cb.accept(observer), observer); + } + return observer; + } else if (updater != null) { + // If the instrument has already been built with the appropriate proxied observer, + // update the registry so that the callback has the appropriate updater function + registry.get(descriptorHash).set(updater); + } + + return (T) obs; } - private Consumer proxiedLongObserver( - final String name, - final String description, - final String unit, - final InstrumentType instrumentType, - final Consumer updater) { - InstrumentDescriptor descriptor = - InstrumentDescriptor.create( - name, description, unit, instrumentType, InstrumentValueType.LONG); - longUpdaterRegistry.putIfAbsent(descriptor.hashCode(), new AtomicReference<>()); - AtomicReference> existingUpdater = - longUpdaterRegistry.get(descriptor.hashCode()); - existingUpdater.set(updater); - return longResult -> { - Consumer existing = existingUpdater.get(); - existing.accept(longResult); - }; + /** + * Register a collection of observables in a single batch callback + * + * @param identifier - object used to identify the callback to have only one callback + * @param callback - closure that records measurements for the observables + * @param measurement - first observable, the SDK expects this is always collected + * @param additional - remaining observable, the SDK expects this is sometimes collected + */ + public void registerBatchCallback( + Object identifier, + Closure callback, + ObservableMeasurement measurement, + ObservableMeasurement... additional) { + int hash = identifier.hashCode(); + // Store the callback in the registry so the proxied callback always runs the latest + // metric collection closure + batchUpdaterRegistry.putIfAbsent(hash, new AtomicReference<>()); + batchUpdaterRegistry.get(hash).set(callback); + + // collect the set of instruments into a set so we can compare to what's previously been + // registered + Set instrumentSet = + Arrays.stream(additional).collect(Collectors.toCollection(HashSet::new)); + instrumentSet.add(measurement); + + Tuple2> existingCallback = + batchCallbackRegistry.get(hash); + // If this is our first attempt to register this callback or the list of relevant instruments + // has changed, we need register the callback. + if (existingCallback == null || !existingCallback.getV2().equals(instrumentSet)) { + // If the callback has already been created, and we're here to update the set of instruments + // make sure we close the previous callback + if (existingCallback != null) { + existingCallback.getV1().close(); + } + batchCallbackRegistry.put( + hash, + new Tuple2<>( + meter.batchCallback( + () -> batchUpdaterRegistry.get(hash).get().call(), measurement, additional), + instrumentSet)); + } + } + + private Consumer proxiedObserver( + final int descriptorHash, + final Map>> registry, + final Consumer updater) { + registry.putIfAbsent(descriptorHash, new AtomicReference<>()); + registry.get(descriptorHash).set(updater); + return result -> registry.get(descriptorHash).get().accept(result); } } diff --git a/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/InstrumentHelper.groovy b/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/InstrumentHelper.groovy index faf0e2bd9..369a4a6b1 100644 --- a/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/InstrumentHelper.groovy +++ b/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/InstrumentHelper.groovy @@ -7,6 +7,10 @@ package io.opentelemetry.contrib.jmxmetrics import groovy.jmx.GroovyMBean import groovy.transform.PackageScope +import io.opentelemetry.api.metrics.ObservableMeasurement + +import javax.management.AttributeNotFoundException +import javax.management.InvalidAttributeValueException import java.util.logging.Logger import javax.management.openmbean.CompositeData @@ -17,14 +21,13 @@ import javax.management.openmbean.CompositeData * * Intended to be used via the script-bound `otel` {@link OtelHelper} instance methods: * - * def threadCount = otel.instrument(myThreadingMBeanHelper, + * otel.instrument(myThreadingMBeanHelper, * "jvm.threads.count", "number of threads", * "1", [ * "myLabel": { mbean -> mbean.name().getKeyProperty("myObjectNameProperty") }, * "myOtherLabel": { "myLabelValue" } * ], "ThreadCount", otel.&longUpDownCounter) * - * threadCount.update() * * If the underlying MBean(s) held by the MBeanHelper are * {@link CompositeData} instances, each key of their CompositeType's @@ -32,7 +35,7 @@ import javax.management.openmbean.CompositeData * updated for each respective value. */ class InstrumentHelper { - private static final Logger logger = Logger.getLogger(InstrumentHelper.class.getName()); + private static final Logger logger = Logger.getLogger(InstrumentHelper.class.getName()) private final MBeanHelper mBeanHelper private final String instrumentName @@ -41,6 +44,7 @@ class InstrumentHelper { private final Map> mBeanAttributes private final Map labelFuncs private final Closure instrument + private final GroovyMetricEnvironment metricEnvironment /** * An InstrumentHelper provides the ability to easily create and update {@link io.opentelemetry.api.metrics.Instrument} @@ -52,13 +56,15 @@ class InstrumentHelper { * @param description - the resulting instruments' description to register. * @param unit - the resulting instruments' unit to register. * @param labelFuncs - A {@link Map} of label names and values to be determined by custom - * {@link GroovyMBean}-provided Closures: (e.g. [ "myLabelName" : { mbean -> "myLabelValue"} ]). The + * {@link GroovyMBean}-provided Closures: (e.g. [ "myLabelName" : { mbean -> "myLabelValue"} ]). The * resulting Label instances will be used for each individual update. * @param attribute - The {@link GroovyMBean} attribute for which to use as the instrument value. * @param instrument - The {@link io.opentelemetry.api.metrics.Instrument}-producing {@link OtelHelper} method pointer: * (e.g. new OtelHelper().&doubleValueRecorder) + * @param metricenvironment - The {@link GroovyMetricEnvironment} used to register callbacks onto the SDK meter for + * batch callbacks used to handle {@link CompositeData} */ - InstrumentHelper(MBeanHelper mBeanHelper, String instrumentName, String description, String unit, Map> labelFuncs, Map>> MBeanAttributes, Closure instrument) { + InstrumentHelper(MBeanHelper mBeanHelper, String instrumentName, String description, String unit, Map> labelFuncs, Map>> MBeanAttributes, Closure instrument, GroovyMetricEnvironment metricEnvironment) { this.mBeanHelper = mBeanHelper this.instrumentName = instrumentName this.description = description @@ -66,126 +72,185 @@ class InstrumentHelper { this.labelFuncs = labelFuncs this.mBeanAttributes = MBeanAttributes this.instrument = instrument + this.metricEnvironment = metricEnvironment } void update() { - // Tuples of the form (mbean, attribute, value) - def values = mBeanHelper.getAttributes(mBeanAttributes.keySet()) - - // If there are no tuples with non-null value, return early - if (values.find {it.getV3() != null } == null) { - logger.warning("No valid value(s) for ${instrumentName} - ${mBeanHelper}.${mBeanAttributes.keySet().join(",")}") + def mbeans = mBeanHelper.getMBeans() + def compositeAttributes = [] + def simpleAttributes = [] + if (mbeans.size() == 0) { return } - // Observer instruments need to have a single updater set at build time, so pool all - // update operations in a list of closures per instrument to be executed after all values - // are established, potentially as a single updater. This is done because a single MBeanHelper - // can represent multiple MBeans (each with different values for an attribute) and the labelFuncs - // will create multiple datapoints from the same instrument identifiers. - def tupleToUpdates = [:] // tuple is of form (instrument, instrumentName, description, unit) - - values.each { collectedValue -> - def mbean = collectedValue.getV1() - def attribute = collectedValue.getV2() - def value = collectedValue.getV3() - if (value instanceof CompositeData) { - value.getCompositeType().keySet().each { key -> - def val = value.get(key) - def updatedInstrumentName = "${instrumentName}.${key}" - def labels = getLabels(mbean, labelFuncs, mBeanAttributes[attribute]) - def tuple = new Tuple(instrument, updatedInstrumentName, description, unit) - logger.fine("Recording ${updatedInstrumentName} - ${instrument.method} w/ ${val} - ${labels}") - if (!tupleToUpdates.containsKey(tuple)) { - tupleToUpdates[tuple] = [] - } - tupleToUpdates[tuple].add(prepareUpdateClosure(instrument, val, labels)) + mBeanAttributes.keySet().each { attribute -> + try { + // Look at the collected mbeans to evaluate if the attributes requested are + // composite data types or simple. Composite types require different parsing to + // end up with multiple recorders in the same callback. + def keySet = getCompositeKeys(attribute, mbeans) + if (keySet.size() > 0) { + compositeAttributes.add(new Tuple2>(attribute, keySet)) + } else { + simpleAttributes.add(attribute) } - } else if (value != null) { - def labels = getLabels(mbean, labelFuncs, mBeanAttributes[attribute]) - def tuple = new Tuple(instrument, instrumentName, description, unit) - logger.fine("Recording ${instrumentName} - ${instrument.method} w/ ${value} - ${labels}") - if (!tupleToUpdates.containsKey(tuple)) { - tupleToUpdates[tuple] = [] - } - tupleToUpdates[tuple].add(prepareUpdateClosure(instrument, value, labels)) + } catch (AttributeNotFoundException ignored) { + logger.fine("Attribute ${attribute} not found on any of the collected mbeans") + } catch (InvalidAttributeValueException ignored) { + logger.info("Attribute ${attribute} was not consistently CompositeData for " + + "collected mbeans. The metrics gatherer cannot collect measurements for an instrument " + + "when the mbeans attribute values are not all CompositeData or all simple values.") } } - tupleToUpdates.each {tuple, updateClosures -> - def instrument = tuple.getAt(0) - def instrumentName = tuple.getAt(1) - def description = tuple.getAt(2) - def unit = tuple.getAt(3) - + if (simpleAttributes.size() > 0) { + def simpleUpdateClosure = prepareUpdateClosure(mbeans, simpleAttributes) if (instrumentIsDoubleObserver(instrument) || instrumentIsLongObserver(instrument)) { - // Though the instrument updater is only set at build time, - // our GroovyMetricEnvironment helpers ensure the updater - // uses the Closure specified here. instrument(instrumentName, description, unit, { result -> - updateClosures.each { update -> - update(result) - } + simpleUpdateClosure(result) }) } else { - def inst = instrument(instrumentName, description, unit) - updateClosures.each { - it(inst) + simpleUpdateClosure(instrument(instrumentName, description, unit)) + } + } + + if (compositeAttributes.size() > 0) { + registerCompositeUpdateClosures(mbeans, compositeAttributes) + } + } + + // This function retrieves the set of CompositeData keys for the given attribute for the currently + // collected mbeans. If the attribute is all simple values it will return an empty list. + // If the attribute is inconsistent across mbeans, it will throw an exception. + private static Set getCompositeKeys(String attribute, List beans) throws AttributeNotFoundException, InvalidAttributeValueException { + def isComposite = false + def isFound = false + def keySet = beans.collect { bean -> + try { + def value = MBeanHelper.getBeanAttribute(bean, attribute) + if (value == null) { + // Null represents an attribute not found exception in MBeanHelper + [] + } else if (value instanceof CompositeData) { + // If we've found a simple attribute, throw an exception as this attribute + // was mixed between simple & composite + if (!isComposite && isFound) { + throw new InvalidAttributeValueException() + } + isComposite = true + isFound = true + value.getCompositeType().keySet() + } else { + // If we've previously found a composite attribute, throw an exception as this attribute + // was mixed between simple & composite + if (isComposite) { + throw new InvalidAttributeValueException() + } + isFound = true + [] } + } catch (AttributeNotFoundException | NullPointerException ignored) { + [] } + }.flatten() + .toSet() + + if (!isFound) { + throw new AttributeNotFoundException() } + + return keySet } private static Map getLabels(GroovyMBean mbean, Map labelFuncs, Map additionalLabels) { def labels = [:] labelFuncs.each { label, labelFunc -> - labels[label] = labelFunc(mbean) as String + labels[label] = labelFunc(mbean) as String } - additionalLabels.each {label, labelFunc -> + additionalLabels.each { label, labelFunc -> labels[label] = labelFunc(mbean) as String } return labels } - private static Closure prepareUpdateClosure(inst, value, labels) { - def labelMap = GroovyMetricEnvironment.mapToAttributes(labels) - if (instrumentIsLongObserver(inst)) { - return { result -> - result.record((long) value, labelMap) + // Create a closure for simple attributes that will retrieve mbean information on + // callback to ensure that metrics are collected on request + private Closure prepareUpdateClosure(List mbeans, attributes) { + return { result -> + [mbeans, attributes].combinations().each { pair -> + def (mbean, attribute) = pair + def value = MBeanHelper.getBeanAttribute(mbean, attribute) + if (value != null) { + def labels = getLabels(mbean, labelFuncs, mBeanAttributes[attribute]) + logger.fine("Recording ${instrumentName} - ${instrument.method} w/ ${value} - ${labels}") + recordDataPoint(instrument, result, value, GroovyMetricEnvironment.mapToAttributes(labels)) + } } + } + } + + // Create a closure for composite data attributes that will retrieve mbean information + // on callback to ensure that metrics are collected on request. This will create a single + // batch callback for all of the metrics collected on a single attribute. + private void registerCompositeUpdateClosures(List mbeans, attributes) { + attributes.each { pair -> + def (attribute, keys) = pair + def instruments = keys.collect { new Tuple2(it, instrument("${instrumentName}.${it}", description, unit, null)) } + + metricEnvironment.registerBatchCallback("${instrumentName}.${attribute}", () -> { + mbeans.each { mbean -> + def value = MBeanHelper.getBeanAttribute(mbean, attribute) + if (value != null && value instanceof CompositeData) { + instruments.each { inst -> + def val = value.get(inst.v1) + def labels = getLabels(mbean, labelFuncs, mBeanAttributes[attribute]) + logger.fine("Recording ${"${instrumentName}.${inst.v1}"} - ${instrument.method} w/ ${val} - ${labels}") + recordDataPoint(instrument, inst.v2, val, GroovyMetricEnvironment.mapToAttributes(labels)) + } + } + } + }, instruments.first().v2, *instruments.tail().collect { it.v2 }) + } + } + + // Based on the type of instrument, record the data point in the way expected by the observable + private static void recordDataPoint(inst, result, value, labelMap) { + if (instrumentIsLongObserver(inst)) { + result.record((long) value, labelMap) } else if (instrumentIsDoubleObserver(inst)) { - return { result -> - result.record((double) value, labelMap) - } + result.record((double) value, labelMap) } else if (instrumentIsCounter(inst)) { - return { i -> i.add(value, labelMap) } + result.add(value, labelMap) } else { - return { i -> i.record(value, labelMap) } + result.record(value, labelMap) } } - @PackageScope static boolean instrumentIsDoubleObserver(inst) { + @PackageScope + static boolean instrumentIsDoubleObserver(inst) { return [ - "doubleCounterCallback", - "doubleUpDownCounterCallback", - "doubleValueCallback", + "doubleCounterCallback", + "doubleUpDownCounterCallback", + "doubleValueCallback", ].contains(inst.method) } - @PackageScope static boolean instrumentIsLongObserver(inst) { + @PackageScope + static boolean instrumentIsLongObserver(inst) { return [ - "longCounterCallback", - "longUpDownCounterCallback", - "longValueCallback", + "longCounterCallback", + "longUpDownCounterCallback", + "longValueCallback", ].contains(inst.method) } - @PackageScope static boolean instrumentIsCounter(inst) { + @PackageScope + static boolean instrumentIsCounter(inst) { return [ - "doubleCounter", - "doubleUpDownCounter", - "longCounter", - "longUpDownCounter" + "doubleCounter", + "doubleUpDownCounter", + "longCounter", + "longUpDownCounter" ].contains(inst.method) } } diff --git a/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/MBeanHelper.groovy b/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/MBeanHelper.groovy index 42fa92eca..1ba98467e 100644 --- a/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/MBeanHelper.groovy +++ b/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/MBeanHelper.groovy @@ -74,11 +74,11 @@ class MBeanHelper { } @PackageScope List getMBeans() { - if (mbeans == null) { + if (mbeans == null || mbeans.size() == 0) { logger.warning("No active MBeans. Be sure to fetch() before updating any applicable instruments.") return [] } - return mbeans + return isSingle ? [mbeans[0]]: mbeans } @PackageScope List getAttribute(String attribute) { @@ -88,12 +88,7 @@ class MBeanHelper { def ofInterest = isSingle ? [mbeans[0]]: mbeans return ofInterest.collect { - try { - it.getProperty(attribute) - } catch (AttributeNotFoundException e) { - logger.warning("Expected attribute ${attribute} not found in mbean ${it.name()}") - null - } + getBeanAttribute(it, attribute) } } @@ -105,12 +100,16 @@ class MBeanHelper { def ofInterest = isSingle ? [mbeans[0]]: mbeans return [ofInterest, attributes].combinations().collect { pair -> def (bean, attribute) = pair - try { - new Tuple3(bean, attribute, bean.getProperty(attribute)) - } catch (AttributeNotFoundException e) { - logger.info("Expected attribute ${attribute} not found in mbean ${bean.name()}") - new Tuple3(bean, attribute, null) - } + new Tuple3(bean, attribute, getBeanAttribute(bean, attribute)) + } + } + + static Object getBeanAttribute(GroovyMBean bean, String attribute) { + try { + bean.getProperty(attribute) + } catch (AttributeNotFoundException e) { + logger.warning("Expected attribute ${attribute} not found in mbean ${bean.name()}") + null } } } diff --git a/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/OtelHelper.groovy b/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/OtelHelper.groovy index a2233073b..e3ab6f3a5 100644 --- a/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/OtelHelper.groovy +++ b/jmx-metrics/src/main/groovy/io/opentelemetry/contrib/jmxmetrics/OtelHelper.groovy @@ -88,13 +88,13 @@ class OtelHelper { * attribute value(s). The parameters map to the InstrumentHelper constructor. */ InstrumentHelper instrument(MBeanHelper mBeanHelper, String instrumentName, String description, String unit, Map labelFuncs, Map> attributes, Closure otelInstrument) { - def instrumentHelper = new InstrumentHelper(mBeanHelper, instrumentName, description, unit, labelFuncs, attributes, otelInstrument) + def instrumentHelper = new InstrumentHelper(mBeanHelper, instrumentName, description, unit, labelFuncs, attributes, otelInstrument, groovyMetricEnvironment) instrumentHelper.update() return instrumentHelper } InstrumentHelper instrument(MBeanHelper mBeanHelper, String instrumentName, String description, String unit, Map labelFuncs, String attribute, Closure otelInstrument) { - instrument(mBeanHelper, instrumentName, description, unit, labelFuncs, [(attribute): [:] as Map], otelInstrument) + return instrument(mBeanHelper, instrumentName, description, unit, labelFuncs, [(attribute): [:] as Map], otelInstrument) } InstrumentHelper instrument(MBeanHelper mBeanHelper, String instrumentName, String description, String unit, String attribute, Closure otelInstrument) { @@ -102,7 +102,7 @@ class OtelHelper { } InstrumentHelper instrument(MBeanHelper mBeanHelper, String instrumentName, String description, String unit, Map> attributes, Closure otelInstrument) { - return instrument(mBeanHelper, instrumentName, description, unit, [:] as Map, attributes, otelInstrument) + return instrument(mBeanHelper, instrumentName, description, unit, [:] as Map, attributes, otelInstrument) } InstrumentHelper instrument(MBeanHelper mBeanHelper, String instrumentName, String description, String attribute, Closure otelInstrument) { @@ -110,7 +110,7 @@ class OtelHelper { } InstrumentHelper instrument(MBeanHelper mBeanHelper, String instrumentName, String description, Map> attributes, Closure otelInstrument) { - return instrument(mBeanHelper, instrumentName, description, OtelHelper.SCALAR, [:] as Map, attributes, otelInstrument) + return instrument(mBeanHelper, instrumentName, description, OtelHelper.SCALAR, [:] as Map, attributes, otelInstrument) } InstrumentHelper instrument(MBeanHelper mBeanHelper, String instrumentName, String attribute, Closure otelInstrument) { @@ -118,7 +118,7 @@ class OtelHelper { } InstrumentHelper instrument(MBeanHelper mBeanHelper, String instrumentName, Map> attributes, Closure otelInstrument) { - return instrument(mBeanHelper, instrumentName, "", OtelHelper.SCALAR, [:] as Map, attributes, otelInstrument) + return instrument(mBeanHelper, instrumentName, "", OtelHelper.SCALAR, [:] as Map, attributes, otelInstrument) } DoubleCounter doubleCounter(String name, String description, String unit) { @@ -193,75 +193,75 @@ class OtelHelper { return longHistogram(name, '') } - void doubleCounterCallback(String name, String description, String unit, Consumer updater) { - groovyMetricEnvironment.registerDoubleCounterCallback(name, description, unit, updater) + ObservableDoubleMeasurement doubleCounterCallback(String name, String description, String unit, Consumer updater) { + return groovyMetricEnvironment.registerDoubleCounterCallback(name, description, unit, updater) } - void doubleCounterCallback(String name, String description, Consumer updater) { - doubleCounterCallback(name, description, SCALAR, updater) + ObservableDoubleMeasurement doubleCounterCallback(String name, String description, Consumer updater) { + return doubleCounterCallback(name, description, SCALAR, updater) } - void doubleCounterCallback(String name, Consumer updater) { - doubleCounterCallback(name, '', updater) + ObservableDoubleMeasurement doubleCounterCallback(String name, Consumer updater) { + return doubleCounterCallback(name, '', updater) } - void longCounterCallback(String name, String description, String unit, Consumer updater) { - groovyMetricEnvironment.registerLongCounterCallback(name, description, unit, updater) + ObservableLongMeasurement longCounterCallback(String name, String description, String unit, Consumer updater) { + return groovyMetricEnvironment.registerLongCounterCallback(name, description, unit, updater) } - void longCounterCallback(String name, String description, Consumer updater) { - longCounterCallback(name, description, SCALAR, updater) + ObservableLongMeasurement longCounterCallback(String name, String description, Consumer updater) { + return longCounterCallback(name, description, SCALAR, updater) } - void longCounterCallback(String name, Consumer updater) { - longCounterCallback(name, '', updater) + ObservableLongMeasurement longCounterCallback(String name, Consumer updater) { + return longCounterCallback(name, '', updater) } - void doubleUpDownCounterCallback(String name, String description, String unit, Consumer updater) { - groovyMetricEnvironment.registerDoubleUpDownCounterCallback(name, description, unit, updater) + ObservableDoubleMeasurement doubleUpDownCounterCallback(String name, String description, String unit, Consumer updater) { + return groovyMetricEnvironment.registerDoubleUpDownCounterCallback(name, description, unit, updater) } - void doubleUpDownCounterCallback(String name, String description, Consumer updater) { - doubleUpDownCounterCallback(name, description, SCALAR, updater) + ObservableDoubleMeasurement doubleUpDownCounterCallback(String name, String description, Consumer updater) { + return doubleUpDownCounterCallback(name, description, SCALAR, updater) } - void doubleUpDownCounterCallback(String name, Consumer updater) { - doubleUpDownCounterCallback(name, '', updater) + ObservableDoubleMeasurement doubleUpDownCounterCallback(String name, Consumer updater) { + return doubleUpDownCounterCallback(name, '', updater) } - void longUpDownCounterCallback(String name, String description, String unit, Consumer updater) { - groovyMetricEnvironment.registerLongUpDownCounterCallback(name, description, unit, updater) + ObservableLongMeasurement longUpDownCounterCallback(String name, String description, String unit, Consumer updater) { + return groovyMetricEnvironment.registerLongUpDownCounterCallback(name, description, unit, updater) } - void longUpDownCounterCallback(String name, String description, Consumer updater) { - longUpDownCounterCallback(name, description, SCALAR, updater) + ObservableLongMeasurement longUpDownCounterCallback(String name, String description, Consumer updater) { + return longUpDownCounterCallback(name, description, SCALAR, updater) } - void longUpDownCounterCallback(String name, Consumer updater) { - longUpDownCounterCallback(name, '', updater) + ObservableLongMeasurement longUpDownCounterCallback(String name, Consumer updater) { + return longUpDownCounterCallback(name, '', updater) } - void doubleValueCallback(String name, String description, String unit, Consumer updater) { - groovyMetricEnvironment.registerDoubleValueCallback(name, description, unit, updater) + ObservableDoubleMeasurement doubleValueCallback(String name, String description, String unit, Consumer updater) { + return groovyMetricEnvironment.registerDoubleValueCallback(name, description, unit, updater) } - void doubleValueCallback(String name, String description, Consumer updater) { - doubleValueCallback(name, description, SCALAR, updater) + ObservableDoubleMeasurement doubleValueCallback(String name, String description, Consumer updater) { + return doubleValueCallback(name, description, SCALAR, updater) } - void doubleValueCallback(String name, Consumer updater) { - doubleValueCallback(name, '', updater) + ObservableDoubleMeasurement doubleValueCallback(String name, Consumer updater) { + return doubleValueCallback(name, '', updater) } - void longValueCallback(String name, String description, String unit, Consumer updater) { - groovyMetricEnvironment.registerLongValueCallback(name, description, unit, updater) + ObservableLongMeasurement longValueCallback(String name, String description, String unit, Consumer updater) { + return groovyMetricEnvironment.registerLongValueCallback(name, description, unit, updater) } - void longValueCallback(String name, String description, Consumer updater) { - longValueCallback(name, description, SCALAR, updater) + ObservableLongMeasurement longValueCallback(String name, String description, Consumer updater) { + return longValueCallback(name, description, SCALAR, updater) } - void longValueCallback(String name, Consumer updater) { - longValueCallback(name, '', updater) + ObservableLongMeasurement longValueCallback(String name, Consumer updater) { + return longValueCallback(name, '', updater) } } diff --git a/jmx-metrics/src/test/java/io/opentelemetry/contrib/jmxmetrics/InstrumenterHelperTest.java b/jmx-metrics/src/test/java/io/opentelemetry/contrib/jmxmetrics/InstrumenterHelperTest.java index a9abdbef1..11737efb3 100644 --- a/jmx-metrics/src/test/java/io/opentelemetry/contrib/jmxmetrics/InstrumenterHelperTest.java +++ b/jmx-metrics/src/test/java/io/opentelemetry/contrib/jmxmetrics/InstrumenterHelperTest.java @@ -58,6 +58,7 @@ class InstrumenterHelperTest { // Will eventually be replaced with Jupiter extension in sdk-testing private SdkMeterProvider meterProvider; private InMemoryMetricReader metricReader; + private GroovyMetricEnvironment metricEnvironment; private OtelHelper otel; @@ -91,8 +92,8 @@ void confirmServerIsActive() { void setupOtel() { metricReader = InMemoryMetricReader.create(); meterProvider = SdkMeterProvider.builder().registerMetricReader(metricReader).build(); - - otel = new OtelHelper(jmxClient, new GroovyMetricEnvironment(meterProvider, "otel.test")); + metricEnvironment = new GroovyMetricEnvironment(meterProvider, "otel.test"); + otel = new OtelHelper(jmxClient, metricEnvironment); } @AfterEach @@ -688,7 +689,8 @@ void updateWithHelper( "1", labelFuncs, Collections.singletonMap(attribute, null), - instrument); + instrument, + metricEnvironment); instrumentHelper.update(); } @@ -702,7 +704,14 @@ void updateWithHelperMultiAttribute( Map> labelFuncs = new HashMap<>(); InstrumentHelper instrumentHelper = new InstrumentHelper( - mBeanHelper, instrumentName, description, "1", labelFuncs, attributes, instrument); + mBeanHelper, + instrumentName, + description, + "1", + labelFuncs, + attributes, + instrument, + metricEnvironment); instrumentHelper.update(); }