From 5951ccab0fc91ba44979ff7211aa8b6cf4e13ad4 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Fri, 4 Jan 2019 02:58:02 +0000 Subject: [PATCH] warn instead of throw in case of unsupported portable flink metrics --- .../flink/metrics/FlinkMetricContainer.java | 12 +- .../metrics/FlinkMetricContainerTest.java | 121 +++++++++++++++++- 2 files changed, 125 insertions(+), 8 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java index 3484b7fe32a39..c235a51782e61 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java @@ -119,16 +119,17 @@ public void updateMetrics(String stepName, List monito BeamFnApi.Metric metric = monitoringInfo.getMetric(); if (metric.hasCounterData()) { BeamFnApi.CounterData counterData = metric.getCounterData(); - org.apache.beam.sdk.metrics.Counter counter = metricsContainer.getCounter(metricName); if (counterData.getValueCase() == BeamFnApi.CounterData.ValueCase.INT64_VALUE) { + org.apache.beam.sdk.metrics.Counter counter = + metricsContainer.getCounter(metricName); counter.inc(counterData.getInt64Value()); } else { - throw new IllegalArgumentException("Unsupported CounterData type: " + counterData); + LOG.warn("Unsupported CounterData type: {}", counterData); } } else if (metric.hasDistributionData()) { BeamFnApi.DistributionData distributionData = metric.getDistributionData(); - Distribution distribution = metricsContainer.getDistribution(metricName); if (distributionData.hasIntDistributionData()) { + Distribution distribution = metricsContainer.getDistribution(metricName); BeamFnApi.IntDistributionData intDistributionData = distributionData.getIntDistributionData(); distribution.update( @@ -137,12 +138,11 @@ public void updateMetrics(String stepName, List monito intDistributionData.getMin(), intDistributionData.getMax()); } else { - throw new IllegalArgumentException( - "Unsupported DistributionData type: " + distributionData); + LOG.warn("Unsupported DistributionData type: {}", distributionData); } } else if (metric.hasExtremaData()) { BeamFnApi.ExtremaData extremaData = metric.getExtremaData(); - throw new IllegalArgumentException("Extrema metric unsupported: " + extremaData); + LOG.warn("Extrema metric unsupported: {}", extremaData); } } }); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java index d4786d8fbacfc..6b9435e89d2d6 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java @@ -17,18 +17,32 @@ */ package org.apache.beam.runners.flink.metrics; +import static org.apache.beam.model.fnexecution.v1.BeamFnApi.labelProps; import static org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN; +import static org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.CounterData; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.DoubleDistributionData; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.IntDistributionData; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metric; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo.MonitoringInfoLabels; +import org.apache.beam.runners.core.metrics.CounterCell; +import org.apache.beam.runners.core.metrics.DistributionCell; +import org.apache.beam.runners.core.metrics.DistributionData; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder; +import org.apache.beam.runners.flink.metrics.FlinkMetricContainer.FlinkDistributionGauge; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.DistributionResult; @@ -43,6 +57,7 @@ import org.apache.flink.metrics.SimpleCounter; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -53,6 +68,13 @@ public class FlinkMetricContainerTest { @Mock private RuntimeContext runtimeContext; @Mock private MetricGroup metricGroup; + static final String PTRANSFORM_LABEL = + MonitoringInfoLabels.forNumber(MonitoringInfoLabels.TRANSFORM_VALUE) + .getValueDescriptor() + .getOptions() + .getExtension(labelProps) + .getName(); + @Before public void beforeTest() { MockitoAnnotations.initMocks(this); @@ -124,7 +146,7 @@ public void testMonitoringInfoUpdate() { SimpleMonitoringInfoBuilder userCountBuilder = new SimpleMonitoringInfoBuilder(); userCountBuilder.setUrnForUserMetric("ns1", "metric1"); userCountBuilder.setInt64Value(111); - BeamFnApi.MonitoringInfo userCountMonitoringInfo = userCountBuilder.build(); + MonitoringInfo userCountMonitoringInfo = userCountBuilder.build(); assertNotNull(userCountMonitoringInfo); SimpleMonitoringInfoBuilder elemCountBuilder = new SimpleMonitoringInfoBuilder(); @@ -132,7 +154,7 @@ public void testMonitoringInfoUpdate() { elemCountBuilder.setInt64Value(222); elemCountBuilder.setPTransformLabel("step"); elemCountBuilder.setPCollectionLabel("pcoll"); - BeamFnApi.MonitoringInfo elemCountMonitoringInfo = elemCountBuilder.build(); + MonitoringInfo elemCountMonitoringInfo = elemCountBuilder.build(); assertNotNull(elemCountMonitoringInfo); assertThat(userCounter.getCount(), is(0L)); @@ -143,6 +165,101 @@ public void testMonitoringInfoUpdate() { assertThat(elemCounter.getCount(), is(222L)); } + @Test + public void testDropUnexpectedMonitoringInfoTypes() { + FlinkMetricContainer flinkContainer = new FlinkMetricContainer(runtimeContext); + MetricsContainer step = flinkContainer.getMetricsContainer("step"); + + MonitoringInfo intCounter = + MonitoringInfo.newBuilder() + .setUrn(USER_COUNTER_URN_PREFIX + "ns1:int_counter") + .putLabels(PTRANSFORM_LABEL, "step") + .setMetric( + Metric.newBuilder().setCounterData(CounterData.newBuilder().setInt64Value(111))) + .build(); + + MonitoringInfo doubleCounter = + MonitoringInfo.newBuilder() + .setUrn(USER_COUNTER_URN_PREFIX + "ns2:double_counter") + .putLabels(PTRANSFORM_LABEL, "step") + .setMetric( + Metric.newBuilder().setCounterData(CounterData.newBuilder().setDoubleValue(222))) + .build(); + + MonitoringInfo intDistribution = + MonitoringInfo.newBuilder() + .setUrn(USER_COUNTER_URN_PREFIX + "ns3:int_distribution") + .putLabels(PTRANSFORM_LABEL, "step") + .setMetric( + Metric.newBuilder() + .setDistributionData( + BeamFnApi.DistributionData.newBuilder() + .setIntDistributionData( + IntDistributionData.newBuilder() + .setSum(30) + .setCount(10) + .setMin(1) + .setMax(5)))) + .build(); + + MonitoringInfo doubleDistribution = + MonitoringInfo.newBuilder() + .setUrn(USER_COUNTER_URN_PREFIX + "ns4:double_distribution") + .putLabels(PTRANSFORM_LABEL, "step") + .setMetric( + Metric.newBuilder() + .setDistributionData( + BeamFnApi.DistributionData.newBuilder() + .setDoubleDistributionData( + DoubleDistributionData.newBuilder() + .setSum(30) + .setCount(10) + .setMin(1) + .setMax(5)))) + .build(); + + // Mock out the counter that Flink returns; the distribution gets created by + // FlinkMetricContainer, not by Flink itself, so we verify it in a different way below + SimpleCounter counter = new SimpleCounter(); + when(metricGroup.counter("ns1.int_counter")).thenReturn(counter); + + flinkContainer.updateMetrics( + "step", ImmutableList.of(intCounter, doubleCounter, intDistribution, doubleDistribution)); + + // Flink's MetricGroup should only have asked for one counter (the integer-typed one) to be + // created (the double-typed one is dropped currently) + verify(metricGroup).counter(eq("ns1.int_counter")); + + // Verify that the counter injected into flink has the right value + assertThat(counter.getCount(), is(111L)); + + // Verify the counter in the java SDK MetricsContainer + long count = + ((CounterCell) step.getCounter(MetricName.named("ns1", "int_counter"))).getCumulative(); + assertThat(count, is(111L)); + + // The one Flink distribution that gets created is a FlinkDistributionGauge; here we verify its + // initial (and in this test, final) value + verify(metricGroup) + .gauge( + eq("ns3.int_distribution"), + argThat( + new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + DistributionResult actual = ((FlinkDistributionGauge) argument).getValue(); + DistributionResult expected = DistributionResult.create(30, 10, 1, 5); + return actual.equals(expected); + } + })); + + // Verify that the Java SDK MetricsContainer holds the same information + DistributionData distributionData = + ((DistributionCell) step.getDistribution(MetricName.named("ns3", "int_distribution"))) + .getCumulative(); + assertThat(distributionData, is(DistributionData.create(30, 10, 1, 5))); + } + @Test public void testDistribution() { FlinkMetricContainer.FlinkDistributionGauge flinkGauge =