Skip to content

Commit

Permalink
warn instead of throw in case of unsupported portable flink metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ryan-williams committed Jan 4, 2019
1 parent 2da1724 commit e9e0311
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,17 @@ public void updateMetrics(String stepName, List<BeamFnApi.MonitoringInfo> monito
BeamFnApi.Metric metric = monitoringInfo.getMetric();
if (metric.hasCounterData()) {
BeamFnApi.CounterData counterData = metric.getCounterData();
org.apache.beam.sdk.metrics.Counter counter = metricsContainer.getCounter(metricName);
if (counterData.getValueCase() == BeamFnApi.CounterData.ValueCase.INT64_VALUE) {
org.apache.beam.sdk.metrics.Counter counter =
metricsContainer.getCounter(metricName);
counter.inc(counterData.getInt64Value());
} else {
throw new IllegalArgumentException("Unsupported CounterData type: " + counterData);
LOG.warn("Unsupported CounterData type: {}", counterData);
}
} else if (metric.hasDistributionData()) {
BeamFnApi.DistributionData distributionData = metric.getDistributionData();
Distribution distribution = metricsContainer.getDistribution(metricName);
if (distributionData.hasIntDistributionData()) {
Distribution distribution = metricsContainer.getDistribution(metricName);
BeamFnApi.IntDistributionData intDistributionData =
distributionData.getIntDistributionData();
distribution.update(
Expand All @@ -137,12 +138,11 @@ public void updateMetrics(String stepName, List<BeamFnApi.MonitoringInfo> monito
intDistributionData.getMin(),
intDistributionData.getMax());
} else {
throw new IllegalArgumentException(
"Unsupported DistributionData type: " + distributionData);
LOG.warn("Unsupported DistributionData type: {}", distributionData);
}
} else if (metric.hasExtremaData()) {
BeamFnApi.ExtremaData extremaData = metric.getExtremaData();
throw new IllegalArgumentException("Extrema metric unsupported: " + extremaData);
LOG.warn("Extrema metric unsupported: {}", extremaData);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,32 @@
*/
package org.apache.beam.runners.flink.metrics;

import static org.apache.beam.model.fnexecution.v1.BeamFnApi.labelProps;
import static org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN;
import static org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.CounterData;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.DoubleDistributionData;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.IntDistributionData;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metric;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo.MonitoringInfoLabels;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.DistributionCell;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer.FlinkDistributionGauge;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.DistributionResult;
Expand All @@ -43,6 +57,7 @@
import org.apache.flink.metrics.SimpleCounter;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
Expand All @@ -53,6 +68,13 @@ public class FlinkMetricContainerTest {
@Mock private RuntimeContext runtimeContext;
@Mock private MetricGroup metricGroup;

static String PTRANSFORM_LABEL =
MonitoringInfoLabels.forNumber(MonitoringInfoLabels.TRANSFORM_VALUE)
.getValueDescriptor()
.getOptions()
.getExtension(labelProps)
.getName();

@Before
public void beforeTest() {
MockitoAnnotations.initMocks(this);
Expand Down Expand Up @@ -124,15 +146,15 @@ public void testMonitoringInfoUpdate() {
SimpleMonitoringInfoBuilder userCountBuilder = new SimpleMonitoringInfoBuilder();
userCountBuilder.setUrnForUserMetric("ns1", "metric1");
userCountBuilder.setInt64Value(111);
BeamFnApi.MonitoringInfo userCountMonitoringInfo = userCountBuilder.build();
MonitoringInfo userCountMonitoringInfo = userCountBuilder.build();
assertNotNull(userCountMonitoringInfo);

SimpleMonitoringInfoBuilder elemCountBuilder = new SimpleMonitoringInfoBuilder();
elemCountBuilder.setUrn(ELEMENT_COUNT_URN);
elemCountBuilder.setInt64Value(222);
elemCountBuilder.setPTransformLabel("step");
elemCountBuilder.setPCollectionLabel("pcoll");
BeamFnApi.MonitoringInfo elemCountMonitoringInfo = elemCountBuilder.build();
MonitoringInfo elemCountMonitoringInfo = elemCountBuilder.build();
assertNotNull(elemCountMonitoringInfo);

assertThat(userCounter.getCount(), is(0L));
Expand All @@ -143,6 +165,101 @@ public void testMonitoringInfoUpdate() {
assertThat(elemCounter.getCount(), is(222L));
}

@Test
public void testDropUnexpectedMonitoringInfoTypes() {
FlinkMetricContainer flinkContainer = new FlinkMetricContainer(runtimeContext);
MetricsContainer step = flinkContainer.getMetricsContainer("step");

MonitoringInfo intCounter =
MonitoringInfo.newBuilder()
.setUrn(USER_COUNTER_URN_PREFIX + "ns1:int_counter")
.putLabels(PTRANSFORM_LABEL, "step")
.setMetric(
Metric.newBuilder().setCounterData(CounterData.newBuilder().setInt64Value(111)))
.build();

MonitoringInfo doubleCounter =
MonitoringInfo.newBuilder()
.setUrn(USER_COUNTER_URN_PREFIX + "ns2:double_counter")
.putLabels(PTRANSFORM_LABEL, "step")
.setMetric(
Metric.newBuilder().setCounterData(CounterData.newBuilder().setDoubleValue(222)))
.build();

MonitoringInfo intDistribution =
MonitoringInfo.newBuilder()
.setUrn(USER_COUNTER_URN_PREFIX + "ns3:int_distribution")
.putLabels(PTRANSFORM_LABEL, "step")
.setMetric(
Metric.newBuilder()
.setDistributionData(
BeamFnApi.DistributionData.newBuilder()
.setIntDistributionData(
IntDistributionData.newBuilder()
.setSum(30)
.setCount(10)
.setMin(1)
.setMax(5))))
.build();

MonitoringInfo doubleDistribution =
MonitoringInfo.newBuilder()
.setUrn(USER_COUNTER_URN_PREFIX + "ns4:double_distribution")
.putLabels(PTRANSFORM_LABEL, "step")
.setMetric(
Metric.newBuilder()
.setDistributionData(
BeamFnApi.DistributionData.newBuilder()
.setDoubleDistributionData(
DoubleDistributionData.newBuilder()
.setSum(30)
.setCount(10)
.setMin(1)
.setMax(5))))
.build();

// Mock out the counter that Flink returns; the distribution gets created by FlinkMetricContainer, not by Flink
// itself, so we verify it in a different way below
SimpleCounter counter = new SimpleCounter();
when(metricGroup.counter("ns1.int_counter")).thenReturn(counter);

flinkContainer.updateMetrics(
"step", ImmutableList.of(intCounter, doubleCounter, intDistribution, doubleDistribution));

// Flink's MetricGroup should only have asked for one counter (the integer-typed one) to be created (the
// double-typed one is dropped currently)
verify(metricGroup).counter(eq("ns1.int_counter"));

// Verify that the counter injected into flink has the right value
assertThat(counter.getCount(), is(111L));

// Verify the counter in the java SDK MetricsContainer
long count =
((CounterCell) step.getCounter(MetricName.named("ns1", "int_counter"))).getCumulative();
assertThat(count, is(111L));

// The one Flink distribution that gets created is a FlinkDistributionGauge; here we verify its initial (and in this
// test, final) value
verify(metricGroup)
.gauge(
eq("ns3.int_distribution"),
argThat(
new ArgumentMatcher<FlinkDistributionGauge>() {
@Override
public boolean matches(Object argument) {
DistributionResult actual = ((FlinkDistributionGauge) argument).getValue();
DistributionResult expected = DistributionResult.create(30, 10, 1, 5);
return actual.equals(expected);
}
}));

// Verify that the Java SDK MetricsContainer holds the same information
DistributionData distributionData =
((DistributionCell) step.getDistribution(MetricName.named("ns3", "int_distribution")))
.getCumulative();
assertThat(distributionData, is(DistributionData.create(30, 10, 1, 5)));
}

@Test
public void testDistribution() {
FlinkMetricContainer.FlinkDistributionGauge flinkGauge =
Expand Down

0 comments on commit e9e0311

Please sign in to comment.