Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit to RuntimeMetric #17737

Merged
merged 1 commit into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class RuntimeMetric
{
private final String name;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need the name here since it's duplicate information?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm kinda cautious to remove the existing field from the protocol. Some components might rely on it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the Velox definition doesn't have it, so to make it more consistent, can we at least check what components are relying on it and would it be easy to remove?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid this is another breaking change. The main reason we choose to keep the map structure instead of converting into array is that we cannot remove the existing information.

private final RuntimeUnit unit;
private final AtomicLong sum = new AtomicLong();
private final AtomicLong count = new AtomicLong();
private final AtomicLong max = new AtomicLong(Long.MIN_VALUE);
Expand All @@ -39,28 +40,31 @@ public class RuntimeMetric
* Creates a new empty RuntimeMetric.
*
* @param name Name of this metric. If used in the presto core code base, this should be a value defined in {@link RuntimeMetricName}. But connectors could use arbitrary names.
* @param unit Unit of this metric. Available units are defined in {@link RuntimeUnit}.
*/
public RuntimeMetric(String name)
public RuntimeMetric(String name, RuntimeUnit unit)
{
this.name = requireNonNull(name, "name is null");
this.unit = requireNonNull(unit, "unit is null");
}

public static RuntimeMetric copyOf(RuntimeMetric metric)
{
requireNonNull(metric, "metric is null");
return new RuntimeMetric(metric.getName(), metric.getSum(), metric.getCount(), metric.getMax(), metric.getMin());
return new RuntimeMetric(metric.getName(), metric.getUnit(), metric.getSum(), metric.getCount(), metric.getMax(), metric.getMin());
}

@JsonCreator
@ThriftConstructor
public RuntimeMetric(
@JsonProperty("name") String name,
@JsonProperty("unit") RuntimeUnit unit,
@JsonProperty("sum") long sum,
@JsonProperty("count") long count,
@JsonProperty("max") long max,
@JsonProperty("min") long min)
{
this(name);
this(name, unit);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the older version would pass in null as the unit, would it throw exceptions because of the requireNonNull check in construction function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be checked inside this(name, unit).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and it throws exception, so wouldn't the new version of presto that has this change be incompatible with older version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got your point. I've tested it, it works fine using the old presto cli.

set(sum, count, max, min);
}

Expand All @@ -75,6 +79,7 @@ private void set(long sum, long count, long max, long min)
public void set(RuntimeMetric metric)
{
requireNonNull(metric, "metric is null");
checkState(unit == metric.getUnit(), "The metric must have the same unit type as the current one.");
set(metric.getSum(), metric.getCount(), metric.getMax(), metric.getMin());
}

Expand Down Expand Up @@ -104,6 +109,8 @@ public static RuntimeMetric merge(RuntimeMetric metric1, RuntimeMetric metric2)
if (metric2 == null) {
return metric1;
}
checkState(metric1.getUnit() == metric2.getUnit(), "Two metrics to be merged must have the same unit type.");

RuntimeMetric mergedMetric = copyOf(metric1);
mergedMetric.mergeWith(metric2);
return mergedMetric;
Expand All @@ -117,6 +124,7 @@ public void mergeWith(RuntimeMetric metric)
if (metric == null) {
return;
}
checkState(unit == metric.getUnit(), "The metric to be merged must have the same unit type as the current one.");
sum.addAndGet(metric.getSum());
count.addAndGet(metric.getCount());
max.accumulateAndGet(metric.getMax(), Math::max);
Expand Down Expand Up @@ -150,4 +158,18 @@ public long getMin()
{
return min.get();
}

@JsonProperty
@ThriftField(6)
public RuntimeUnit getUnit()
{
return unit;
}

private static void checkState(boolean condition, String message)
{
if (!condition) {
throw new IllegalStateException(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

import static com.facebook.presto.common.RuntimeUnit.NANO;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -44,7 +45,7 @@ public RuntimeStats()
public RuntimeStats(Map<String, RuntimeMetric> metrics)
{
requireNonNull(metrics, "metrics is null");
metrics.forEach((name, newMetric) -> this.metrics.computeIfAbsent(name, RuntimeMetric::new).mergeWith(newMetric));
metrics.forEach((name, newMetric) -> this.metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, newMetric.getUnit())).mergeWith(newMetric));
}

public static RuntimeStats copyOf(RuntimeStats stats)
Expand Down Expand Up @@ -85,25 +86,25 @@ public Map<String, RuntimeMetric> getMetrics()
return Collections.unmodifiableMap(metrics);
}

public void addMetricValue(String name, long value)
public void addMetricValue(String name, RuntimeUnit unit, long value)
{
metrics.computeIfAbsent(name, RuntimeMetric::new).addValue(value);
metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, unit)).addValue(value);
}

