Skip to content

Commit

Permalink
Add support for setting unit of RuntimeMetric
Browse files Browse the repository at this point in the history
  • Loading branch information
zacw7 committed May 17, 2022
1 parent 41460f9 commit f7ced3f
Show file tree
Hide file tree
Showing 21 changed files with 273 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class RuntimeMetric
{
private final String name;
private final RuntimeUnit unit;
private final AtomicLong sum = new AtomicLong();
private final AtomicLong count = new AtomicLong();
private final AtomicLong max = new AtomicLong(Long.MIN_VALUE);
Expand All @@ -39,28 +40,31 @@ public class RuntimeMetric
* Creates a new empty RuntimeMetric.
*
* @param name Name of this metric. If used in the presto core code base, this should be a value defined in {@link RuntimeMetricName}. But connectors could use arbitrary names.
* @param unit Unit of this metric. Available units are defined in {@link RuntimeUnit}.
*/
public RuntimeMetric(String name)
public RuntimeMetric(String name, RuntimeUnit unit)
{
this.name = requireNonNull(name, "name is null");
this.unit = requireNonNull(unit, "unit is null");
}

public static RuntimeMetric copyOf(RuntimeMetric metric)
{
requireNonNull(metric, "metric is null");
return new RuntimeMetric(metric.getName(), metric.getSum(), metric.getCount(), metric.getMax(), metric.getMin());
return new RuntimeMetric(metric.getName(), metric.getUnit(), metric.getSum(), metric.getCount(), metric.getMax(), metric.getMin());
}

@JsonCreator
@ThriftConstructor
public RuntimeMetric(
@JsonProperty("name") String name,
@JsonProperty("unit") RuntimeUnit unit,
@JsonProperty("sum") long sum,
@JsonProperty("count") long count,
@JsonProperty("max") long max,
@JsonProperty("min") long min)
{
this(name);
this(name, unit);
set(sum, count, max, min);
}

Expand All @@ -75,6 +79,7 @@ private void set(long sum, long count, long max, long min)
public void set(RuntimeMetric metric)
{
requireNonNull(metric, "metric is null");
checkState(unit == metric.getUnit(), "The metric must have the same unit type as the current one.");
set(metric.getSum(), metric.getCount(), metric.getMax(), metric.getMin());
}

Expand Down Expand Up @@ -104,6 +109,8 @@ public static RuntimeMetric merge(RuntimeMetric metric1, RuntimeMetric metric2)
if (metric2 == null) {
return metric1;
}
checkState(metric1.getUnit() == metric2.getUnit(), "Two metrics to be merged must have the same unit type.");

RuntimeMetric mergedMetric = copyOf(metric1);
mergedMetric.mergeWith(metric2);
return mergedMetric;
Expand All @@ -117,6 +124,7 @@ public void mergeWith(RuntimeMetric metric)
if (metric == null) {
return;
}
checkState(unit == metric.getUnit(), "The metric to be merged must have the same unit type as the current one.");
sum.addAndGet(metric.getSum());
count.addAndGet(metric.getCount());
max.accumulateAndGet(metric.getMax(), Math::max);
Expand Down Expand Up @@ -150,4 +158,18 @@ public long getMin()
{
return min.get();
}

@JsonProperty
@ThriftField(6)
public RuntimeUnit getUnit()
{
return unit;
}

private static void checkState(boolean condition, String message)
{
if (!condition) {
throw new IllegalStateException(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

import static com.facebook.presto.common.RuntimeUnit.NANO;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -44,7 +45,7 @@ public RuntimeStats()
public RuntimeStats(Map<String, RuntimeMetric> metrics)
{
requireNonNull(metrics, "metrics is null");
metrics.forEach((name, newMetric) -> this.metrics.computeIfAbsent(name, RuntimeMetric::new).mergeWith(newMetric));
metrics.forEach((name, newMetric) -> this.metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, newMetric.getUnit())).mergeWith(newMetric));
}

public static RuntimeStats copyOf(RuntimeStats stats)
Expand Down Expand Up @@ -85,25 +86,25 @@ public Map<String, RuntimeMetric> getMetrics()
return Collections.unmodifiableMap(metrics);
}

public void addMetricValue(String name, long value)
public void addMetricValue(String name, RuntimeUnit unit, long value)
{
metrics.computeIfAbsent(name, RuntimeMetric::new).addValue(value);
metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, unit)).addValue(value);
}

