From ad17a6912d96357e305cd10e57cc5e206b3f481c Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Sun, 7 Jul 2024 20:07:06 -0700 Subject: [PATCH] Fixes for review comments --- .../core/metrics/MetricsContainerImpl.java | 4 +- .../core/metrics/MonitoringInfoEncodings.java | 21 ++---- .../runners/core/metrics/StringSetCell.java | 11 ++-- .../runners/core/metrics/StringSetData.java | 37 +++++++---- .../core/metrics/StringSetDataTest.java | 13 +++- .../beam/runners/direct/DirectMetrics.java | 4 +- .../runners/dataflow/DataflowMetrics.java | 5 +- .../runners/dataflow/DataflowMetricsTest.java | 11 ++-- .../runners/jet/metrics/StringSetImpl.java | 5 +- .../beam/sdk/metrics/StringSetResult.java | 24 +++++-- .../beam/sdk/metrics/StringSetResultTest.java | 64 +++++++++++++++++++ 11 files changed, 147 insertions(+), 52 deletions(-) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/StringSetResultTest.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java index 8f6441f189e0d..a2f6511d5129a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java @@ -206,7 +206,7 @@ public StringSetCell getStringSet(MetricName metricName) { } /** - * Return a {@code StringSetCell} named {@code metricName}.If it doesn't exist, return {@code + * Return a {@code StringSetCell} named {@code metricName}. If it doesn't exist, return {@code * null}. */ public @Nullable StringSetCell tryGetStringSet(MetricName metricName) { @@ -418,7 +418,7 @@ private String getShortId( } /** - * Mark all of the updates that were retrieved with the latest call to {@link #getUpdates()} as + * Mark all the updates that were retrieved with the latest call to {@link #getUpdates()} as * committed. 
*/ public void commitUpdates() { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java index 3c7f5f7882478..433e7f4fb20b9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java @@ -19,22 +19,23 @@ import java.io.IOException; import java.io.InputStream; -import java.util.HashSet; -import java.util.Objects; import java.util.Set; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.DoubleCoder; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; /** A set of functions used to encode and decode common monitoring info types. */ public class MonitoringInfoEncodings { private static final Coder VARINT_CODER = VarLongCoder.of(); private static final Coder DOUBLE_CODER = DoubleCoder.of(); - private static final Coder STRING_CODER = StringUtf8Coder.of(); + private static final IterableCoder STRING_SET_CODER = + IterableCoder.of(StringUtf8Coder.of()); /** Encodes to {@link MonitoringInfoConstants.TypeUrns#DISTRIBUTION_INT64_TYPE}. */ public static ByteString encodeInt64Distribution(DistributionData data) { @@ -106,12 +107,7 @@ public static GaugeData decodeInt64Gauge(ByteString payload) { /** Encodes to {@link MonitoringInfoConstants.TypeUrns#SET_STRING_TYPE}. 
*/ public static ByteString encodeStringSet(StringSetData data) { try (ByteStringOutputStream output = new ByteStringOutputStream()) { - // encode the length of set - STRING_CODER.encode(String.valueOf(data.stringSet().size()), output); - // encode all elements - for (String s : data.stringSet()) { - STRING_CODER.encode(s, output); - } + STRING_SET_CODER.encode(data.stringSet(), output); return output.toByteString(); } catch (IOException e) { throw new RuntimeException(e); @@ -120,13 +116,8 @@ public static ByteString encodeStringSet(StringSetData data) { /** Decodes from {@link MonitoringInfoConstants.TypeUrns#SET_STRING_TYPE}. */ public static StringSetData decodeStringSet(ByteString payload) { - Set elements = new HashSet<>(); try (InputStream input = payload.newInput()) { - int size = Integer.parseInt(Objects.requireNonNull(STRING_CODER.decode(input))); - while (size > 0) { - elements.add(Objects.requireNonNull(STRING_CODER.decode(input))); - size--; - } + Set elements = Sets.newHashSet(STRING_SET_CODER.decode(input)); return StringSetData.create(elements); } catch (IOException e) { throw new RuntimeException(e); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java index 8d6eee79b87e7..db457b6124095 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java @@ -18,13 +18,12 @@ package org.apache.beam.runners.core.metrics; import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.StringSet; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -99,11 +98,15 @@ public int hashCode() { @Override public void add(String value) { - update(StringSetData.create(new HashSet<>(Collections.singletonList(value)))); + // if the given value is already present in the StringSet then skip this add for efficiency + if (this.setValue.get().stringSet().contains(value)) { + return; + } + update(StringSetData.create(ImmutableSet.of(value))); } @Override public void add(String... values) { - update(StringSetData.create(new HashSet<>(Arrays.asList(values)))); + update(StringSetData.create(ImmutableSet.copyOf(Arrays.asList(values)))); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java index 0db5bdcd9c7b3..d321c5980f0c6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java @@ -19,13 +19,17 @@ import com.google.auto.value.AutoValue; import java.io.Serializable; -import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.beam.sdk.metrics.StringSetResult; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; /** - * Data describing the StringSet. This should retain enough detail that it can be combined with + * Data describing the StringSet. The {@link StringSetData} holds an immutable copy + * of the set from which it was initially created, representing that a result cannot be modified + * once created. This should retain enough detail that it can be combined with * other {@link StringSetData}. 
*/ @AutoValue @@ -33,9 +37,9 @@ public abstract class StringSetData implements Serializable { public abstract Set stringSet(); - /** Returns a {@link StringSetData} which consists of the given set. */ - public static StringSetData create(Set stringSet) { - return new AutoValue_StringSetData(stringSet); + /** Returns a {@link StringSetData} which is made from an immutable copy of the given set. */ + public static StringSetData create(Set set) { + return new AutoValue_StringSetData(ImmutableSet.copyOf(set)); } /** Return a {@link EmptyStringSetData#INSTANCE} representing an empty {@link StringSetData}. */ @@ -49,10 +53,21 @@ public static StringSetData empty() { public StringSetData combine(StringSetData other) { // do not merge other on this as this StringSetData might hold an immutable set like in case // of EmptyStringSetData - Set merged = new HashSet<>(); - merged.addAll(this.stringSet()); - merged.addAll(other.stringSet()); - return StringSetData.create(merged); + Set combined = new HashSet<>(); + combined.addAll(this.stringSet()); + combined.addAll(other.stringSet()); + return StringSetData.create(combined); + } + + /** + * Combines this {@link StringSetData} with others; all original StringSetData are left intact. + */ + public StringSetData combine(Iterable others) { + Set combined = StreamSupport.stream(others.spliterator(), false) + .flatMap(other -> other.stringSet().stream()) + .collect(Collectors.toCollection(HashSet::new)); + combined.addAll(this.stringSet()); + return StringSetData.create(combined); } /** Returns a {@link StringSetResult} representing this {@link StringSetData}. */ @@ -67,10 +82,10 @@ public static class EmptyStringSetData extends StringSetData { private EmptyStringSetData() {} - /** Return an immutable empty set. */ + /** Returns an immutable empty set. */ @Override public Set stringSet() { - return Collections.emptySet(); + return ImmutableSet.of(); } /** Return a {@link StringSetResult#empty()} which is immutable empty set. 
*/ diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetDataTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetDataTest.java index cf8982dc57cff..665ce3743c511 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetDataTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetDataTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.core.metrics; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.util.Collections; @@ -75,10 +76,9 @@ public void testEmpty() { } @Test - public void testNoAddToStringSetDataEmpty() { - exception.expect(UnsupportedOperationException.class); + public void testStringSetDataEmptyIsImmutable() { StringSetData empty = StringSetData.empty(); - empty.stringSet().add("aa"); + assertThrows(UnsupportedOperationException.class, () -> empty.stringSet().add("aa")); } @Test @@ -92,4 +92,11 @@ public void testExtract() { StringSetData stringSetData = StringSetData.create(contents); assertEquals(stringSetData.stringSet(), contents); } + + @Test + public void testExtractReturnsImmutable() { + StringSetData stringSetData = StringSetData.create(ImmutableSet.of("ab", "cd")); + // check that immutable copy is returned + assertThrows(UnsupportedOperationException.class, () -> stringSetData.stringSet().add("aa")); + } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java index 9e1aec321c940..b02c4f030b273 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java @@ -229,9 +229,7 @@ public StringSetData zero() { @Override public StringSetData combine(Iterable 
updates) { StringSetData result = StringSetData.empty(); - for (StringSetData update : updates) { - result = result.combine(update); - } + result = result.combine(updates); return result; } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index 19a4bd76c4b0a..1fad140717f6c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -24,11 +24,11 @@ import com.google.api.services.dataflow.model.JobMetrics; import com.google.api.services.dataflow.model.MetricUpdate; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeResult; @@ -44,6 +44,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -190,7 +191,7 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) { if (metricUpdate.getSet() == null) { return StringSetResult.empty(); } - return StringSetResult.create(new HashSet<>(((ArrayList) metricUpdate.getSet()))); + return StringSetResult.create(ImmutableSet.copyOf(((Collection) metricUpdate.getSet()))); } private DistributionResult 
getDistributionValue(MetricUpdate metricUpdate) { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index b6b92103c90fa..9b8e3cc871daa 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -40,8 +40,7 @@ import com.google.api.services.dataflow.model.MetricUpdate; import java.io.IOException; import java.math.BigDecimal; -import java.util.Arrays; -import java.util.List; +import java.util.Set; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.DataflowTemplateJob; import org.apache.beam.sdk.PipelineResult.State; @@ -190,7 +189,7 @@ private MetricUpdate makeCounterMetricUpdate( } private MetricUpdate makeStringSetMetricUpdate( - String name, String namespace, String step, List setValues, boolean tentative) { + String name, String namespace, String step, Set setValues, boolean tentative) { MetricUpdate update = new MetricUpdate(); update.setSet(setValues); return setStructuredName(update, name, namespace, step, tentative); @@ -258,10 +257,10 @@ public void testSingleStringSetUpdates() throws IOException { // the job metrics results. 
MetricUpdate mu1 = makeStringSetMetricUpdate( - "counterName", "counterNamespace", "s2", Arrays.asList("ab", "cd"), false); + "counterName", "counterNamespace", "s2", ImmutableSet.of("ab", "cd"), false); MetricUpdate mu1Tentative = makeStringSetMetricUpdate( - "counterName", "counterNamespace", "s2", Arrays.asList("ef", "gh"), true); + "counterName", "counterNamespace", "s2", ImmutableSet.of("ab", "cd"), true); jobMetrics.setMetrics(ImmutableList.of(mu1, mu1Tentative)); DataflowClient dataflowClient = mock(DataflowClient.class); when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); @@ -283,7 +282,7 @@ public void testSingleStringSetUpdates() throws IOException { "counterNamespace", "counterName", "myStepName", - StringSetResult.create(ImmutableSet.of("ef", "gh"))))); + StringSetResult.create(ImmutableSet.of("ab", "cd"))))); } @Test diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/StringSetImpl.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/StringSetImpl.java index f2072067368f5..5e2d8e2029725 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/StringSetImpl.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/StringSetImpl.java @@ -40,7 +40,10 @@ StringSetData getValue() { @Override public void add(String value) { - stringSetData.combine(StringSetData.create(ImmutableSet.of("ab"))); + if (stringSetData.stringSet().contains(value)) { + return; + } + stringSetData.combine(StringSetData.create(ImmutableSet.of(value))); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetResult.java index 80845090fd7d0..7cd12da3a5125 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetResult.java @@ -18,23 +18,37 @@ package org.apache.beam.sdk.metrics; 
import com.google.auto.value.AutoValue; -import java.util.Collections; import java.util.Set; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; -/** The result of a {@link StringSet} metric. */ +/** + * The result of a {@link StringSet} metric. A {@link StringSetResult} holds an immutable copy of + * the set it was created from, so a result cannot be modified once created. + */ @AutoValue public abstract class StringSetResult { public abstract Set getStringSet(); + /** + * Creates a {@link StringSetResult} holding an immutable copy of the given {@link Set}, made + * with {@link ImmutableSet#copyOf(java.util.Collection)}. + * @param s the set from which the {@link StringSetResult} should be created. + * @return {@link StringSetResult} containing an immutable copy of the given set. + */ public static StringSetResult create(Set s) { - return new AutoValue_StringSetResult(s); + return new AutoValue_StringSetResult(ImmutableSet.copyOf(s)); } + /** + * Returns the shared {@link EmptyStringSetResult} instance. + */ public static StringSetResult empty() { return EmptyStringSetResult.INSTANCE; } - /** Empty {@link StringSetResult}, representing no values reported. */ + /** + * Empty {@link StringSetResult}, representing no values reported; holds an empty immutable set. + */ public static class EmptyStringSetResult extends StringSetResult { private static final EmptyStringSetResult INSTANCE = new EmptyStringSetResult(); @@ -44,7 +58,7 @@ private EmptyStringSetResult() {} /** Returns an empty immutable set. 
*/ @Override public Set getStringSet() { - return Collections.emptySet(); + return ImmutableSet.of(); } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/StringSetResultTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/StringSetResultTest.java new file mode 100644 index 0000000000000..8783534fc979e --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/StringSetResultTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.metrics; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets.SetView; +import org.junit.Test; + +public class StringSetResultTest { + + @Test + public void getStringSet() { + // Test that getStringSet gives an immutable set + HashSet initialSet = new HashSet<>(Arrays.asList("ab", "cd")); + Set stringSetResultSet = StringSetResult.create(initialSet).getStringSet(); + assertEquals(initialSet, stringSetResultSet); + assertThrows(UnsupportedOperationException.class, () -> stringSetResultSet.add("should-fail")); + } + + @Test + public void create() { + // Test that create makes an immutable copy of the given set + HashSet modifiableSet = new HashSet<>(Arrays.asList("ab", "cd")); + StringSetResult stringSetResult = StringSetResult.create(modifiableSet); + // change the initial set. + modifiableSet.add("ef"); + SetView difference = Sets.difference(modifiableSet, stringSetResult.getStringSet()); + assertEquals(1, difference.size()); + assertEquals("ef", difference.iterator().next()); + assertTrue(Sets.difference(stringSetResult.getStringSet(), modifiableSet).isEmpty()); + } + + @Test + public void empty() { + // Test empty returns an immutable set + StringSetResult emptyStringSetResult = StringSetResult.empty(); + assertTrue(emptyStringSetResult.getStringSet().isEmpty()); + assertThrows(UnsupportedOperationException.class, () -> + emptyStringSetResult.getStringSet().add("should-fail")); + } +}