public void addMetricValueIgnoreZero(String name, long value)
public void addMetricValueIgnoreZero(String name, RuntimeUnit unit, long value)
{
if (value == 0) {
return;
}
addMetricValue(name, value);
addMetricValue(name, unit, value);
}

/**
* Merges {@code metric} into this object with name {@code name}.
*/
public void mergeMetric(String name, RuntimeMetric metric)
{
metrics.computeIfAbsent(name, RuntimeMetric::new).mergeWith(metric);
metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, metric.getUnit())).mergeWith(metric);
}

/**
Expand All @@ -114,7 +115,7 @@ public void mergeWith(RuntimeStats stats)
if (stats == null) {
return;
}
stats.getMetrics().forEach((name, newMetric) -> metrics.computeIfAbsent(name, RuntimeMetric::new).mergeWith(newMetric));
stats.getMetrics().forEach((name, newMetric) -> metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, newMetric.getUnit())).mergeWith(newMetric));
}

/**
Expand All @@ -126,14 +127,14 @@ public void update(RuntimeStats stats)
if (stats == null) {
return;
}
stats.getMetrics().forEach((name, newMetric) -> metrics.computeIfAbsent(name, RuntimeMetric::new).set(newMetric));
stats.getMetrics().forEach((name, newMetric) -> metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, newMetric.getUnit())).set(newMetric));
}

public <V> V profileNanos(String tag, Supplier<V> supplier)
{
long startTime = System.nanoTime();
V result = supplier.get();
addMetricValueIgnoreZero(tag, System.nanoTime() - startTime);
addMetricValueIgnoreZero(tag, NANO, System.nanoTime() - startTime);
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.common;

import com.facebook.drift.annotations.ThriftEnum;
import com.facebook.drift.annotations.ThriftEnumValue;

@ThriftEnum
public enum RuntimeUnit
{
NONE(0), NANO(1), BYTE(2);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Can we have one more TIME to represent how many times
  2. Can we remove NONE, why do we need it?

Copy link
Member Author

@zacw7 zacw7 May 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have one more TIME to represent how many times

You mean use TIME for the counting metrics like driverCountPerTask or fragmentResultCacheHitCount? I think it might be a little be confusing. TIME looks like a time unit - like seconds/minutes. Also the current setup is aligned with Velox. I think it's good for start.

Can we remove NONE, why do we need it?

I would suggest keeping it:

  1. For compatibility reasons, we are introducing this unit to runtime metrics and there are many metrics haven't been set up with units yet, we would like those metrics to have a default fall-back value NONE.
  2. For some metrics like optimizedWithMaterializedView ( 1 means the query is rewritten to read from available materialized views to improve the performance), it does make much sense to set a unit to them, so let's keep it as NONE.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, can we

  1. Link the Velox file in the description as well since those two have to be compatible with each other
  2. Can we add an issue or task as follow up to slowly deprecate the NONE and adopt meaningful units
  3. As for TIME, I understand the confusion (I would add a comment to avoid confusion), but also hard to think of any other names :) , feel free to change to a name more appropriate, let's include this unit as an option in the follow up task/issue as well


private final int value;

RuntimeUnit(int value)
{
this.value = value;
}

@ThriftEnumValue
public int getValue()
{
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import com.facebook.airlift.json.JsonCodec;
import org.testng.annotations.Test;

import static com.facebook.presto.common.RuntimeUnit.BYTE;
import static com.facebook.presto.common.RuntimeUnit.NANO;
import static com.facebook.presto.common.RuntimeUnit.NONE;
import static org.testng.Assert.assertEquals;

public class TestRuntimeMetric
Expand All @@ -25,6 +28,7 @@ public class TestRuntimeMetric
private void assertRuntimeMetricEquals(RuntimeMetric m1, RuntimeMetric m2)
{
assertEquals(m1.getName(), m2.getName());
assertEquals(m1.getUnit(), m2.getUnit());
assertEquals(m1.getSum(), m2.getSum());
assertEquals(m1.getCount(), m2.getCount());
assertEquals(m1.getMax(), m2.getMax());
Expand All @@ -34,63 +38,82 @@ private void assertRuntimeMetricEquals(RuntimeMetric m1, RuntimeMetric m2)
@Test
public void testAddValue()
{
RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME);
RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME, NONE);
metric.addValue(101);
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, 101, 1, 101, 101));
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, NONE, 101, 1, 101, 101));

metric.addValue(99);
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, 200, 2, 101, 99));
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, NONE, 200, 2, 101, 99));

metric.addValue(201);
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, 401, 3, 201, 99));
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, NONE, 401, 3, 201, 99));

metric.addValue(202);
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, NONE, 603, 4, 202, 99));
}

@Test
public void testCopy()
{
RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME, 1, 1, 1, 1);
RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME, NONE, 1, 1, 1, 1);
RuntimeMetric metricCopy = RuntimeMetric.copyOf(metric);

// Verify that updating one metric doesn't affect its copy.
metric.addValue(2);
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, 3, 2, 2, 1));
assertRuntimeMetricEquals(metricCopy, new RuntimeMetric(TEST_METRIC_NAME, 1, 1, 1, 1));
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, NONE, 3, 2, 2, 1));
assertRuntimeMetricEquals(metricCopy, new RuntimeMetric(TEST_METRIC_NAME, NONE, 1, 1, 1, 1));
metricCopy.addValue(2);
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, 3, 2, 2, 1));
assertRuntimeMetricEquals(metric, new RuntimeMetric(TEST_METRIC_NAME, NONE, 3, 2, 2, 1));
assertRuntimeMetricEquals(metricCopy, metric);
}

@Test
public void testMergeWith()
{
RuntimeMetric metric1 = new RuntimeMetric(TEST_METRIC_NAME, 5, 2, 4, 1);
RuntimeMetric metric2 = new RuntimeMetric(TEST_METRIC_NAME, 20, 2, 11, 9);
RuntimeMetric metric1 = new RuntimeMetric(TEST_METRIC_NAME, NONE, 5, 2, 4, 1);
RuntimeMetric metric2 = new RuntimeMetric(TEST_METRIC_NAME, NONE, 20, 2, 11, 9);
metric1.mergeWith(metric2);
assertRuntimeMetricEquals(metric1, new RuntimeMetric(TEST_METRIC_NAME, 25, 4, 11, 1));
assertRuntimeMetricEquals(metric1, new RuntimeMetric(TEST_METRIC_NAME, NONE, 25, 4, 11, 1));

metric2.mergeWith(metric2);
assertRuntimeMetricEquals(metric2, new RuntimeMetric(TEST_METRIC_NAME, 40, 4, 11, 9));
assertRuntimeMetricEquals(metric2, new RuntimeMetric(TEST_METRIC_NAME, NONE, 40, 4, 11, 9));

metric2.mergeWith(null);
assertRuntimeMetricEquals(metric2, new RuntimeMetric(TEST_METRIC_NAME, 40, 4, 11, 9));
assertRuntimeMetricEquals(metric2, new RuntimeMetric(TEST_METRIC_NAME, NONE, 40, 4, 11, 9));
}

@Test(expectedExceptions = {IllegalStateException.class})
public void testMergeWithWithConflictUnits()
{
RuntimeMetric metric1 = new RuntimeMetric(TEST_METRIC_NAME, NANO, 5, 2, 4, 1);
RuntimeMetric metric2 = new RuntimeMetric(TEST_METRIC_NAME, BYTE, 20, 2, 11, 9);
metric1.mergeWith(metric2);
}

@Test
public void testMerge()
{
RuntimeMetric metric1 = new RuntimeMetric(TEST_METRIC_NAME, 5, 2, 4, 1);
RuntimeMetric metric2 = new RuntimeMetric(TEST_METRIC_NAME, 20, 2, 11, 9);
assertRuntimeMetricEquals(RuntimeMetric.merge(metric1, metric2), new RuntimeMetric(TEST_METRIC_NAME, 25, 4, 11, 1));
RuntimeMetric metric1 = new RuntimeMetric(TEST_METRIC_NAME, NONE, 5, 2, 4, 1);
RuntimeMetric metric2 = new RuntimeMetric(TEST_METRIC_NAME, NONE, 20, 2, 11, 9);
assertRuntimeMetricEquals(RuntimeMetric.merge(metric1, metric2), new RuntimeMetric(TEST_METRIC_NAME, NONE, 25, 4, 11, 1));

assertRuntimeMetricEquals(metric1, new RuntimeMetric(TEST_METRIC_NAME, NONE, 5, 2, 4, 1));
assertRuntimeMetricEquals(metric2, new RuntimeMetric(TEST_METRIC_NAME, NONE, 20, 2, 11, 9));
}

assertRuntimeMetricEquals(metric1, new RuntimeMetric(TEST_METRIC_NAME, 5, 2, 4, 1));
assertRuntimeMetricEquals(metric2, new RuntimeMetric(TEST_METRIC_NAME, 20, 2, 11, 9));
@Test(expectedExceptions = {IllegalStateException.class})
public void testMergeWithConflictUnits()
{
RuntimeMetric metric1 = new RuntimeMetric(TEST_METRIC_NAME, NANO, 5, 2, 4, 1);
RuntimeMetric metric2 = new RuntimeMetric(TEST_METRIC_NAME, BYTE, 20, 2, 11, 9);
RuntimeMetric.merge(metric1, metric2);
}

@Test
public void testJson()
{
JsonCodec<RuntimeMetric> codec = JsonCodec.jsonCodec(RuntimeMetric.class);
RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME);
RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME, NANO);
metric.addValue(101);
metric.addValue(202);

Expand Down
Loading