diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle index 4af15a5e3103a..60301646f85b2 100644 --- a/runners/samza/build.gradle +++ b/runners/samza/build.gradle @@ -168,10 +168,6 @@ tasks.register("validatesRunner", Test) { excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestampedUnbounded' excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testOutputAfterCheckpointUnbounded' } - filter { - // Re-enable the test after Samza runner supports same state id across DoFn(s). - excludeTest('ParDoTest$StateTests', 'testValueStateSameId') - } } tasks.register("validatesRunnerSickbay", Test) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index a45448b305ecf..cb8df86058d4b 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.Map; import java.util.ServiceLoader; +import java.util.Set; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.renderer.PipelineDotRenderer; @@ -33,6 +34,7 @@ import org.apache.beam.runners.samza.translation.SamzaPipelineTranslator; import org.apache.beam.runners.samza.translation.SamzaPortablePipelineTranslator; import org.apache.beam.runners.samza.translation.SamzaTransformOverrides; +import org.apache.beam.runners.samza.translation.StateIdParser; import org.apache.beam.runners.samza.translation.TranslationContext; import org.apache.beam.runners.samza.util.PipelineJsonRenderer; import org.apache.beam.sdk.Pipeline; @@ -140,9 +142,11 @@ public SamzaPipelineResult run(Pipeline pipeline) { LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); configBuilder.put(BEAM_DOT_GRAPH, dotGraph); configBuilder.put(BEAM_JSON_GRAPH, jsonGraph); @@ -162,7 +166,7 @@ public SamzaPipelineResult run(Pipeline pipeline) { appDescriptor.withMetricsReporterFactories(reporterFactories); SamzaPipelineTranslator.translate( - pipeline, new TranslationContext(appDescriptor, idMap, options)); + pipeline, new TranslationContext(appDescriptor, idMap, nonUniqueStateIds, options)); }; // perform a final round of validation for the pipeline options now that all configs are diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java index 735ec62cd3259..35661ae86fe19 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java @@ -127,6 +127,7 @@ public class DoFnOp implements Op { private final DoFnSchemaInformation doFnSchemaInformation; private final Map> sideInputMapping; + private final Map stateIdToStoreMapping; public DoFnOp( TupleTag mainOutputTag, @@ -148,7 +149,8 @@ public DoFnOp( JobInfo jobInfo, Map> idToTupleTagMap, DoFnSchemaInformation doFnSchemaInformation, - Map> sideInputMapping) { + Map> sideInputMapping, + Map stateIdToStoreMapping) { this.mainOutputTag = mainOutputTag; this.doFn = doFn; this.sideInputs = sideInputs; @@ -171,6 +173,7 @@ public DoFnOp( this.bundleStateId = "_samza_bundle_" + transformId; this.doFnSchemaInformation = doFnSchemaInformation; this.sideInputMapping = sideInputMapping; + this.stateIdToStoreMapping = stateIdToStoreMapping; } @Override @@ -260,6 +263,7 @@ public void open( outputCoders, doFnSchemaInformation, (Map>) sideInputMapping, + stateIdToStoreMapping, emitter, outputFutureCollector); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java index ec1a9f3650908..41fe8190dec1a 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java @@ -52,8 +52,6 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -91,15 +89,19 @@ public static DoFnRunner create( Map, Coder> outputCoders, DoFnSchemaInformation doFnSchemaInformation, Map> sideInputMapping, + Map stateIdToStoreIdMapping, OpEmitter emitter, FutureCollector futureCollector) { final KeyedInternals keyedInternals; final TimerInternals timerInternals; final StateInternals stateInternals; - final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); final SamzaStoreStateInternals.Factory stateInternalsFactory = SamzaStoreStateInternals.createStateInternalsFactory( - transformId, keyCoder, context.getTaskContext(), pipelineOptions, signature); + transformId, + keyCoder, + context.getTaskContext(), + pipelineOptions, + stateIdToStoreIdMapping); final SamzaExecutionContext executionContext = (SamzaExecutionContext) context.getApplicationContainerContext(); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java index 7df48fb4cad5e..faa2d4addb25f 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java @@ -26,14 +26,12 @@ import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -66,7 +64,6 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; @@ -129,18 +126,7 @@ static KeyValueStore> getBeamStore(TaskContext context) */ static Factory createNonKeyedStateInternalsFactory( String id, TaskContext context, SamzaPipelineOptions pipelineOptions) { - return createStateInternalsFactory(id, null, context, pipelineOptions, Collections.emptySet()); - } - - static Factory createStateInternalsFactory( - String id, - Coder keyCoder, - TaskContext context, - SamzaPipelineOptions pipelineOptions, - DoFnSignature signature) { - - return createStateInternalsFactory( - id, keyCoder, context, pipelineOptions, signature.stateDeclarations().keySet()); + return createStateInternalsFactory(id, null, context, pipelineOptions, Collections.emptyMap()); } static Factory createStateInternalsFactory( @@ -150,31 +136,35 @@ static Factory createStateInternalsFactory( SamzaPipelineOptions pipelineOptions, ExecutableStage executableStage) { - Set stateIds = + Map stateIdToStoreMap = executableStage.getUserStates().stream() - .map(UserStateReference::localName) - .collect(Collectors.toSet()); + .collect( + Collectors.toMap(UserStateReference::localName, UserStateReference::localName)); - return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIds); + return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIdToStoreMap); } @SuppressWarnings("unchecked") - private static Factory createStateInternalsFactory( + static Factory createStateInternalsFactory( String id, @Nullable Coder keyCoder, TaskContext context, SamzaPipelineOptions pipelineOptions, - Collection stateIds) { + Map stateIdToStoreMap) { final int batchGetSize = pipelineOptions.getStoreBatchGetSize(); final Map>> stores = new HashMap<>(); stores.put(BEAM_STORE, getBeamStore(context)); final Coder stateKeyCoder; if (keyCoder != null) { - stateIds.forEach( - stateId -> - stores.put( - stateId, (KeyValueStore>) context.getStore(stateId))); + stateIdToStoreMap + .keySet() + .forEach( + stateId -> + stores.put( + stateId, + (KeyValueStore>) + context.getStore(stateIdToStoreMap.get(stateId)))); stateKeyCoder = keyCoder; } else { stateKeyCoder = (Coder) VoidCoder.of(); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java index 50650ece96c40..c648957d1eb76 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java @@ -52,6 +52,7 @@ import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.runtime.RemoteApplicationRunner; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; +import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory; import org.apache.samza.zk.ZkJobCoordinatorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -306,9 +307,7 @@ private static Map createSystemConfig( static Map createRocksDBStoreConfig(SamzaPipelineOptions options) { final ImmutableMap.Builder configBuilder = ImmutableMap.builder() - .put( - BEAM_STORE_FACTORY, - "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory") + .put(BEAM_STORE_FACTORY, RocksDbKeyValueStorageEngineFactory.class.getName()) .put("stores.beamStore.rocksdb.compression", "lz4"); if (options.getStateDurable()) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java index d8feaff424d46..3fc405d15a5b8 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java @@ -17,10 +17,10 @@ */ package org.apache.beam.runners.samza.translation; -import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.beam.runners.samza.SamzaPipelineOptions; +import org.apache.beam.runners.samza.util.StoreIdGenerator; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; @@ -35,12 +35,13 @@ public class ConfigContext { private final Map idMap; private AppliedPTransform currentTransform; private final SamzaPipelineOptions options; - private final Set stateIds; + private final StoreIdGenerator storeIdGenerator; - public ConfigContext(Map idMap, SamzaPipelineOptions options) { + public ConfigContext( + Map idMap, Set nonUniqueStateIds, SamzaPipelineOptions options) { this.idMap = idMap; this.options = options; - this.stateIds = new HashSet<>(); + this.storeIdGenerator = new StoreIdGenerator(nonUniqueStateIds); } public void setCurrentTransform(AppliedPTransform currentTransform) { @@ -64,8 +65,8 @@ public SamzaPipelineOptions getPipelineOptions() { return this.options; } - public boolean addStateId(String stateId) { - return stateIds.add(stateId); + public StoreIdGenerator getStoreIdGenerator() { + return storeIdGenerator; } private String getIdForPValue(PValue pvalue) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java index 6db09f69d7d8f..ac7fe50042f36 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java @@ -72,6 +72,7 @@ import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.WatermarkFunction; +import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory; import org.joda.time.Instant; /** @@ -161,6 +162,13 @@ private static void doTranslate( Map> sideInputMapping = ParDoTranslation.getSideInputMapping(ctx.getCurrentTransform()); + final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + final Map stateIdToStoreMapping = new HashMap<>(); + for (String stateId : signature.stateDeclarations().keySet()) { + final String transformFullName = node.getEnclosingNode().getFullName(); + final String storeId = ctx.getStoreIdGenerator().getId(stateId, transformFullName); + stateIdToStoreMapping.put(stateId, storeId); + } final DoFnOp op = new DoFnOp<>( transform.getMainOutputTag(), @@ -182,7 +190,8 @@ private static void doTranslate( null, Collections.emptyMap(), doFnSchemaInformation, - sideInputMapping); + sideInputMapping, + stateIdToStoreMapping); final MessageStream> mergedStreams; if (sideInputStreams.isEmpty()) { @@ -333,7 +342,8 @@ private static void doTranslatePortable( ctx.getJobInfo(), idToTupleTagMap, doFnSchemaInformation, - sideInputMapping); + sideInputMapping, + Collections.emptyMap()); final MessageStream> mergedStreams; if (sideInputStreams.isEmpty()) { @@ -376,18 +386,11 @@ public Map createConfig( if (signature.usesState()) { // set up user state configs - for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) { - final String storeId = state.id(); - - // TODO: remove validation after we support same state id in different ParDo. - if (!ctx.addStateId(storeId)) { - throw new IllegalStateException( - "Duplicate StateId " + storeId + " found in multiple ParDo."); - } - + for (String stateId : signature.stateDeclarations().keySet()) { + final String transformFullName = node.getEnclosingNode().getFullName(); + final String storeId = ctx.getStoreIdGenerator().getId(stateId, transformFullName); config.put( - "stores." + storeId + ".factory", - "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"); + "stores." + storeId + ".factory", RocksDbKeyValueStorageEngineFactory.class.getName()); config.put("stores." + storeId + ".key.serde", "byteArraySerde"); config.put("stores." + storeId + ".msg.serde", "stateValueSerde"); config.put("stores." + storeId + ".rocksdb.compression", "lz4"); @@ -430,8 +433,7 @@ public Map createPortableConfig( final String storeId = stateId.getLocalName(); config.put( - "stores." + storeId + ".factory", - "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"); + "stores." + storeId + ".factory", RocksDbKeyValueStorageEngineFactory.class.getName()); config.put("stores." + storeId + ".key.serde", "byteArraySerde"); config.put("stores." + storeId + ".msg.serde", "stateValueSerde"); config.put("stores." + storeId + ".rocksdb.compression", "lz4"); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java index 776ee80878d1e..2195489ca13fb 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java @@ -50,7 +50,7 @@ public class PortableTranslationContext extends TranslationContext { public PortableTranslationContext( StreamApplicationDescriptor appDescriptor, SamzaPipelineOptions options, JobInfo jobInfo) { - super(appDescriptor, Collections.emptyMap(), options); + super(appDescriptor, Collections.emptyMap(), Collections.emptySet(), options); this.jobInfo = jobInfo; } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java index 65caa8ba68ea4..75409b07201da 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; +import java.util.Set; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; import org.apache.beam.runners.core.construction.graph.ExecutableStage; @@ -78,8 +79,9 @@ public static void createConfig( Pipeline pipeline, SamzaPipelineOptions options, Map idMap, + Set nonUniqueStateIds, ConfigBuilder configBuilder) { - final ConfigContext ctx = new ConfigContext(idMap, options); + final ConfigContext ctx = new ConfigContext(idMap, nonUniqueStateIds, options); final TransformVisitorFn configFn = new TransformVisitorFn() { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/StateIdParser.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/StateIdParser.java new file mode 100644 index 0000000000000..05135a4d97dd6 --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/StateIdParser.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza.translation; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.runners.samza.util.StateUtils; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; + +/** + * This class identifies the set of non-unique state ids by scanning the BEAM {@link Pipeline} with + * a topological traversal. + */ +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class StateIdParser extends Pipeline.PipelineVisitor.Defaults { + private final Set nonUniqueStateIds = new HashSet<>(); + private final Set usedStateIds = new HashSet<>(); + + public static Set scan(Pipeline pipeline) { + final StateIdParser parser = new StateIdParser(); + pipeline.traverseTopologically(parser); + return parser.getNonUniqueStateIds(); + } + + private StateIdParser() {} + + @Override + public void visitPrimitiveTransform(TransformHierarchy.Node node) { + if (node.getTransform() instanceof ParDo.MultiOutput) { + final DoFn doFn = ((ParDo.MultiOutput) node.getTransform()).getFn(); + if (StateUtils.isStateful(doFn)) { + final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); + for (String stateId : signature.stateDeclarations().keySet()) { + if (!usedStateIds.add(stateId)) { + nonUniqueStateIds.add(stateId); + } + } + } + } + } + + public Set getNonUniqueStateIds() { + return Collections.unmodifiableSet(nonUniqueStateIds); + } +} diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java index 8488594504922..a1cf67befe07f 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.runtime.OpMessage; import org.apache.beam.runners.samza.util.HashIdGenerator; +import org.apache.beam.runners.samza.util.StoreIdGenerator; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -85,16 +86,19 @@ public class TranslationContext { private final Map registeredTables = new HashMap<>(); private final SamzaPipelineOptions options; private final HashIdGenerator idGenerator = new HashIdGenerator(); + private final StoreIdGenerator storeIdGenerator; private AppliedPTransform currentTransform; public TranslationContext( StreamApplicationDescriptor appDescriptor, Map idMap, + Set nonUniqueStateIds, SamzaPipelineOptions options) { this.appDescriptor = appDescriptor; this.idMap = idMap; this.options = options; + this.storeIdGenerator = new StoreIdGenerator(nonUniqueStateIds); } public void registerInputMessageStream( @@ -249,6 +253,10 @@ public String getTransformId() { return idGenerator.getId(getTransformFullName()); } + public StoreIdGenerator getStoreIdGenerator() { + return storeIdGenerator; + } + /** The dummy stream created will only be used in Beam tests. */ private static InputDescriptor, ?> createDummyStreamDescriptor(String id) { final GenericSystemDescriptor dummySystem = diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java index 88fdebc2d02a0..99f71f53860b8 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java @@ -46,8 +46,8 @@ public static WindowedValue.WindowedValueCoder instantiateCoder( /** * Escape the non-alphabet chars in the name so we can create a physical stream out of it. * - *

This escape will replace ".", "(" and "/" as "-", and then remove all the other - * non-alphabetic characters. + *

This escape will replace any non-alphanumeric characters other than "-" and "_" with "_" + * including whitespace. */ public static String escape(String name) { return name.replaceFirst(".*:([a-zA-Z#0-9]+).*", "$1").replaceAll("[^A-Za-z0-9_-]", "_"); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdGenerator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdGenerator.java new file mode 100644 index 0000000000000..aefba0186c8d4 --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdGenerator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza.util; + +import java.util.Set; + +/** + * This class encapsulates the logic to generate unique store id. For unique state ids across the + * Beam pipeline, store id is the same as the state id. For non-unique state ids, join the state id + * with an escaped transform name to generate a unique store id. + */ +public class StoreIdGenerator { + + private final Set nonUniqueStateIds; + + public StoreIdGenerator(Set nonUniqueStateId) { + this.nonUniqueStateIds = nonUniqueStateId; + } + + public String getId(String stateId, String transformFullName) { + String storeId = stateId; + if (nonUniqueStateIds.contains(stateId)) { + final String escapedName = SamzaPipelineTranslatorUtils.escape(transformFullName); + storeId = toUniqueStoreId(stateId, escapedName); + } + return storeId; + } + + /** Join state id and escaped PTransform name to uniquify store id. */ + private static String toUniqueStoreId(String stateId, String escapedPTransformName) { + return String.join("-", stateId, escapedPTransformName); + } +} diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java index 457a43e53e470..97d19a7c73398 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.state.SetState; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -58,6 +59,7 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; @@ -195,6 +197,106 @@ public void processElement( pipeline.run(); } + @Test + public void testValueStateSameIdAcrossParDo() { + final String stateId = "foo"; + + DoFn, KV> fn = + new DoFn, KV>() { + + @StateId(stateId) + private final StateSpec> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + @StateId(stateId) ValueState state, OutputReceiver> r) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); + r.output(KV.of("sizzle", currentValue)); + state.write(currentValue + 1); + } + }; + + DoFn, Integer> fn2 = + new DoFn, Integer>() { + + @StateId(stateId) + private final StateSpec> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + @StateId(stateId) ValueState state, OutputReceiver r) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 13); + r.output(currentValue); + state.write(currentValue + 13); + } + }; + + PCollection> intermediate = + pipeline + .apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84))) + .apply("First stateful ParDo", ParDo.of(fn)); + + PCollection output = intermediate.apply("Second stateful ParDo", ParDo.of(fn2)); + + PAssert.that(intermediate) + .containsInAnyOrder(KV.of("sizzle", 0), KV.of("sizzle", 1), KV.of("sizzle", 2)); + PAssert.that(output).containsInAnyOrder(13, 26, 39); + pipeline.run(); + } + + @Test + public void testValueStateSameIdAcrossParDoWithSameName() { + final String stateId = "foo"; + + DoFn, KV> fn = + new DoFn, KV>() { + + @StateId(stateId) + private final StateSpec> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + @StateId(stateId) ValueState state, OutputReceiver> r) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); + r.output(KV.of("hello", currentValue)); + state.write(currentValue + 1); + } + }; + + DoFn, Integer> fn2 = + new DoFn, Integer>() { + + @StateId(stateId) + private final StateSpec> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + @StateId(stateId) ValueState state, OutputReceiver r) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 13); + r.output(currentValue); + state.write(currentValue + 13); + } + }; + + PCollection> intermediate = + pipeline + .apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84))) + .apply("Stateful ParDo with Same Name", ParDo.of(fn)); + + PCollection output = + intermediate.apply("Stateful ParDo with Same Name", ParDo.of(fn2)); + + PAssert.that(intermediate) + .containsInAnyOrder(KV.of("hello", 0), KV.of("hello", 1), KV.of("hello", 2)); + + PAssert.that(output).containsInAnyOrder(13, 26, 39); + pipeline.run(); + } + /** A storage engine to create test stores. */ public static class TestStorageEngine extends InMemoryKeyValueStorageEngineFactory { diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java index bb39c186a9f25..9c6c00e86b88d 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java @@ -19,11 +19,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.util.Map; import java.util.Objects; +import java.util.Set; import org.apache.beam.runners.samza.SamzaExecutionEnvironment; import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.SamzaRunner; @@ -74,8 +74,10 @@ public void testStatefulBeamStoreConfig() { pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides()); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config = configBuilder.build(); assertEquals( @@ -86,7 +88,8 @@ public void testStatefulBeamStoreConfig() { assertNull(config.get("stores.beamStore.changelog")); options.setStateDurable(true); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config2 = configBuilder.build(); assertEquals( "TestStoreConfig-1-beamStore-changelog", config2.get("stores.beamStore.changelog")); @@ -104,8 +107,10 @@ public void testStatelessBeamStoreConfig() { pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides()); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config = configBuilder.build(); assertEquals( @@ -116,7 +121,8 @@ public void testStatelessBeamStoreConfig() { assertNull(config.get("stores.beamStore.changelog")); options.setStateDurable(true); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config2 = configBuilder.build(); // For stateless jobs, ignore state durable pipeline option. assertNull(config2.get("stores.beamStore.changelog")); @@ -135,8 +141,10 @@ public void testSamzaLocalExecutionEnvironmentConfig() { pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides()); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config = configBuilder.build(); assertTrue( @@ -163,8 +171,10 @@ public void testSamzaYarnExecutionEnvironmentConfig() { pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides()); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); try { Config config = configBuilder.build(); assertEquals(config.get(APP_RUNNER_CLASS), RemoteApplicationRunner.class.getName()); @@ -193,8 +203,10 @@ public void testSamzaStandAloneExecutionEnvironmentConfig() { pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides()); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); try { Config config = configBuilder.build(); assertEquals(config.get(APP_RUNNER_CLASS), LocalApplicationRunner.class.getName()); @@ -234,8 +246,10 @@ public void processElement( })); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config = configBuilder.build(); assertEquals( @@ -246,14 +260,15 @@ public void processElement( assertNull(config.get("stores.testState.changelog")); options.setStateDurable(true); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config2 = configBuilder.build(); assertEquals( "TestStoreConfig-1-testState-changelog", config2.get("stores.testState.changelog")); } @Test - public void testDuplicateStateIdConfig() { + public void testUserStoreConfigSameStateIdAcrossParDo() { SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class); options.setJobName("TestStoreConfig"); options.setRunner(SamzaRunner.class); @@ -263,6 +278,7 @@ public void testDuplicateStateIdConfig() { .apply( Create.empty(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))) .apply( + "First stateful ParDo", ParDo.of( new DoFn, KV>() { private static final String testState = "testState"; @@ -277,6 +293,7 @@ public void processElement( } })) .apply( + "Second stateful ParDo", ParDo.of( new DoFn, Void>() { private static final String testState = "testState"; @@ -290,10 +307,111 @@ public void processElement( })); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); + final Config config = configBuilder.build(); + + assertEquals( + RocksDbKeyValueStorageEngineFactory.class.getName(), + config.get("stores.testState-First_stateful_ParDo.factory")); + assertEquals("byteArraySerde", config.get("stores.testState-First_stateful_ParDo.key.serde")); + assertEquals("stateValueSerde", config.get("stores.testState-First_stateful_ParDo.msg.serde")); + assertNull(config.get("stores.testState-First_stateful_ParDo.changelog")); + + assertEquals( + RocksDbKeyValueStorageEngineFactory.class.getName(), + config.get("stores.testState-Second_stateful_ParDo.factory")); + assertEquals("byteArraySerde", config.get("stores.testState-Second_stateful_ParDo.key.serde")); + assertEquals("stateValueSerde", config.get("stores.testState-Second_stateful_ParDo.msg.serde")); + assertNull(config.get("stores.testState-Second_stateful_ParDo.changelog")); + + options.setStateDurable(true); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); + final Config config2 = configBuilder.build(); + assertEquals( + "TestStoreConfig-1-testState-First_stateful_ParDo-changelog", + config2.get("stores.testState-First_stateful_ParDo.changelog")); + assertEquals( + "TestStoreConfig-1-testState-Second_stateful_ParDo-changelog", + config2.get("stores.testState-Second_stateful_ParDo.changelog")); + } + + @Test + public void testUserStoreConfigSameStateIdAndPTransformName() { + SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class); + options.setJobName("TestStoreConfig"); + options.setRunner(SamzaRunner.class); - assertThrows( - IllegalStateException.class, - () -> SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder)); + Pipeline pipeline = Pipeline.create(options); + pipeline + .apply( + Create.empty(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))) + .apply( + "Same stateful ParDo Name", + ParDo.of( + new DoFn, KV>() { + private static final String testState = "testState"; + + @StateId(testState) + private final StateSpec> state = StateSpecs.value(); + + @ProcessElement + public void processElement( + ProcessContext context, @StateId(testState) ValueState state) { + context.output(context.element()); + } + })) + .apply( + "Same stateful ParDo Name", + ParDo.of( + new DoFn, Void>() { + private static final String testState = "testState"; + + @StateId(testState) + private final StateSpec> state = StateSpecs.value(); + + @ProcessElement + public void processElement( + ProcessContext context, @StateId(testState) ValueState state) {} + })); + + final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); + final ConfigBuilder configBuilder = new ConfigBuilder(options); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); + final Config config = configBuilder.build(); + + assertEquals( + RocksDbKeyValueStorageEngineFactory.class.getName(), + config.get("stores.testState-Same_stateful_ParDo_Name.factory")); + assertEquals( + "byteArraySerde", config.get("stores.testState-Same_stateful_ParDo_Name.key.serde")); + assertEquals( + "stateValueSerde", config.get("stores.testState-Same_stateful_ParDo_Name.msg.serde")); + assertNull(config.get("stores.testState-Same_stateful_ParDo_Name.changelog")); + + assertEquals( + RocksDbKeyValueStorageEngineFactory.class.getName(), + config.get("stores.testState-Same_stateful_ParDo_Name2.factory")); + assertEquals( + "byteArraySerde", config.get("stores.testState-Same_stateful_ParDo_Name2.key.serde")); + assertEquals( + "stateValueSerde", config.get("stores.testState-Same_stateful_ParDo_Name2.msg.serde")); + assertNull(config.get("stores.testState-Same_stateful_ParDo_Name2.changelog")); + + options.setStateDurable(true); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); + final Config config2 = configBuilder.build(); + assertEquals( + "TestStoreConfig-1-testState-Same_stateful_ParDo_Name-changelog", + config2.get("stores.testState-Same_stateful_ParDo_Name.changelog")); + assertEquals( + "TestStoreConfig-1-testState-Same_stateful_ParDo_Name2-changelog", + config2.get("stores.testState-Same_stateful_ParDo_Name2.changelog")); } } diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java index 8827f1e5622c0..bb357dd6aced8 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java @@ -22,8 +22,10 @@ import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.runtime.OpMessage; @@ -60,8 +62,10 @@ public class TranslationContextTest { }, getConfig()); Map idMap = new HashMap<>(); + Set nonUniqueStateIds = new HashSet<>(); TranslationContext translationContext = - new TranslationContext(streamApplicationDescriptor, idMap, mock(SamzaPipelineOptions.class)); + new TranslationContext( + streamApplicationDescriptor, idMap, nonUniqueStateIds, mock(SamzaPipelineOptions.class)); @Test public void testRegisterInputMessageStreams() {