From 67ef5bd491526e6ab4a525ebc1eba77823d2568d Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Sat, 6 Jul 2024 14:11:12 -0700 Subject: [PATCH] Fix precommit errors --- .../runners/core/metrics/StringSetData.java | 11 ++-- .../metrics/MetricsContainerStepMapTest.java | 60 +++++++++++++----- .../metrics/MonitoringInfoEncodingsTest.java | 1 - .../core/metrics/StringSetCellTest.java | 15 +++-- .../core/metrics/StringSetDataTest.java | 5 +- .../beam/runners/direct/DirectMetrics.java | 9 ++- .../runners/direct/DirectMetricsTest.java | 8 ++- .../runners/dataflow/DataflowMetrics.java | 8 +-- .../runners/dataflow/DataflowMetricsTest.java | 24 ++++--- .../MetricsToCounterUpdateConverter.java | 3 +- .../worker/StreamingStepMetricsContainer.java | 4 +- .../worker/BatchModeExecutionContextTest.java | 7 ++- .../StreamingStepMetricsContainerTest.java | 53 ++++++++-------- .../runners/jet/metrics/JetMetricResults.java | 7 ++- .../runners/jet/metrics/StringSetImpl.java | 19 +++++- .../runners/portability/PortableMetrics.java | 8 +-- .../beam/sdk/metrics/MetricQueryResults.java | 4 +- .../apache/beam/sdk/metrics/MetricResult.java | 1 + .../beam/sdk/metrics/MetricsContainer.java | 4 +- .../apache/beam/sdk/metrics/StringSet.java | 8 ++- .../beam/sdk/metrics/StringSetResult.java | 3 +- .../apache/beam/sdk/metrics/MetricsTest.java | 62 ++++++++++++------- .../control/ExecutionStateSampler.java | 2 +- .../control/ExecutionStateSamplerTest.java | 8 ++- 24 files changed, 214 insertions(+), 120 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java index cfc28803ddd5..0db5bdcd9c7b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java @@ -38,13 +38,14 @@ public static StringSetData create(Set stringSet) { return new AutoValue_StringSetData(stringSet); } - /** Return a {@link EmptyStringSetData#INSTANCE} representing an empty {@link StringSetData}. */ + /** Return a {@link EmptyStringSetData#INSTANCE} representing an empty {@link StringSetData}. */ public static StringSetData empty() { return EmptyStringSetData.INSTANCE; } - /** Combines this {@link StringSetData} with other, both original StringSetData are left - * intact. */ + /** + * Combines this {@link StringSetData} with other, both original StringSetData are left intact. + */ public StringSetData combine(StringSetData other) { // do not merge other on this as this StringSetData might hold an immutable set like in case // of EmptyStringSetData @@ -66,13 +67,13 @@ public static class EmptyStringSetData extends StringSetData { private EmptyStringSetData() {} - /** Return an immutable empty set. */ + /** Return an immutable empty set. */ @Override public Set stringSet() { return Collections.emptySet(); } - /** Return a {@link StringSetResult#empty()} which is immutable empty set. */ + /** Return a {@link StringSetResult#empty()} which is immutable empty set. */ @Override public StringSetResult extractResult() { return StringSetResult.empty(); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java index 6b5e4015b608..c270a0b01a30 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java @@ -76,7 +76,8 @@ public class MetricsContainerStepMapTest { Metrics.distribution(MetricsContainerStepMapTest.class, DISTRIBUTION_NAME); private static final Gauge gauge = Metrics.gauge(MetricsContainerStepMapTest.class, GAUGE_NAME); - private static final StringSet stringSet = Metrics.stringSet(MetricsContainerStepMapTest.class, STRING_SET_NAME); + private static final StringSet stringSet = + Metrics.stringSet(MetricsContainerStepMapTest.class, STRING_SET_NAME); private static final MetricsContainerImpl metricsContainer; @@ -121,8 +122,12 @@ public void testAttemptedAccumulatedMetricResults() { false); assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false); - assertStringSet(STRING_SET_NAME, step1res, STEP1, - StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)), false); + assertStringSet( + STRING_SET_NAME, + step1res, + STEP1, + StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)), + false); MetricQueryResults step2res = metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build()); @@ -141,8 +146,12 @@ public void testAttemptedAccumulatedMetricResults() { false); assertGauge(GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false); - assertStringSet(STRING_SET_NAME, step2res, STEP2, - StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)), false); + assertStringSet( + STRING_SET_NAME, + step2res, + STEP2, + StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)), + false); MetricQueryResults allres = metricResults.allMetrics(); @@ -302,8 +311,12 @@ public void testAttemptedAndCommittedAccumulatedMetricResults() { DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2), true); assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), true); - assertStringSet(STRING_SET_NAME, step1res, STEP1, - StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)), false); + assertStringSet( + STRING_SET_NAME, + step1res, + STEP1, + StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)), + false); MetricQueryResults step2res = metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build()); @@ -321,8 +334,12 @@ public void testAttemptedAndCommittedAccumulatedMetricResults() { DistributionResult.create(VALUE * 9, 6, VALUE, VALUE * 2), false); assertGauge(GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false); - assertStringSet(STRING_SET_NAME, step2res, STEP2, - StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)), false); + assertStringSet( + STRING_SET_NAME, + step2res, + STEP2, + StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)), + false); assertCounter(COUNTER_NAME, step2res, STEP2, VALUE * 2, true); assertDistribution( @@ -332,8 +349,12 @@ public void testAttemptedAndCommittedAccumulatedMetricResults() { DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), true); assertGauge(GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), true); - assertStringSet(STRING_SET_NAME, step2res, STEP2, - StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)), true); + assertStringSet( + STRING_SET_NAME, + step2res, + STEP2, + StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)), + true); MetricQueryResults allres = metricResults.queryMetrics(MetricsFilter.builder().build()); @@ -389,8 +410,12 @@ public void testReset() { DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2), false); assertGauge(GAUGE_NAME, allres, STEP1, GaugeResult.create(VALUE, Instant.now()), false); - assertStringSet(STRING_SET_NAME, allres, STEP1, - StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)), false); + assertStringSet( + STRING_SET_NAME, + allres, + STEP1, + StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)), + false); assertCounter(COUNTER_NAME, allres, STEP2, VALUE * 2, false); assertDistribution( @@ -400,8 +425,12 @@ public void testReset() { DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), false); assertGauge(GAUGE_NAME, allres, STEP2, GaugeResult.create(VALUE, Instant.now()), false); - assertStringSet(STRING_SET_NAME, allres, STEP2, - StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)), false); + assertStringSet( + STRING_SET_NAME, + allres, + STEP2, + StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)), + false); attemptedMetrics.reset(); metricResults = asAttemptedOnlyMetricResults(attemptedMetrics); @@ -458,6 +487,7 @@ private void assertGauge( metricQueryResults.getGauges(), hasItem(metricsResult(NAMESPACE, name, step, expected, isCommitted))); } + private void assertStringSet( String name, MetricQueryResults metricQueryResults, diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java index 0f13645299f6..30ebaa87b574 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java @@ -71,7 +71,6 @@ public void testInt64GaugeEncoding() { assertEquals(data, decodeInt64Gauge(payload)); } - @Test public void testStringSetEncoding() { diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java index 2ca96b769fb5..f78ed01603fb 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java @@ -35,7 +35,9 @@ public void testDeltaAndCumulative() { cell.add("pubsub"); cell.add("bq", "spanner"); assertEquals(cell.getCumulative().stringSet(), ImmutableSet.of("spanner", "pubsub", "bq")); - assertEquals("getCumulative is idempotent", cell.getCumulative().stringSet(), + assertEquals( + "getCumulative is idempotent", + cell.getCumulative().stringSet(), ImmutableSet.of("spanner", "pubsub", "bq")); assertThat(cell.getDirty().beforeCommit(), equalTo(true)); @@ -43,11 +45,11 @@ public void testDeltaAndCumulative() { assertThat(cell.getDirty().beforeCommit(), equalTo(false)); cell.add("gcs"); - assertEquals(cell.getCumulative().stringSet(), - ImmutableSet.of("spanner", "pubsub", "bq", "gcs")); + assertEquals( + cell.getCumulative().stringSet(), ImmutableSet.of("spanner", "pubsub", "bq", "gcs")); - assertThat("Adding a new value made the cell dirty", cell.getDirty().beforeCommit(), - equalTo(true)); + assertThat( + "Adding a new value made the cell dirty", cell.getDirty().beforeCommit(), equalTo(true)); } @Test @@ -84,7 +86,8 @@ public void testReset() { StringSetCell stringSetCell = new StringSetCell(MetricName.named("namespace", "name")); stringSetCell.add("hello"); Assert.assertNotEquals(stringSetCell.getDirty(), new DirtyState()); - assertThat(stringSetCell.getCumulative().stringSet(), + assertThat( + stringSetCell.getCumulative().stringSet(), equalTo(StringSetData.create(ImmutableSet.of("hello")).stringSet())); stringSetCell.reset(); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetDataTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetDataTest.java index f947651c8f38..cf8982dc57cf 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetDataTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetDataTest.java @@ -28,8 +28,8 @@ /** Tests for {@link StringSetData}. */ public class StringSetDataTest { - @Rule - public ExpectedException exception = ExpectedException.none(); + @Rule public ExpectedException exception = ExpectedException.none(); + @Test public void testCreate() { // test empty stringset creation @@ -67,6 +67,7 @@ public void testCombineWithEmpty() { assertTrue(empty.stringSet().isEmpty()); assertEquals(multipleElement.stringSet(), ImmutableSet.of("cd", "ef")); } + @Test public void testEmpty() { StringSetData empty = StringSetData.empty(); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java index 2b4b32f0ded7..9e1aec321c94 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java @@ -234,6 +234,7 @@ public StringSetData combine(Iterable updates) { } return result; } + @Override public StringSetResult extract(StringSetData data) { return data.extractResult(); @@ -275,12 +276,16 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) { } ImmutableList.Builder> stringSetResult = ImmutableList.builder(); - for (Entry> stringSet : stringSet.entries()) { + for (Entry> stringSet : + stringSet.entries()) { maybeExtractResult(filter, stringSetResult, stringSet); } return MetricQueryResults.create( - counterResults.build(), distributionResults.build(), gaugeResults.build(), stringSetResult.build()); + counterResults.build(), + distributionResults.build(), + gaugeResults.build(), + stringSetResult.build()); } private void maybeExtractResult( diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java index 040cb4fa0a01..00df20c4ac39 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java @@ -90,7 +90,9 @@ public void testApplyCommittedNoFilter() { ImmutableList.of( MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(15L))), ImmutableList.of( - MetricUpdate.create(MetricKey.create("step1", NAME4), StringSetData.create(ImmutableSet.of("ab")))))); + MetricUpdate.create( + MetricKey.create("step1", NAME4), + StringSetData.create(ImmutableSet.of("ab")))))); metrics.commitLogical( bundle1, MetricUpdates.create( @@ -103,7 +105,9 @@ public void testApplyCommittedNoFilter() { ImmutableList.of( MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(27L))), ImmutableList.of( - MetricUpdate.create(MetricKey.create("step1", NAME4), StringSetData.create(ImmutableSet.of("cd")))))); + MetricUpdate.create( + MetricKey.create("step1", NAME4), + StringSetData.create(ImmutableSet.of("cd")))))); MetricQueryResults results = metrics.allMetrics(); assertThat( diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index 68722ccec8fb..19a4bd76c4b0 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -163,10 +163,10 @@ public void addMetricResult( Long value = getCounterValue(committed); counterResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); } else if (committed.getSet() != null && attempted.getSet() != null) { - // stringset metric - StringSetResult value = getStringSetValue(committed); - stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); - } else { + // stringset metric + StringSetResult value = getStringSetValue(committed); + stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); + } else { // This is exceptionally unexpected. We expect matching user metrics to only have the // value types provided by the Metrics API. LOG.warn( diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index c68f24113931..b6b92103c90f 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -257,11 +257,11 @@ public void testSingleStringSetUpdates() throws IOException { // The parser relies on the fact that one tentative and one committed metric update exist in // the job metrics results. MetricUpdate mu1 = - makeStringSetMetricUpdate("counterName", "counterNamespace", "s2", - Arrays.asList("ab", "cd"), false); + makeStringSetMetricUpdate( + "counterName", "counterNamespace", "s2", Arrays.asList("ab", "cd"), false); MetricUpdate mu1Tentative = - makeStringSetMetricUpdate("counterName", "counterNamespace", "s2", - Arrays.asList("ef", "gh"), true); + makeStringSetMetricUpdate( + "counterName", "counterNamespace", "s2", Arrays.asList("ef", "gh"), true); jobMetrics.setMetrics(ImmutableList.of(mu1, mu1Tentative)); DataflowClient dataflowClient = mock(DataflowClient.class); when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); @@ -271,15 +271,19 @@ public void testSingleStringSetUpdates() throws IOException { assertThat( result.getStringSets(), containsInAnyOrder( - attemptedMetricsResult("counterNamespace", "counterName", "myStepName", - StringSetResult.create( - ImmutableSet.of("ab", "cd"))))); + attemptedMetricsResult( + "counterNamespace", + "counterName", + "myStepName", + StringSetResult.create(ImmutableSet.of("ab", "cd"))))); assertThat( result.getStringSets(), containsInAnyOrder( - committedMetricsResult("counterNamespace", "counterName", "myStepName", - StringSetResult.create( - ImmutableSet.of("ef", "gh"))))); + committedMetricsResult( + "counterNamespace", + "counterName", + "myStepName", + StringSetResult.create(ImmutableSet.of("ef", "gh"))))); } @Test diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java index 5592dbb0f96f..dbedc51528a5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java @@ -98,8 +98,7 @@ public static CounterUpdate fromGauge( .setIntegerGauge(integerGaugeProto); } - public static CounterUpdate fromStringSet( - MetricKey key, StringSetData stringSetData) { + public static CounterUpdate fromStringSet(MetricKey key, StringSetData stringSetData) { CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET); StringList stringList = new StringList(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index 22965f81d3a4..7cc0dc68f7e7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -185,7 +185,9 @@ public Histogram getPerWorkerHistogram( } public Iterable extractUpdates() { - return counterUpdates().append(distributionUpdates()).append(gaugeUpdates().append(stringSetUpdates())); + return counterUpdates() + .append(distributionUpdates()) + .append(gaugeUpdates().append(stringSetUpdates())); } private FluentIterable counterUpdates() { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java index 5e501fc8d153..18bd814b4df7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java @@ -168,9 +168,10 @@ public void extractMetricUpdatesStringSet() { DataflowOperationContext operationContext = executionContext.createOperationContext(NameContextsForTests.nameContextForTest()); - StringSet stringSet = operationContext - .metricsContainer() - .getStringSet(MetricName.named("namespace", "some-stringset")); + StringSet stringSet = + operationContext + .metricsContainer() + .getStringSet(MetricName.named("namespace", "some-stringset")); stringSet.add("ab"); stringSet.add("cd"); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 768123642f53..2d5a8d8266ae 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -74,7 +74,7 @@ /** Tests for {@link StreamingStepMetricsContainer}. */ @RunWith(JUnit4.class) @SuppressWarnings({ - "rawtypes" // TODO(https://github.com/apache/beam/issues/20447) + "rawtypes" // TODO(https://github.com/apache/beam/issues/20447) }) public class StreamingStepMetricsContainerTest { @Rule public transient Timeout globalTimeout = Timeout.seconds(600); @@ -269,6 +269,7 @@ public void testGaugeUpdateExtraction() { // Release freeze on clock. DateTimeUtils.setCurrentMillisSystem(); } + @Test public void testStringSetUpdateExtraction() { StringSet stringSet = c1.getStringSet(name1); @@ -277,18 +278,19 @@ public void testStringSetUpdateExtraction() { stringSet.add("gh"); stringSet.add("gh"); - CounterUpdate name1Update = new CounterUpdate() - .setStructuredNameAndMetadata( - new CounterStructuredNameAndMetadata() - .setName( - new CounterStructuredName() - .setOrigin(Origin.USER.toString()) - .setOriginNamespace("ns") - .setName("name1") - .setOriginalStepName("s1")) - .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) - .setCumulative(false) - .setStringList(new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh"))); + CounterUpdate name1Update = + new CounterUpdate() + .setStructuredNameAndMetadata( + new CounterStructuredNameAndMetadata() + .setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + .setOriginNamespace("ns") + .setName("name1") + .setOriginalStepName("s1")) + .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) + .setCumulative(false) + .setStringList(new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh"))); Iterable updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update)); @@ -298,18 +300,19 @@ public void testStringSetUpdateExtraction() { stringSet.add("kl", "mn"); stringSet.add("mn"); - CounterUpdate name2Update = new CounterUpdate() - .setStructuredNameAndMetadata( - new CounterStructuredNameAndMetadata() - .setName( - new CounterStructuredName() - .setOrigin(Origin.USER.toString()) - .setOriginNamespace("ns") - .setName("name2") - .setOriginalStepName("s2")) - .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) - .setCumulative(false) - .setStringList(new StringList().setElements(Arrays.asList("ij", "kl", "mn"))); + CounterUpdate name2Update = + new CounterUpdate() + .setStructuredNameAndMetadata( + new CounterStructuredNameAndMetadata() + .setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + .setOriginNamespace("ns") + .setName("name2") + .setOriginalStepName("s2")) + .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) + .setCumulative(false) + .setStringList(new StringList().setElements(Arrays.asList("ij", "kl", "mn"))); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update, name2Update)); diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java index 10749f854dc4..44681a626cc0 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java @@ -75,7 +75,9 @@ public synchronized MetricQueryResults queryMetrics(@Nullable MetricsFilter filt updateLocalMetrics(metricsAccumulator); } return new QueryResults( - counters.filter(filter), distributions.filter(filter), gauges.filter(filter), + counters.filter(filter), + distributions.filter(filter), + gauges.filter(filter), stringSet.filter(filter)); } @@ -253,7 +255,8 @@ Iterable> filter(MetricsFilter filter) { .toList(); } - private MetricResult toUpdateResult(Map.Entry entry) { + private MetricResult toUpdateResult( + Map.Entry entry) { MetricKey key = entry.getKey(); StringSetResult stringSetResult = entry.getValue().extractResult(); return MetricResult.create(key, stringSetResult, stringSetResult); diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/StringSetImpl.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/StringSetImpl.java index 253c88873247..f2072067368f 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/StringSetImpl.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/StringSetImpl.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.jet.metrics; import java.util.Arrays; @@ -30,4 +47,4 @@ public void add(String value) { public void add(String... values) { stringSetData.combine(StringSetData.create(new HashSet<>(Arrays.asList(values)))); } -} \ No newline at end of file +} diff --git a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java index 37df86f1d671..1d45a83b1e79 100644 --- a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java +++ b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java @@ -83,8 +83,8 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) { this.distributions, (distribution) -> MetricFiltering.matches(filter, distribution.getKey())), Iterables.filter(this.gauges, (gauge) -> MetricFiltering.matches(filter, gauge.getKey())), - Iterables.filter(this.stringSets, (stringSet) -> MetricFiltering.matches(filter, - stringSet.getKey()))); + Iterables.filter( + this.stringSets, (stringSet) -> MetricFiltering.matches(filter, stringSet.getKey()))); } private static PortableMetrics convertMonitoringInfosToMetricResults( @@ -100,8 +100,8 @@ private static PortableMetrics convertMonitoringInfosToMetricResults( extractGaugeMetricsFromJobMetrics(monitoringInfoList); Iterable> stringSetFromMetrics = extractStringSetMetricsFromJobMetrics(monitoringInfoList); - return new PortableMetrics(countersFromJobMetrics, distributionsFromMetrics, - gaugesFromMetrics, stringSetFromMetrics); + return new PortableMetrics( + countersFromJobMetrics, distributionsFromMetrics, gaugesFromMetrics, stringSetFromMetrics); } private static Iterable> diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java index 50440081cb65..9f60ce3d6c07 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java @@ -21,9 +21,7 @@ import java.util.List; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; -/** - * The results of a query for metrics. Allows accessing all the metrics that matched the filter. - */ +/** The results of a query for metrics. Allows accessing all the metrics that matched the filter. */ @AutoValue public abstract class MetricQueryResults { /** Return the metric results for the counters that matched the filter. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java index 48a5aea59695..b9cbc8d755ee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java @@ -82,6 +82,7 @@ public MetricResult addCommitted(T update, BiFunction combine) { return create( getKey(), committed == null ? update : combine.apply(committed, update), getAttempted()); } + public static MetricResult attempted(MetricKey key, T attempted) { return new AutoValue_MetricResult<>(key, null, attempted); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java index 7a5d981e67e1..0c4766bb2c0b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java @@ -54,8 +54,8 @@ default Counter getPerWorkerCounter(MetricName metricName) { Gauge getGauge(MetricName metricName); /** - * Return the {@link StringSet} that should be used for implementing the given {@code metricName} in - * this container. + * Return the {@link StringSet} that should be used for implementing the given {@code metricName} + * in this container. */ StringSet getStringSet(MetricName metricName); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSet.java index c69e3da2a331..42e8f2388e38 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSet.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSet.java @@ -17,10 +17,12 @@ */ package org.apache.beam.sdk.metrics; -/** A metric that reports set of unique string values. - * This metric is backed by {@link java.util.HashSet} and hence it does not maintain any ordering. */ +/** + * A metric that reports set of unique string values. This metric is backed by {@link + * java.util.HashSet} and hence it does not maintain any ordering. + */ public interface StringSet extends Metric { - + /** Add a value to this set. */ void add(String value); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetResult.java index 42eeba1cca44..80845090fd7d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetResult.java @@ -38,9 +38,10 @@ public static StringSetResult empty() { public static class EmptyStringSetResult extends StringSetResult { private static final EmptyStringSetResult INSTANCE = new EmptyStringSetResult(); + private EmptyStringSetResult() {} - /** Returns an empty immutable set */ + /** Returns an empty immutable set. */ @Override public Set getStringSet() { return Collections.emptySet(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index 7c8a8f1b237a..79709c89963b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -28,7 +28,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableSet; import java.io.Serializable; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.GenerateSequence; @@ -46,6 +45,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.After; @@ -241,12 +241,12 @@ public void testCounterToCell() { @RunWith(JUnit4.class) public static class CommittedMetricTests extends SharedTestBase { @Category({ - ValidatesRunner.class, - UsesCommittedMetrics.class, - UsesCounterMetrics.class, - UsesDistributionMetrics.class, - UsesGaugeMetrics.class, - UsesStringSetMetrics.class + ValidatesRunner.class, + UsesCommittedMetrics.class, + UsesCounterMetrics.class, + UsesDistributionMetrics.class, + UsesGaugeMetrics.class, + UsesStringSetMetrics.class }) @Test public void testAllCommittedMetrics() { @@ -287,6 +287,7 @@ public void testCommittedStringSetMetrics() { MetricQueryResults metrics = queryTestMetrics(result); assertStringSetMetrics(metrics, true); } + @Test @Category({NeedsRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class}) public void testBoundedSourceMetrics() { @@ -368,12 +369,12 @@ public void testUnboundedSourceMetrics() { @RunWith(JUnit4.class) public static class AttemptedMetricTests extends SharedTestBase { @Category({ - ValidatesRunner.class, - UsesAttemptedMetrics.class, - UsesCounterMetrics.class, - UsesDistributionMetrics.class, - UsesGaugeMetrics.class, - UsesStringSetMetrics.class + ValidatesRunner.class, + UsesAttemptedMetrics.class, + UsesCounterMetrics.class, + UsesDistributionMetrics.class, + UsesGaugeMetrics.class, + UsesStringSetMetrics.class }) @Test public void testAllAttemptedMetrics() { @@ -445,16 +446,33 @@ private static void assertGaugeMetrics(MetricQueryResults metrics, boolean isCom } private static void assertStringSetMetrics(MetricQueryResults metrics, boolean isCommitted) { - assertThat(metrics.getStringSets(), + assertThat( + metrics.getStringSets(), containsInAnyOrder( - metricsResult(NAMESPACE, "sources", "MyStep1", - StringSetResult.create(ImmutableSet.of("gcs")), isCommitted), - metricsResult(NAMESPACE, "sinks", "MyStep2", - StringSetResult.create(ImmutableSet.of("kafka", "bq")), isCommitted), - metricsResult(NAMESPACE, "sideinputs", "MyStep1", - StringSetResult.create(ImmutableSet.of("bigtable", "spanner")), isCommitted), - metricsResult(NAMESPACE, "sideinputs", "MyStep2", - StringSetResult.create(ImmutableSet.of("sql", "bigtable")), isCommitted))); + metricsResult( + NAMESPACE, + "sources", + "MyStep1", + StringSetResult.create(ImmutableSet.of("gcs")), + isCommitted), + metricsResult( + NAMESPACE, + "sinks", + "MyStep2", + StringSetResult.create(ImmutableSet.of("kafka", "bq")), + isCommitted), + metricsResult( + NAMESPACE, + "sideinputs", + "MyStep1", + StringSetResult.create(ImmutableSet.of("bigtable", "spanner")), + isCommitted), + metricsResult( + NAMESPACE, + "sideinputs", + "MyStep2", + StringSetResult.create(ImmutableSet.of("sql", "bigtable")), + isCommitted))); } private static void assertDistributionMetrics(MetricQueryResults metrics, boolean isCommitted) { diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java index 3a9c520973bd..bcd243ba746d 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java @@ -219,7 +219,7 @@ public Gauge getGauge(MetricName metricName) { @Override public StringSet getStringSet(MetricName metricName) { - if(tracker.currentState != null) { + if (tracker.currentState != null) { return tracker.currentState.metricsContainer.getStringSet(metricName); } return tracker.metricsContainerRegistry.getUnboundContainer().getStringSet(metricName); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java index 251d34057e00..1f4341860295 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; -import com.google.common.collect.ImmutableSet; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -48,6 +47,7 @@ import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.joda.time.DateTimeUtils.MillisProvider; import org.joda.time.Duration; import org.junit.After; @@ -423,7 +423,8 @@ public void testCountersReturnedAreBasedUponCurrentExecutionState() throws Excep .getMetricsContainerRegistry() .getContainer("ptransformId") .getStringSet(TEST_USER_STRING_SET.getName()) - .getCumulative().stringSet()); + .getCumulative() + .stringSet()); assertEquals( 1L, (long) @@ -468,7 +469,8 @@ public void testCountersReturnedAreBasedUponCurrentExecutionState() throws Excep .getMetricsContainerRegistry() .getUnboundContainer() .getStringSet(TEST_USER_STRING_SET.getName()) - .getCumulative().stringSet()); + .getCumulative() + .stringSet()); assertEquals( 2L, (long)