public void addMetricValueIgnoreZero(String name, long value)
public void addMetricValueIgnoreZero(String name, RuntimeUnit unit, long value)
{
if (value == 0) {
return;
}
addMetricValue(name, value);
addMetricValue(name, unit, value);
}

/**
* Merges {@code metric} into this object with name {@code name}.
*/
public void mergeMetric(String name, RuntimeMetric metric)
{
metrics.computeIfAbsent(name, RuntimeMetric::new).mergeWith(metric);
metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, metric.getUnit())).mergeWith(metric);
}

/**
Expand All @@ -114,7 +115,7 @@ public void mergeWith(RuntimeStats stats)
if (stats == null) {
return;
}
stats.getMetrics().forEach((name, newMetric) -> metrics.computeIfAbsent(name, RuntimeMetric::new).mergeWith(newMetric));
stats.getMetrics().forEach((name, newMetric) -> metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, newMetric.getUnit())).mergeWith(newMetric));
}

/**
Expand All @@ -126,14 +127,14 @@ public void update(RuntimeStats stats)
if (stats == null) {
return;
}
stats.getMetrics().forEach((name, newMetric) -> metrics.computeIfAbsent(name, RuntimeMetric::new).set(newMetric));
stats.getMetrics().forEach((name, newMetric) -> metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, newMetric.getUnit())).set(newMetric));
}

public <V> V profileNanos(String tag, Supplier<V> supplier)
{
long startTime = System.nanoTime();
V result = supplier.get();
addMetricValueIgnoreZero(tag, System.nanoTime() - startTime);
addMetricValueIgnoreZero(tag, NANO, System.nanoTime() - startTime);
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.common;

import com.facebook.drift.annotations.ThriftEnum;
import com.facebook.drift.annotations.ThriftEnumValue;

@ThriftEnum
public enum RuntimeUnit
{
NONE(0), NANO(1), BYTE(2);

private final int value;

RuntimeUnit(int value)
{
this.value = value;
}

@ThriftEnumValue
public int getValue()
{
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import com.facebook.airlift.json.JsonCodec;
import org.testng.annotations.Test;

import static com.facebook.presto.common.RuntimeUnit.BYTE;
import static com.facebook.presto.common.RuntimeUnit.NANO;
import static com.facebook.presto.common.RuntimeUnit.NONE;
import static org.testng.Assert.assertEquals;

public class TestRuntimeMetric
Expand All @@ -25,6 +28,7 @@ public class TestRuntimeMetric
private void assertRuntimeMetricEquals(RuntimeMetric m1, RuntimeMetric m2)
{
assertEquals(m1.getName(), m2.getName());
assertEquals(m1.getUnit(), m2.getUnit());
assertEquals(m1.getSum(), m2.getSum());
assertEquals(m1.getCount(), m2.getCount());
assertEquals(m1.getMax(), m2.getMax());
Expand All @@ -34,63 +38,82 @@ private void assertRuntimeMetricEquals(RuntimeMetric m1, RuntimeMetric m2)
@Test
public void testAddValue()
{
RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME);
RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME, NONE);
metric.addValue(101);
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, 101, 1, 101, 101));
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, NONE, 101, 1, 101, 101));

metric.addValue(99);
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, 200, 2, 101, 99));
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, NONE, 200, 2, 101, 99));

metric.addValue(201);
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, 401, 3, 201, 99));
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, NONE, 401, 3, 201, 99));

metric.addValue(202);
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, NONE, 603, 4, 202, 99));
}

@Test
public void testCopy()
{
RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME, 1, 1, 1, 1);
RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME, NONE, 1, 1, 1, 1);
RuntimeMetric metricCopy = RuntimeMetric.copyOf(metric);

// Verify that updating one metric doesn't affect its copy.
metric.addValue(2);
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, 3, 2, 2, 1));
assertRuntimeMetricEquals(metricCopy, new RuntimeMetric(TEST_METRIC_NAME, 1, 1, 1, 1));
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, NONE, 3, 2, 2, 1));
assertRuntimeMetricEquals(metricCopy, new RuntimeMetric(TEST_METRIC_NAME, NONE, 1, 1, 1, 1));
metricCopy.addValue(2);
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, 3, 2, 2, 1));
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, NONE, 3, 2, 2, 1));
assertRuntimeMetricEquals(metricCopy, metric);
}

@Test
public void testMergeWith()
{
RuntimeMetric metric1 = new RuntimeMetric(TEST_METRIC_NAME, 5, 2, 4, 1);
RuntimeMetric metric2 = new RuntimeMetric(TEST_METRIC_NAME, 20, 2, 11, 9);
RuntimeMetric metric1 = new RuntimeMetric(TEST_METRIC_NAME, NONE, 5, 2, 4, 1);
RuntimeMetric metric2 = new RuntimeMetric(TEST_METRIC_NAME, NONE, 20, 2, 11, 9);
metric1.mergeWith(metric2);
assertRuntimeMetricEquals(metric1, new RuntimeMetric(TEST_METRIC_NAME, 25, 4, 11, 1));
assertRuntimeMetricEquals(metric1, new RuntimeMetric(TEST_METRIC_NAME, NONE, 25, 4, 11, 1));

metric2.mergeWith(metric2);
assertRuntimeMetricEquals(metric2, new RuntimeMetric(TEST_METRIC_NAME, 40, 4, 11, 9));
assertRuntimeMetricEquals(metric2, new RuntimeMetric(TEST_METRIC_NAME, NONE, 40, 4, 11, 9));

metric2.mergeWith(null);
assertRuntimeMetricEquals(metric2, new RuntimeMetric(TEST_METRIC_NAME, 40, 4, 11, 9));
assertRuntimeMetricEquals(metric2, new RuntimeMetric(TEST_METRIC_NAME, NONE, 40, 4, 11, 9));
}

@Test(expectedExceptions = {IllegalStateException.class})
public void testMergeWithWithConflictUnits()
{
RuntimeMetric metric1 = new RuntimeMetric(TEST_METRIC_NAME, NANO, 5, 2, 4, 1);
RuntimeMetric metric2 = new RuntimeMetric(TEST_METRIC_NAME, BYTE, 20, 2, 11, 9);
metric1.mergeWith(metric2);
}

@Test
public void testMerge()
{
RuntimeMetric metric1 = new RuntimeMetric(TEST_METRIC_NAME, 5, 2, 4, 1);
RuntimeMetric metric2 = new RuntimeMetric(TEST_METRIC_NAME, 20, 2, 11, 9);
assertRuntimeMetricEquals(RuntimeMetric.merge(metric1, metric2), new RuntimeMetric(TEST_METRIC_NAME, 25, 4, 11, 1));
RuntimeMetric metric1 = new RuntimeMetric(TEST_METRIC_NAME, NONE, 5, 2, 4, 1);
RuntimeMetric metric2 = new RuntimeMetric(TEST_METRIC_NAME, NONE, 20, 2, 11, 9);
assertRuntimeMetricEquals(RuntimeMetric.merge(metric1, metric2), new RuntimeMetric(TEST_METRIC_NAME, NONE, 25, 4, 11, 1));

assertRuntimeMetricEquals(metric1, new RuntimeMetric(TEST_METRIC_NAME, NONE, 5, 2, 4, 1));
assertRuntimeMetricEquals(metric2, new RuntimeMetric(TEST_METRIC_NAME, NONE, 20, 2, 11, 9));
}

assertRuntimeMetricEquals(metric1, new RuntimeMetric(TEST_METRIC_NAME, 5, 2, 4, 1));
assertRuntimeMetricEquals(metric2, new RuntimeMetric(TEST_METRIC_NAME, 20, 2, 11, 9));
@Test(expectedExceptions = {IllegalStateException.class})
public void testMergeWithConflictUnits()
{
RuntimeMetric metric1 = new RuntimeMetric(TEST_METRIC_NAME, NANO, 5, 2, 4, 1);
RuntimeMetric metric2 = new RuntimeMetric(TEST_METRIC_NAME, BYTE, 20, 2, 11, 9);
RuntimeMetric.merge(metric1, metric2);
}

@Test
public void testJson()
{
JsonCodec<RuntimeMetric> codec = JsonCodec.jsonCodec(RuntimeMetric.class);
RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME);
RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME, NANO);
metric.addValue(101);
metric.addValue(202);

Expand Down
Loading

0 comments on commit f7ced3f

Please sign in to comment.