From 2d49a0c54338eaed52af1e30c8abd750c824fe9f Mon Sep 17 00:00:00 2001 From: Seung Jin An Date: Tue, 1 Nov 2022 13:17:27 -0700 Subject: [PATCH 1/5] - Support same stateId with multiple ParDo within a single pipeline for Samza Runner - Add unit tests - Re-enable ParDoTest with the support on same state id across DoFns --- runners/samza/build.gradle | 4 - .../beam/runners/samza/SamzaRunner.java | 18 ++- .../beam/runners/samza/runtime/DoFnOp.java | 8 +- .../runners/samza/runtime/GroupByKeyOp.java | 1 + .../samza/runtime/SamzaDoFnRunners.java | 8 +- .../runtime/SamzaStoreStateInternals.java | 54 +++---- ...SplittableParDoProcessKeyedElementsOp.java | 1 + .../samza/translation/ConfigBuilder.java | 4 + .../samza/translation/ConfigContext.java | 47 +++++- .../ParDoBoundMultiTranslator.java | 42 +++-- .../PortableTranslationContext.java | 2 +- .../translation/SamzaPipelineTranslator.java | 35 ++++- .../samza/translation/TranslationContext.java | 16 ++ .../util/SamzaPipelineTranslatorUtils.java | 4 +- .../runtime/SamzaStoreStateInternalsTest.java | 102 +++++++++++++ .../translation/ConfigGeneratorTest.java | 143 ++++++++++++++++-- .../translation/TranslationContextTest.java | 19 ++- 17 files changed, 424 insertions(+), 84 deletions(-) diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle index 4af15a5e3103a..60301646f85b2 100644 --- a/runners/samza/build.gradle +++ b/runners/samza/build.gradle @@ -168,10 +168,6 @@ tasks.register("validatesRunner", Test) { excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestampedUnbounded' excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testOutputAfterCheckpointUnbounded' } - filter { - // Re-enable the test after Samza runner supports same state id across DoFn(s). 
- excludeTest('ParDoTest$StateTests', 'testValueStateSameId') - } } tasks.register("validatesRunnerSickbay", Test) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index a45448b305ecf..0a014c7c6d871 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.Map; import java.util.ServiceLoader; +import java.util.Set; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.renderer.PipelineDotRenderer; @@ -140,9 +141,21 @@ public SamzaPipelineResult run(Pipeline pipeline) { LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + /* Map of stateId to sanitized (remove whitespace and replace '-' with '_') PTransform name, used in multiple ParDos + (eg) @StateId("foo") used in two ParDos: + .apply("First Stateful ParDo with same stateId", ParDo.of(fn)) + .apply("Second Stateful ParDo with same stateId", ParDo.of(fn2)) + Map = ("foo", "First_Stateful_ParDo_with_same_stateId") + This will be populated with the key of stateId and value as the sanitized ParDo name. This map will be used to + identify the stateId used in multiple ParDos and rewrite RocksDB configs. 
*/ + final Map multiParDoStateIdMap = new HashMap<>(); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, multiParDoStateIdMap, configBuilder); + SamzaPipelineTranslator.rewriteConfigWithMultiParDoStateId( + options, multiParDoStateIdMap, configBuilder); + configBuilder.put(BEAM_DOT_GRAPH, dotGraph); configBuilder.put(BEAM_JSON_GRAPH, jsonGraph); @@ -155,6 +168,7 @@ public SamzaPipelineResult run(Pipeline pipeline) { final SamzaExecutionContext executionContext = new SamzaExecutionContext(options); final Map reporterFactories = getMetricsReporters(); + final Set multiParDoStateIds = multiParDoStateIdMap.keySet(); final StreamApplication app = appDescriptor -> { @@ -162,7 +176,7 @@ public SamzaPipelineResult run(Pipeline pipeline) { appDescriptor.withMetricsReporterFactories(reporterFactories); SamzaPipelineTranslator.translate( - pipeline, new TranslationContext(appDescriptor, idMap, options)); + pipeline, new TranslationContext(appDescriptor, idMap, multiParDoStateIds, options)); }; // perform a final round of validation for the pipeline options now that all configs are diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java index 735ec62cd3259..c1dc4972e7203 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java @@ -127,6 +127,7 @@ public class DoFnOp implements Op { private final DoFnSchemaInformation doFnSchemaInformation; private final Map> sideInputMapping; + private final Map userStateIds; public DoFnOp( TupleTag mainOutputTag, @@ -148,7 +149,8 @@ public DoFnOp( JobInfo jobInfo, Map> idToTupleTagMap, DoFnSchemaInformation doFnSchemaInformation, - Map> sideInputMapping) 
{ + Map> sideInputMapping, + Map userStateIds) { this.mainOutputTag = mainOutputTag; this.doFn = doFn; this.sideInputs = sideInputs; @@ -171,6 +173,7 @@ public DoFnOp( this.bundleStateId = "_samza_bundle_" + transformId; this.doFnSchemaInformation = doFnSchemaInformation; this.sideInputMapping = sideInputMapping; + this.userStateIds = userStateIds; } @Override @@ -261,7 +264,8 @@ public void open( doFnSchemaInformation, (Map>) sideInputMapping, emitter, - outputFutureCollector); + outputFutureCollector, + userStateIds); } this.pushbackFnRunner = diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java index 3ecd406da615f..4c2c5ce2cd0ae 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java @@ -126,6 +126,7 @@ public void open( Collections.singletonMap( SamzaStoreStateInternals.BEAM_STORE, SamzaStoreStateInternals.getBeamStore(context.getTaskContext())), + Collections.emptyMap(), keyCoder, pipelineOptions.getStoreBatchGetSize()); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java index ec1a9f3650908..4db0a3eebec0e 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java @@ -52,8 +52,6 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import 
org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -92,14 +90,14 @@ public static DoFnRunner create( DoFnSchemaInformation doFnSchemaInformation, Map> sideInputMapping, OpEmitter emitter, - FutureCollector futureCollector) { + FutureCollector futureCollector, + Map userStateIds) { final KeyedInternals keyedInternals; final TimerInternals timerInternals; final StateInternals stateInternals; - final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); final SamzaStoreStateInternals.Factory stateInternalsFactory = SamzaStoreStateInternals.createStateInternalsFactory( - transformId, keyCoder, context.getTaskContext(), pipelineOptions, signature); + transformId, keyCoder, context.getTaskContext(), pipelineOptions, userStateIds); final SamzaExecutionContext executionContext = (SamzaExecutionContext) context.getApplicationContainerContext(); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java index 7df48fb4cad5e..322da4babd33a 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java @@ -26,14 +26,12 @@ import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -66,7 +64,6 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.SerializableFunction; -import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; @@ -99,6 +96,8 @@ public class SamzaStoreStateInternals implements StateInternals { // the stores include both beamStore for system states as well as stores for user state private final Map>> stores; + // Map of non-system stateIds to unique storeId + private final Map stateIdMap; private final K key; private final byte[] keyBytes; private final int batchGetSize; @@ -106,6 +105,7 @@ public class SamzaStoreStateInternals implements StateInternals { private SamzaStoreStateInternals( Map>> stores, + Map stateIds, @Nullable K key, byte @Nullable [] keyBytes, String stageId, @@ -115,6 +115,7 @@ private SamzaStoreStateInternals( this.keyBytes = keyBytes; this.batchGetSize = batchGetSize; this.stageId = stageId; + this.stateIdMap = stateIds; } @SuppressWarnings("unchecked") @@ -129,18 +130,7 @@ static KeyValueStore> getBeamStore(TaskContext context) */ static Factory createNonKeyedStateInternalsFactory( String id, TaskContext context, SamzaPipelineOptions pipelineOptions) { - return createStateInternalsFactory(id, null, context, pipelineOptions, Collections.emptySet()); - } - - static Factory createStateInternalsFactory( - String id, - Coder keyCoder, - TaskContext context, - SamzaPipelineOptions pipelineOptions, - DoFnSignature signature) { - - return createStateInternalsFactory( - id, keyCoder, context, pipelineOptions, signature.stateDeclarations().keySet()); + return createStateInternalsFactory(id, null, context, pipelineOptions, Collections.emptyMap()); } static Factory createStateInternalsFactory( @@ -149,37 +139,39 @@ static Factory createStateInternalsFactory( TaskContext context, SamzaPipelineOptions pipelineOptions, ExecutableStage executableStage) { - - Set stateIds = + // 
TODO: handle same stateIds in multiple ParDos for portable mode + Map stateIds = executableStage.getUserStates().stream() .map(UserStateReference::localName) - .collect(Collectors.toSet()); + .collect(Collectors.toMap(Function.identity(), Function.identity())); return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIds); } @SuppressWarnings("unchecked") - private static Factory createStateInternalsFactory( + static Factory createStateInternalsFactory( String id, @Nullable Coder keyCoder, TaskContext context, SamzaPipelineOptions pipelineOptions, - Collection stateIds) { + Map stateIdMap) { final int batchGetSize = pipelineOptions.getStoreBatchGetSize(); final Map>> stores = new HashMap<>(); stores.put(BEAM_STORE, getBeamStore(context)); - final Coder stateKeyCoder; if (keyCoder != null) { - stateIds.forEach( - stateId -> - stores.put( - stateId, (KeyValueStore>) context.getStore(stateId))); + stateIdMap + .values() + .forEach( + storeId -> + stores.put( + storeId, + (KeyValueStore>) context.getStore(storeId))); stateKeyCoder = keyCoder; } else { stateKeyCoder = (Coder) VoidCoder.of(); } - return new Factory<>(Objects.toString(id), stores, stateKeyCoder, batchGetSize); + return new Factory<>(Objects.toString(id), stores, stateIdMap, stateKeyCoder, batchGetSize); } @Override @@ -271,16 +263,19 @@ private static ByteArrayOutputStream getThreadLocalBaos() { public static class Factory implements StateInternalsFactory { private final String stageId; private final Map>> stores; + private final Map stateIdMap; private final Coder keyCoder; private final int batchGetSize; public Factory( String stageId, Map>> stores, + Map stateIdMap, Coder keyCoder, int batchGetSize) { this.stageId = stageId; this.stores = stores; + this.stateIdMap = stateIdMap; this.keyCoder = keyCoder; this.batchGetSize = batchGetSize; } @@ -303,7 +298,8 @@ public StateInternals stateInternalsForKey(@Nullable K key) { throw new RuntimeException("Cannot encode key for state 
store", e); } - return new SamzaStoreStateInternals<>(stores, key, baos.toByteArray(), stageId, batchGetSize); + return new SamzaStoreStateInternals<>( + stores, stateIdMap, key, baos.toByteArray(), stageId, batchGetSize); } } @@ -323,11 +319,11 @@ protected AbstractSamzaState( this.coder = coder; this.namespace = namespace; this.addressId = address.getId(); - this.isBeamStore = !stores.containsKey(address.getId()); + this.isBeamStore = !stores.containsKey(stateIdMap.get(address.getId())); this.store = isBeamStore ? (KeyValueStore) stores.get(BEAM_STORE) - : (KeyValueStore) stores.get(address.getId()); + : (KeyValueStore) stores.get(stateIdMap.get(address.getId())); this.stageId = SamzaStoreStateInternals.this.stageId; this.keyBytes = SamzaStoreStateInternals.this.keyBytes; } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java index cf164b145b630..c25c205dae2a6 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java @@ -124,6 +124,7 @@ public void open( Collections.singletonMap( SamzaStoreStateInternals.BEAM_STORE, SamzaStoreStateInternals.getBeamStore(context.getTaskContext())), + Collections.emptyMap(), ByteArrayCoder.of(), pipelineOptions.getStoreBatchGetSize()); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java index 50650ece96c40..d06fbd6883d2a 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java @@ -83,6 +83,10 @@ public 
void putAll(Map properties) { config.putAll(properties); } + public void remove(String name) { + config.remove(name); + } + /** @return built configuration */ public Config build() { try { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java index d8feaff424d46..dfcdb420df3c1 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java @@ -17,9 +17,8 @@ */ package org.apache.beam.runners.samza.translation; -import java.util.HashSet; +import java.util.HashMap; import java.util.Map; -import java.util.Set; import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; @@ -35,12 +34,17 @@ public class ConfigContext { private final Map idMap; private AppliedPTransform currentTransform; private final SamzaPipelineOptions options; - private final Set stateIds; + private final Map usedStateIdMap; + private final Map multiParDoStateIdMap; - public ConfigContext(Map idMap, SamzaPipelineOptions options) { + public ConfigContext( + Map idMap, + SamzaPipelineOptions options, + Map multiParDoStateIdMap) { this.idMap = idMap; this.options = options; - this.stateIds = new HashSet<>(); + this.usedStateIdMap = new HashMap<>(); + this.multiParDoStateIdMap = multiParDoStateIdMap; } public void setCurrentTransform(AppliedPTransform currentTransform) { @@ -64,8 +68,37 @@ public SamzaPipelineOptions getPipelineOptions() { return this.options; } - public boolean addStateId(String stateId) { - return stateIds.add(stateId); + /** Helper to keep track of used stateIds and return unique store id. */ + public String getUniqueStoreId(String stateId, String parDoName) { + // Update a map of used state id with parDo name. 
+ if (!usedStateIdMap.containsKey(stateId)) { + usedStateIdMap.put(stateId, parDoName); + return stateId; + } else { + // Same state id identified for the first time + if (!multiParDoStateIdMap.containsKey(stateId)) { + final String prevParDoName = usedStateIdMap.get(stateId); + final String prevMultiParDoStateId = String.join("-", stateId, prevParDoName); + usedStateIdMap.put(prevMultiParDoStateId, prevParDoName); + // Store the stateId with previous parDo name which will be used for config rewriting + multiParDoStateIdMap.put(stateId, prevParDoName); + } + // Compose a new store id with state id and parDo name (eg) "stateId-parDoName" + final String multiParDoStateId = String.join("-", stateId, parDoName); + // Leveraging framework which enforces unique parDo name. + // If the framework logic changes, this is a safeguard to throw exception to avoid storeId + // collision + if (usedStateIdMap.containsKey(multiParDoStateId)) { + throw new IllegalStateException( + "Same stateId " + + stateId + + " with the same parDoName " + + parDoName + + " found in multiple ParDo."); + } + usedStateIdMap.put(multiParDoStateId, parDoName); + return multiParDoStateId; + } } private String getIdForPValue(PValue pvalue) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java index 6db09f69d7d8f..d5bc828f50f68 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.ServiceLoader; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.RunnerApi; @@ -161,6 +162,16 @@ private static void 
doTranslate( Map> sideInputMapping = ParDoTranslation.getSideInputMapping(ctx.getCurrentTransform()); + final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + final Map userStateIdToStoreIdMap; + if (DoFnSignatures.isStateful(transform.getFn())) { + final Set stateIds = signature.stateDeclarations().keySet(); + final String escapedParDoName = + SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName()); + userStateIdToStoreIdMap = ctx.getStateIdToStoreIdMap(stateIds, escapedParDoName); + } else { + userStateIdToStoreIdMap = Collections.emptyMap(); + } final DoFnOp op = new DoFnOp<>( transform.getMainOutputTag(), @@ -182,7 +193,8 @@ private static void doTranslate( null, Collections.emptyMap(), doFnSchemaInformation, - sideInputMapping); + sideInputMapping, + userStateIdToStoreIdMap); final MessageStream> mergedStreams; if (sideInputStreams.isEmpty()) { @@ -333,7 +345,9 @@ private static void doTranslatePortable( ctx.getJobInfo(), idToTupleTagMap, doFnSchemaInformation, - sideInputMapping); + sideInputMapping, + // TODO: populate this for portable + Collections.emptyMap()); final MessageStream> mergedStreams; if (sideInputStreams.isEmpty()) { @@ -377,25 +391,21 @@ public Map createConfig( if (signature.usesState()) { // set up user state configs for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) { - final String storeId = state.id(); - - // TODO: remove validation after we support same state id in different ParDo. - if (!ctx.addStateId(storeId)) { - throw new IllegalStateException( - "Duplicate StateId " + storeId + " found in multiple ParDo."); - } - + final String userStateId = state.id(); + final String escapedParDoName = + SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName()); + final String uniqueStoreId = ctx.getUniqueStoreId(userStateId, escapedParDoName); config.put( - "stores." + storeId + ".factory", + "stores." 
+ uniqueStoreId + ".factory", "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"); - config.put("stores." + storeId + ".key.serde", "byteArraySerde"); - config.put("stores." + storeId + ".msg.serde", "stateValueSerde"); - config.put("stores." + storeId + ".rocksdb.compression", "lz4"); + config.put("stores." + uniqueStoreId + ".key.serde", "byteArraySerde"); + config.put("stores." + uniqueStoreId + ".msg.serde", "stateValueSerde"); + config.put("stores." + uniqueStoreId + ".rocksdb.compression", "lz4"); if (options.getStateDurable()) { config.put( - "stores." + storeId + ".changelog", - ConfigBuilder.getChangelogTopic(options, storeId)); + "stores." + uniqueStoreId + ".changelog", + ConfigBuilder.getChangelogTopic(options, uniqueStoreId)); } } } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java index 776ee80878d1e..2195489ca13fb 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java @@ -50,7 +50,7 @@ public class PortableTranslationContext extends TranslationContext { public PortableTranslationContext( StreamApplicationDescriptor appDescriptor, SamzaPipelineOptions options, JobInfo jobInfo) { - super(appDescriptor, Collections.emptyMap(), options); + super(appDescriptor, Collections.emptyMap(), Collections.emptySet(), options); this.jobInfo = jobInfo; } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java index 65caa8ba68ea4..dd3eb93487345 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java +++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java @@ -78,8 +78,9 @@ public static void createConfig( Pipeline pipeline, SamzaPipelineOptions options, Map idMap, + Map multiParDoStateIdMap, ConfigBuilder configBuilder) { - final ConfigContext ctx = new ConfigContext(idMap, options); + final ConfigContext ctx = new ConfigContext(idMap, options, multiParDoStateIdMap); final TransformVisitorFn configFn = new TransformVisitorFn() { @@ -105,6 +106,38 @@ public static void createConfig( pipeline.traverseTopologically(visitor); } + /** Rewrite user store configs if there exists same state ids with multiple ParDos. */ + public static void rewriteConfigWithMultiParDoStateId( + SamzaPipelineOptions options, + Map multiParDoStateIdMap, + ConfigBuilder configBuilder) { + multiParDoStateIdMap.forEach( + (stateId, value) -> { + // rewrite single parDo state configs into multiple parDo state + String multiParDoStateId = String.join("-", stateId, value); + // replace old single parDo store configs with new storeId mapping appended with parDo + // name + configBuilder.remove("stores." + stateId + ".factory"); + configBuilder.remove("stores." + stateId + ".key.serde"); + configBuilder.remove("stores." + stateId + ".msg.serde"); + configBuilder.remove("stores." + stateId + ".rocksdb.compression"); + // put new config with multi pardo config + configBuilder.put( + "stores." + multiParDoStateId + ".factory", + "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"); + configBuilder.put("stores." + multiParDoStateId + ".key.serde", "byteArraySerde"); + configBuilder.put("stores." + multiParDoStateId + ".msg.serde", "stateValueSerde"); + configBuilder.put("stores." + multiParDoStateId + ".rocksdb.compression", "lz4"); + + if (options.getStateDurable()) { + configBuilder.remove("stores." + stateId + ".changelog"); + configBuilder.put( + "stores." 
+ multiParDoStateId + ".changelog", + ConfigBuilder.getChangelogTopic(options, multiParDoStateId)); + } + }); + } + private interface TransformVisitorFn { > void apply( T transform, diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java index 8488594504922..da7fbdc0ec4e4 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java @@ -83,6 +83,7 @@ public class TranslationContext { private final Map idMap; private final Map registeredInputStreams = new HashMap<>(); private final Map registeredTables = new HashMap<>(); + private final Set multiParDoStateIds; private final SamzaPipelineOptions options; private final HashIdGenerator idGenerator = new HashIdGenerator(); @@ -91,9 +92,11 @@ public class TranslationContext { public TranslationContext( StreamApplicationDescriptor appDescriptor, Map idMap, + Set multiParDoStateIds, SamzaPipelineOptions options) { this.appDescriptor = appDescriptor; this.idMap = idMap; + this.multiParDoStateIds = multiParDoStateIds; this.options = options; } @@ -249,6 +252,19 @@ public String getTransformId() { return idGenerator.getId(getTransformFullName()); } + /** Given a set of user stateIds and parDo name, return a stateId to storeId map. */ + public Map getStateIdToStoreIdMap(Set stateIds, String escapedParDoName) { + final Map storeIds = new HashMap<>(); + stateIds.forEach( + stateId -> + storeIds.put( + stateId, + multiParDoStateIds.contains(stateId) + ? String.join("-", stateId, escapedParDoName) + : stateId)); + return storeIds; + } + /** The dummy stream created will only be used in Beam tests. 
*/ private static InputDescriptor, ?> createDummyStreamDescriptor(String id) { final GenericSystemDescriptor dummySystem = diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java index 88fdebc2d02a0..99f71f53860b8 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java @@ -46,8 +46,8 @@ public static WindowedValue.WindowedValueCoder instantiateCoder( /** * Escape the non-alphabet chars in the name so we can create a physical stream out of it. * - *

This escape will replace ".", "(" and "/" as "-", and then remove all the other - * non-alphabetic characters. + *

This escape will replace any non-alphanumeric characters other than "-" and "_" with "_" + * including whitespace. */ public static String escape(String name) { return name.replaceFirst(".*:([a-zA-Z#0-9]+).*", "$1").replaceAll("[^A-Za-z0-9_-]", "_"); diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java index 457a43e53e470..97d19a7c73398 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.state.SetState; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -58,6 +59,7 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; @@ -195,6 +197,106 @@ public void processElement( pipeline.run(); } + @Test + public void testValueStateSameIdAcrossParDo() { + final String stateId = "foo"; + + DoFn, KV> fn = + new DoFn, KV>() { + + @StateId(stateId) + private final StateSpec> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + @StateId(stateId) ValueState state, OutputReceiver> r) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 
0); + r.output(KV.of("sizzle", currentValue)); + state.write(currentValue + 1); + } + }; + + DoFn, Integer> fn2 = + new DoFn, Integer>() { + + @StateId(stateId) + private final StateSpec> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + @StateId(stateId) ValueState state, OutputReceiver r) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 13); + r.output(currentValue); + state.write(currentValue + 13); + } + }; + + PCollection> intermediate = + pipeline + .apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84))) + .apply("First stateful ParDo", ParDo.of(fn)); + + PCollection output = intermediate.apply("Second stateful ParDo", ParDo.of(fn2)); + + PAssert.that(intermediate) + .containsInAnyOrder(KV.of("sizzle", 0), KV.of("sizzle", 1), KV.of("sizzle", 2)); + PAssert.that(output).containsInAnyOrder(13, 26, 39); + pipeline.run(); + } + + @Test + public void testValueStateSameIdAcrossParDoWithSameName() { + final String stateId = "foo"; + + DoFn, KV> fn = + new DoFn, KV>() { + + @StateId(stateId) + private final StateSpec> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + @StateId(stateId) ValueState state, OutputReceiver> r) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); + r.output(KV.of("hello", currentValue)); + state.write(currentValue + 1); + } + }; + + DoFn, Integer> fn2 = + new DoFn, Integer>() { + + @StateId(stateId) + private final StateSpec> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + @StateId(stateId) ValueState state, OutputReceiver r) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 13); + r.output(currentValue); + state.write(currentValue + 13); + } + }; + + PCollection> intermediate = + pipeline + .apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84))) + .apply("Stateful ParDo with Same Name", 
ParDo.of(fn)); + + PCollection output = + intermediate.apply("Stateful ParDo with Same Name", ParDo.of(fn2)); + + PAssert.that(intermediate) + .containsInAnyOrder(KV.of("hello", 0), KV.of("hello", 1), KV.of("hello", 2)); + + PAssert.that(output).containsInAnyOrder(13, 26, 39); + pipeline.run(); + } + /** A storage engine to create test stores. */ public static class TestStorageEngine extends InMemoryKeyValueStorageEngineFactory { diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java index bb39c186a9f25..801e84c825a1c 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java @@ -19,9 +19,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import org.apache.beam.runners.samza.SamzaExecutionEnvironment; @@ -74,8 +74,10 @@ public void testStatefulBeamStoreConfig() { pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides()); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Map multiParDoStateIdMap = new HashMap<>(); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, multiParDoStateIdMap, configBuilder); final Config config = configBuilder.build(); assertEquals( @@ -86,7 +88,7 @@ public void testStatefulBeamStoreConfig() { assertNull(config.get("stores.beamStore.changelog")); options.setStateDurable(true); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + 
SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); final Config config2 = configBuilder.build(); assertEquals( "TestStoreConfig-1-beamStore-changelog", config2.get("stores.beamStore.changelog")); @@ -105,7 +107,7 @@ public void testStatelessBeamStoreConfig() { final Map idMap = PViewToIdMapper.buildIdMap(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); final Config config = configBuilder.build(); assertEquals( @@ -116,7 +118,7 @@ public void testStatelessBeamStoreConfig() { assertNull(config.get("stores.beamStore.changelog")); options.setStateDurable(true); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); final Config config2 = configBuilder.build(); // For stateless jobs, ignore state durable pipeline option. 
assertNull(config2.get("stores.beamStore.changelog")); @@ -136,7 +138,7 @@ public void testSamzaLocalExecutionEnvironmentConfig() { final Map idMap = PViewToIdMapper.buildIdMap(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); final Config config = configBuilder.build(); assertTrue( @@ -164,7 +166,7 @@ public void testSamzaYarnExecutionEnvironmentConfig() { final Map idMap = PViewToIdMapper.buildIdMap(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); try { Config config = configBuilder.build(); assertEquals(config.get(APP_RUNNER_CLASS), RemoteApplicationRunner.class.getName()); @@ -194,7 +196,7 @@ public void testSamzaStandAloneExecutionEnvironmentConfig() { final Map idMap = PViewToIdMapper.buildIdMap(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); try { Config config = configBuilder.build(); assertEquals(config.get(APP_RUNNER_CLASS), LocalApplicationRunner.class.getName()); @@ -235,7 +237,7 @@ public void processElement( final Map idMap = PViewToIdMapper.buildIdMap(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); final Config config = configBuilder.build(); assertEquals( @@ -246,14 +248,14 @@ public void processElement( 
assertNull(config.get("stores.testState.changelog")); options.setStateDurable(true); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); final Config config2 = configBuilder.build(); assertEquals( "TestStoreConfig-1-testState-changelog", config2.get("stores.testState.changelog")); } @Test - public void testDuplicateStateIdConfig() { + public void testUserStoreConfigSameStateIdAcrossParDo() { SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class); options.setJobName("TestStoreConfig"); options.setRunner(SamzaRunner.class); @@ -263,6 +265,7 @@ public void testDuplicateStateIdConfig() { .apply( Create.empty(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))) .apply( + "First stateful ParDo", ParDo.of( new DoFn, KV>() { private static final String testState = "testState"; @@ -277,6 +280,7 @@ public void processElement( } })) .apply( + "Second stateful ParDo", ParDo.of( new DoFn, Void>() { private static final String testState = "testState"; @@ -291,9 +295,120 @@ public void processElement( final Map idMap = PViewToIdMapper.buildIdMap(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); + final Map multiParDoStateIdMap = new HashMap<>(); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, multiParDoStateIdMap, configBuilder); + SamzaPipelineTranslator.rewriteConfigWithMultiParDoStateId( + options, multiParDoStateIdMap, configBuilder); + final Config config = configBuilder.build(); + + assertEquals( + RocksDbKeyValueStorageEngineFactory.class.getName(), + config.get("stores.testState-First_stateful_ParDo.factory")); + assertEquals("byteArraySerde", config.get("stores.testState-First_stateful_ParDo.key.serde")); + assertEquals("stateValueSerde", config.get("stores.testState-First_stateful_ParDo.msg.serde")); + 
assertNull(config.get("stores.testState-First_stateful_ParDo.changelog")); + + assertEquals( + RocksDbKeyValueStorageEngineFactory.class.getName(), + config.get("stores.testState-Second_stateful_ParDo.factory")); + assertEquals("byteArraySerde", config.get("stores.testState-Second_stateful_ParDo.key.serde")); + assertEquals("stateValueSerde", config.get("stores.testState-Second_stateful_ParDo.msg.serde")); + assertNull(config.get("stores.testState-Second_stateful_ParDo.changelog")); + + options.setStateDurable(true); + Map multiParDoStateIdMap2 = new HashMap<>(); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, multiParDoStateIdMap2, configBuilder); + SamzaPipelineTranslator.rewriteConfigWithMultiParDoStateId( + options, multiParDoStateIdMap2, configBuilder); + final Config config2 = configBuilder.build(); + assertEquals( + "TestStoreConfig-1-testState-First_stateful_ParDo-changelog", + config2.get("stores.testState-First_stateful_ParDo.changelog")); + assertEquals( + "TestStoreConfig-1-testState-Second_stateful_ParDo-changelog", + config2.get("stores.testState-Second_stateful_ParDo.changelog")); + } + + @Test + public void testUserStoreConfigSameStateIdAndPTransformName() { + SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class); + options.setJobName("TestStoreConfig"); + options.setRunner(SamzaRunner.class); - assertThrows( - IllegalStateException.class, - () -> SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder)); + Pipeline pipeline = Pipeline.create(options); + pipeline + .apply( + Create.empty(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))) + .apply( + "Same stateful ParDo Name", + ParDo.of( + new DoFn, KV>() { + private static final String testState = "testState"; + + @StateId(testState) + private final StateSpec> state = StateSpecs.value(); + + @ProcessElement + public void processElement( + ProcessContext context, @StateId(testState) ValueState 
state) { + context.output(context.element()); + } + })) + .apply( + "Same stateful ParDo Name", + ParDo.of( + new DoFn, Void>() { + private static final String testState = "testState"; + + @StateId(testState) + private final StateSpec> state = StateSpecs.value(); + + @ProcessElement + public void processElement( + ProcessContext context, @StateId(testState) ValueState state) {} + })); + + final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final ConfigBuilder configBuilder = new ConfigBuilder(options); + final Map multiParDoStateIdMap = new HashMap<>(); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, multiParDoStateIdMap, configBuilder); + SamzaPipelineTranslator.rewriteConfigWithMultiParDoStateId( + options, multiParDoStateIdMap, configBuilder); + final Config config = configBuilder.build(); + + assertEquals( + RocksDbKeyValueStorageEngineFactory.class.getName(), + config.get("stores.testState-Same_stateful_ParDo_Name.factory")); + assertEquals( + "byteArraySerde", config.get("stores.testState-Same_stateful_ParDo_Name.key.serde")); + assertEquals( + "stateValueSerde", config.get("stores.testState-Same_stateful_ParDo_Name.msg.serde")); + assertNull(config.get("stores.testState-Same_stateful_ParDo_Name.changelog")); + + assertEquals( + RocksDbKeyValueStorageEngineFactory.class.getName(), + config.get("stores.testState-Same_stateful_ParDo_Name2.factory")); + assertEquals( + "byteArraySerde", config.get("stores.testState-Same_stateful_ParDo_Name2.key.serde")); + assertEquals( + "stateValueSerde", config.get("stores.testState-Same_stateful_ParDo_Name2.msg.serde")); + assertNull(config.get("stores.testState-Same_stateful_ParDo_Name2.changelog")); + + options.setStateDurable(true); + Map multiParDoStateIdMap2 = new HashMap<>(); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, multiParDoStateIdMap2, configBuilder); + SamzaPipelineTranslator.rewriteConfigWithMultiParDoStateId( + options, multiParDoStateIdMap2, configBuilder); + 
final Config config2 = configBuilder.build(); + assertEquals( + "TestStoreConfig-1-testState-Same_stateful_ParDo_Name-changelog", + config2.get("stores.testState-Same_stateful_ParDo_Name.changelog")); + assertEquals( + "TestStoreConfig-1-testState-Same_stateful_ParDo_Name2-changelog", + config2.get("stores.testState-Same_stateful_ParDo_Name2.changelog")); } } diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java index 8827f1e5622c0..b8055796ca193 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.samza.translation; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.mock; @@ -24,11 +25,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.runtime.OpMessage; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.apache.samza.application.descriptors.StreamApplicationDescriptor; import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; @@ -51,6 +54,8 @@ public class TranslationContextTest { MapFunction keyFn = m -> m.toString(); MapFunction valueFn = m -> m; private final String streamName = "testStream"; + private static final String SINGLE_PARDO_STATE_ID = "stateId1"; + private static final String MULTI_PARDO_STATE_ID = "multiParDoStateId1"; KVSerde serde = 
KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()); StreamApplicationDescriptor streamApplicationDescriptor = new StreamApplicationDescriptorImpl( @@ -60,8 +65,10 @@ public class TranslationContextTest { }, getConfig()); Map idMap = new HashMap<>(); + Set multiParDoStateIds = ImmutableSet.of(MULTI_PARDO_STATE_ID); TranslationContext translationContext = - new TranslationContext(streamApplicationDescriptor, idMap, mock(SamzaPipelineOptions.class)); + new TranslationContext( + streamApplicationDescriptor, idMap, multiParDoStateIds, mock(SamzaPipelineOptions.class)); @Test public void testRegisterInputMessageStreams() { @@ -77,6 +84,16 @@ public void testRegisterInputMessageStreams() { assertNotNull(translationContext.getMessageStream(output)); } + @Test + public void testGetStateIdToStoreIdMap() { + final Set input = ImmutableSet.of(SINGLE_PARDO_STATE_ID, MULTI_PARDO_STATE_ID); + final String escapedParDoName = "mockParDoName"; + Map output = translationContext.getStateIdToStoreIdMap(input, escapedParDoName); + assertEquals(SINGLE_PARDO_STATE_ID, output.get(SINGLE_PARDO_STATE_ID)); + assertEquals( + String.join("-", MULTI_PARDO_STATE_ID, escapedParDoName), output.get(MULTI_PARDO_STATE_ID)); + } + public GenericInputDescriptor>> createSamzaInputDescriptor( String systemName, String streamId) { final Serde>> kvSerde = From d0adf3e3b126612af60ec3d90718487ca3c7ca24 Mon Sep 17 00:00:00 2001 From: Seung Jin An Date: Wed, 30 Nov 2022 16:21:24 -0800 Subject: [PATCH 2/5] Added StoreIdUtils.toMultiParDoStoreId() helper method --- .../beam/runners/samza/SamzaRunner.java | 7 ----- .../runtime/SamzaStoreStateInternals.java | 4 +-- .../samza/translation/ConfigContext.java | 15 ++++++---- .../translation/SamzaPipelineTranslator.java | 29 ++++++++++++------- .../samza/translation/TranslationContext.java | 3 +- .../beam/runners/samza/util/StoreIdUtils.java | 29 +++++++++++++++++++ 6 files changed, 61 insertions(+), 26 deletions(-) create mode 100644 
runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index 0a014c7c6d871..f8bb22d1b2735 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -141,13 +141,6 @@ public SamzaPipelineResult run(Pipeline pipeline) { LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); - /* Map of stateId to sanitized (remove whitespace and replace '-' with '_') PTransform name, used in multiple ParDos - (eg) @StateId("foo") used in two ParDos: - .apply("First Stateful ParDo with same stateId", ParDo.of(fn)) - .apply("Second Stateful ParDo with same stateId", ParDo.of(fn2)) - Map = ("foo", "First_Stateful_ParDo_with_same_stateId") - This will be populated with the key of stateId and value as the sanitized ParDo name. This map will be used to - identify the stateId used in multiple ParDos and rewrite RocksDB configs. 
*/ final Map multiParDoStateIdMap = new HashMap<>(); final ConfigBuilder configBuilder = new ConfigBuilder(options); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java index 322da4babd33a..2d30841b57fbc 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java @@ -142,8 +142,8 @@ static Factory createStateInternalsFactory( // TODO: handle same stateIds in multiple ParDos for portable mode Map stateIds = executableStage.getUserStates().stream() - .map(UserStateReference::localName) - .collect(Collectors.toMap(Function.identity(), Function.identity())); + .collect( + Collectors.toMap(UserStateReference::localName, UserStateReference::localName)); return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIds); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java index dfcdb420df3c1..fc28821544b9f 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.samza.SamzaPipelineOptions; +import org.apache.beam.runners.samza.util.StoreIdUtils; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; @@ -35,7 +36,7 @@ public class ConfigContext { private AppliedPTransform currentTransform; private final SamzaPipelineOptions options; private final Map usedStateIdMap; - private 
final Map multiParDoStateIdMap; + private final Map stateIdsToRewrite; public ConfigContext( Map idMap, @@ -44,7 +45,7 @@ public ConfigContext( this.idMap = idMap; this.options = options; this.usedStateIdMap = new HashMap<>(); - this.multiParDoStateIdMap = multiParDoStateIdMap; + this.stateIdsToRewrite = multiParDoStateIdMap; } public void setCurrentTransform(AppliedPTransform currentTransform) { @@ -76,15 +77,17 @@ public String getUniqueStoreId(String stateId, String parDoName) { return stateId; } else { // Same state id identified for the first time - if (!multiParDoStateIdMap.containsKey(stateId)) { + if (!stateIdsToRewrite.containsKey(stateId)) { final String prevParDoName = usedStateIdMap.get(stateId); - final String prevMultiParDoStateId = String.join("-", stateId, prevParDoName); + final String prevMultiParDoStateId = + StoreIdUtils.toMultiParDoStoreId(stateId, prevParDoName); usedStateIdMap.put(prevMultiParDoStateId, prevParDoName); // Store the stateId with previous parDo name which will be used for config rewriting - multiParDoStateIdMap.put(stateId, prevParDoName); + stateIdsToRewrite.put(stateId, prevParDoName); } // Compose a new store id with state id and parDo name (eg) "stateId-parDoName" - final String multiParDoStateId = String.join("-", stateId, parDoName); + final String multiParDoStateId = StoreIdUtils.toMultiParDoStoreId(stateId, parDoName); + ; // Leveraging framework which enforces unique parDo name. 
// If the framework logic changes, this is a safeguard to throw exception to avoid storeId // collision diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java index dd3eb93487345..76070713fe37d 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; import org.apache.beam.runners.core.construction.graph.ExecutableStage; import org.apache.beam.runners.samza.SamzaPipelineOptions; +import org.apache.beam.runners.samza.util.StoreIdUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.Combine; @@ -106,16 +107,24 @@ public static void createConfig( pipeline.traverseTopologically(visitor); } - /** Rewrite user store configs if there exists same state ids with multiple ParDos. */ + /** + * Rewrite user store configs if there exists same state ids used in multiple ParDos. For each + * entry of a stateId to escaped PTransform name of first occurrence in topological traversal, + * rewrite RocksDB configs with the new mapping enforced from stateId to storeId. 
+ * (eg) @StateId("foo") used in two ParDos fn, fn2: .apply("First Stateful ParDo with same + * stateId", ParDo.of(fn)) .apply("Second Stateful ParDo with same stateId", ParDo.of(fn2)) Map = + * ("foo", "First_Stateful_ParDo_with_same_stateId") storeId = + * "foo-First_Stateful_ParDo_with_same_stateId" + */ public static void rewriteConfigWithMultiParDoStateId( SamzaPipelineOptions options, Map multiParDoStateIdMap, ConfigBuilder configBuilder) { multiParDoStateIdMap.forEach( (stateId, value) -> { - // rewrite single parDo state configs into multiple parDo state - String multiParDoStateId = String.join("-", stateId, value); - // replace old single parDo store configs with new storeId mapping appended with parDo + // rewrite single ParDo store configs with multiple ParDo storeId + String multiParDoStoreId = StoreIdUtils.toMultiParDoStoreId(stateId, value); + // replace old single ParDo store configs with new storeId mapping appended with parDo // name configBuilder.remove("stores." + stateId + ".factory"); configBuilder.remove("stores." + stateId + ".key.serde"); @@ -123,17 +132,17 @@ public static void rewriteConfigWithMultiParDoStateId( configBuilder.remove("stores." + stateId + ".rocksdb.compression"); // put new config with multi pardo config configBuilder.put( - "stores." + multiParDoStateId + ".factory", + "stores." + multiParDoStoreId + ".factory", "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"); - configBuilder.put("stores." + multiParDoStateId + ".key.serde", "byteArraySerde"); - configBuilder.put("stores." + multiParDoStateId + ".msg.serde", "stateValueSerde"); - configBuilder.put("stores." + multiParDoStateId + ".rocksdb.compression", "lz4"); + configBuilder.put("stores." + multiParDoStoreId + ".key.serde", "byteArraySerde"); + configBuilder.put("stores." + multiParDoStoreId + ".msg.serde", "stateValueSerde"); + configBuilder.put("stores." 
+ multiParDoStoreId + ".rocksdb.compression", "lz4"); if (options.getStateDurable()) { configBuilder.remove("stores." + stateId + ".changelog"); configBuilder.put( - "stores." + multiParDoStateId + ".changelog", - ConfigBuilder.getChangelogTopic(options, multiParDoStateId)); + "stores." + multiParDoStoreId + ".changelog", + ConfigBuilder.getChangelogTopic(options, multiParDoStoreId)); } }); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java index da7fbdc0ec4e4..75a160a41251e 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.runtime.OpMessage; import org.apache.beam.runners.samza.util.HashIdGenerator; +import org.apache.beam.runners.samza.util.StoreIdUtils; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -260,7 +261,7 @@ public Map getStateIdToStoreIdMap(Set stateIds, String e storeIds.put( stateId, multiParDoStateIds.contains(stateId) - ? String.join("-", stateId, escapedParDoName) + ? StoreIdUtils.toMultiParDoStoreId(stateId, escapedParDoName) : stateId)); return storeIds; } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java new file mode 100644 index 0000000000000..b1e09ad4d809a --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza.util; + +public class StoreIdUtils { + + /** + * Join stateId and escaped PTransform name, used for RocksDB storeId of stateIds with multiple + * ParDos. + */ + public static String toMultiParDoStoreId(String stateId, String escapedPTransformName) { + return String.join("-", stateId, escapedPTransformName); + } +} From 76e8fe6cd180060200dc6ffbd5684797088b7f26 Mon Sep 17 00:00:00 2001 From: Seung Jin An Date: Fri, 2 Dec 2022 00:48:41 -0800 Subject: [PATCH 3/5] * Keep stateId to store mapping in SamzaStoreStateInternals * Perform prescan of a pipeline to identify non-unique state ids --- .../beam/runners/samza/SamzaRunner.java | 11 ++- .../beam/runners/samza/runtime/DoFnOp.java | 8 +-- .../runners/samza/runtime/GroupByKeyOp.java | 1 - .../samza/runtime/SamzaDoFnRunners.java | 8 ++- .../runtime/SamzaStoreStateInternals.java | 56 ++++++++------- ...SplittableParDoProcessKeyedElementsOp.java | 1 - .../samza/translation/ConfigBuilder.java | 9 +-- .../samza/translation/ConfigContext.java | 48 ++----------- .../ParDoBoundMultiTranslator.java | 47 +++++-------- .../PortableTranslationContext.java | 2 +- .../translation/SamzaPipelineTranslator.java | 46 +------------ .../samza/translation/StateIdParser.java | 69 +++++++++++++++++++ 
.../samza/translation/TranslationContext.java | 17 ----- .../beam/runners/samza/util/StoreIdUtils.java | 7 +- .../translation/ConfigGeneratorTest.java | 57 +++++++-------- .../translation/TranslationContextTest.java | 19 +---- 16 files changed, 173 insertions(+), 233 deletions(-) create mode 100644 runners/samza/src/main/java/org/apache/beam/runners/samza/translation/StateIdParser.java diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index f8bb22d1b2735..d354e75c4b823 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -34,6 +34,7 @@ import org.apache.beam.runners.samza.translation.SamzaPipelineTranslator; import org.apache.beam.runners.samza.translation.SamzaPortablePipelineTranslator; import org.apache.beam.runners.samza.translation.SamzaTransformOverrides; +import org.apache.beam.runners.samza.translation.StateIdParser; import org.apache.beam.runners.samza.translation.TranslationContext; import org.apache.beam.runners.samza.util.PipelineJsonRenderer; import org.apache.beam.sdk.Pipeline; @@ -141,14 +142,11 @@ public SamzaPipelineResult run(Pipeline pipeline) { LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); - final Map multiParDoStateIdMap = new HashMap<>(); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPipelineTranslator.createConfig( - pipeline, options, idMap, multiParDoStateIdMap, configBuilder); - SamzaPipelineTranslator.rewriteConfigWithMultiParDoStateId( - options, multiParDoStateIdMap, configBuilder); - + pipeline, options, idMap, nonUniqueStateIds, configBuilder); configBuilder.put(BEAM_DOT_GRAPH, dotGraph); configBuilder.put(BEAM_JSON_GRAPH, jsonGraph); @@ -161,7 +159,6 @@ 
public SamzaPipelineResult run(Pipeline pipeline) { final SamzaExecutionContext executionContext = new SamzaExecutionContext(options); final Map reporterFactories = getMetricsReporters(); - final Set multiParDoStateIds = multiParDoStateIdMap.keySet(); final StreamApplication app = appDescriptor -> { @@ -169,7 +166,7 @@ public SamzaPipelineResult run(Pipeline pipeline) { appDescriptor.withMetricsReporterFactories(reporterFactories); SamzaPipelineTranslator.translate( - pipeline, new TranslationContext(appDescriptor, idMap, multiParDoStateIds, options)); + pipeline, new TranslationContext(appDescriptor, idMap, options)); }; // perform a final round of validation for the pipeline options now that all configs are diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java index c1dc4972e7203..735ec62cd3259 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java @@ -127,7 +127,6 @@ public class DoFnOp implements Op { private final DoFnSchemaInformation doFnSchemaInformation; private final Map> sideInputMapping; - private final Map userStateIds; public DoFnOp( TupleTag mainOutputTag, @@ -149,8 +148,7 @@ public DoFnOp( JobInfo jobInfo, Map> idToTupleTagMap, DoFnSchemaInformation doFnSchemaInformation, - Map> sideInputMapping, - Map userStateIds) { + Map> sideInputMapping) { this.mainOutputTag = mainOutputTag; this.doFn = doFn; this.sideInputs = sideInputs; @@ -173,7 +171,6 @@ public DoFnOp( this.bundleStateId = "_samza_bundle_" + transformId; this.doFnSchemaInformation = doFnSchemaInformation; this.sideInputMapping = sideInputMapping; - this.userStateIds = userStateIds; } @Override @@ -264,8 +261,7 @@ public void open( doFnSchemaInformation, (Map>) sideInputMapping, emitter, - outputFutureCollector, - userStateIds); + outputFutureCollector); } 
this.pushbackFnRunner = diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java index 4c2c5ce2cd0ae..3ecd406da615f 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java @@ -126,7 +126,6 @@ public void open( Collections.singletonMap( SamzaStoreStateInternals.BEAM_STORE, SamzaStoreStateInternals.getBeamStore(context.getTaskContext())), - Collections.emptyMap(), keyCoder, pipelineOptions.getStoreBatchGetSize()); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java index 4db0a3eebec0e..ec1a9f3650908 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java @@ -52,6 +52,8 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -90,14 +92,14 @@ public static DoFnRunner create( DoFnSchemaInformation doFnSchemaInformation, Map> sideInputMapping, OpEmitter emitter, - FutureCollector futureCollector, - Map userStateIds) { + FutureCollector futureCollector) { final KeyedInternals keyedInternals; final TimerInternals timerInternals; final StateInternals stateInternals; + final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); final 
SamzaStoreStateInternals.Factory stateInternalsFactory = SamzaStoreStateInternals.createStateInternalsFactory( - transformId, keyCoder, context.getTaskContext(), pipelineOptions, userStateIds); + transformId, keyCoder, context.getTaskContext(), pipelineOptions, signature); final SamzaExecutionContext executionContext = (SamzaExecutionContext) context.getApplicationContainerContext(); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java index 2d30841b57fbc..7df48fb4cad5e 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java @@ -26,12 +26,14 @@ import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -64,6 +66,7 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; @@ -96,8 +99,6 @@ public class SamzaStoreStateInternals implements StateInternals { // the stores include both beamStore for system states as well as stores for user state private final Map>> stores; - // Map of non-system stateIds to unique storeId - private 
final Map stateIdMap; private final K key; private final byte[] keyBytes; private final int batchGetSize; @@ -105,7 +106,6 @@ public class SamzaStoreStateInternals implements StateInternals { private SamzaStoreStateInternals( Map>> stores, - Map stateIds, @Nullable K key, byte @Nullable [] keyBytes, String stageId, @@ -115,7 +115,6 @@ private SamzaStoreStateInternals( this.keyBytes = keyBytes; this.batchGetSize = batchGetSize; this.stageId = stageId; - this.stateIdMap = stateIds; } @SuppressWarnings("unchecked") @@ -130,7 +129,18 @@ static KeyValueStore> getBeamStore(TaskContext context) */ static Factory createNonKeyedStateInternalsFactory( String id, TaskContext context, SamzaPipelineOptions pipelineOptions) { - return createStateInternalsFactory(id, null, context, pipelineOptions, Collections.emptyMap()); + return createStateInternalsFactory(id, null, context, pipelineOptions, Collections.emptySet()); + } + + static Factory createStateInternalsFactory( + String id, + Coder keyCoder, + TaskContext context, + SamzaPipelineOptions pipelineOptions, + DoFnSignature signature) { + + return createStateInternalsFactory( + id, keyCoder, context, pipelineOptions, signature.stateDeclarations().keySet()); } static Factory createStateInternalsFactory( @@ -139,39 +149,37 @@ static Factory createStateInternalsFactory( TaskContext context, SamzaPipelineOptions pipelineOptions, ExecutableStage executableStage) { - // TODO: handle same stateIds in multiple ParDos for portable mode - Map stateIds = + + Set stateIds = executableStage.getUserStates().stream() - .collect( - Collectors.toMap(UserStateReference::localName, UserStateReference::localName)); + .map(UserStateReference::localName) + .collect(Collectors.toSet()); return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIds); } @SuppressWarnings("unchecked") - static Factory createStateInternalsFactory( + private static Factory createStateInternalsFactory( String id, @Nullable Coder keyCoder, 
TaskContext context, SamzaPipelineOptions pipelineOptions, - Map stateIdMap) { + Collection stateIds) { final int batchGetSize = pipelineOptions.getStoreBatchGetSize(); final Map>> stores = new HashMap<>(); stores.put(BEAM_STORE, getBeamStore(context)); + final Coder stateKeyCoder; if (keyCoder != null) { - stateIdMap - .values() - .forEach( - storeId -> - stores.put( - storeId, - (KeyValueStore>) context.getStore(storeId))); + stateIds.forEach( + stateId -> + stores.put( + stateId, (KeyValueStore>) context.getStore(stateId))); stateKeyCoder = keyCoder; } else { stateKeyCoder = (Coder) VoidCoder.of(); } - return new Factory<>(Objects.toString(id), stores, stateIdMap, stateKeyCoder, batchGetSize); + return new Factory<>(Objects.toString(id), stores, stateKeyCoder, batchGetSize); } @Override @@ -263,19 +271,16 @@ private static ByteArrayOutputStream getThreadLocalBaos() { public static class Factory implements StateInternalsFactory { private final String stageId; private final Map>> stores; - private final Map stateIdMap; private final Coder keyCoder; private final int batchGetSize; public Factory( String stageId, Map>> stores, - Map stateIdMap, Coder keyCoder, int batchGetSize) { this.stageId = stageId; this.stores = stores; - this.stateIdMap = stateIdMap; this.keyCoder = keyCoder; this.batchGetSize = batchGetSize; } @@ -298,8 +303,7 @@ public StateInternals stateInternalsForKey(@Nullable K key) { throw new RuntimeException("Cannot encode key for state store", e); } - return new SamzaStoreStateInternals<>( - stores, stateIdMap, key, baos.toByteArray(), stageId, batchGetSize); + return new SamzaStoreStateInternals<>(stores, key, baos.toByteArray(), stageId, batchGetSize); } } @@ -319,11 +323,11 @@ protected AbstractSamzaState( this.coder = coder; this.namespace = namespace; this.addressId = address.getId(); - this.isBeamStore = !stores.containsKey(stateIdMap.get(address.getId())); + this.isBeamStore = !stores.containsKey(address.getId()); this.store = isBeamStore ? 
(KeyValueStore) stores.get(BEAM_STORE) - : (KeyValueStore) stores.get(stateIdMap.get(address.getId())); + : (KeyValueStore) stores.get(address.getId()); this.stageId = SamzaStoreStateInternals.this.stageId; this.keyBytes = SamzaStoreStateInternals.this.keyBytes; } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java index c25c205dae2a6..cf164b145b630 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java @@ -124,7 +124,6 @@ public void open( Collections.singletonMap( SamzaStoreStateInternals.BEAM_STORE, SamzaStoreStateInternals.getBeamStore(context.getTaskContext())), - Collections.emptyMap(), ByteArrayCoder.of(), pipelineOptions.getStoreBatchGetSize()); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java index d06fbd6883d2a..c648957d1eb76 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java @@ -52,6 +52,7 @@ import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.runtime.RemoteApplicationRunner; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; +import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory; import org.apache.samza.zk.ZkJobCoordinatorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,10 +84,6 @@ public void putAll(Map properties) { config.putAll(properties); } - public void remove(String name) { - config.remove(name); - } - /** @return built 
configuration */ public Config build() { try { @@ -310,9 +307,7 @@ private static Map createSystemConfig( static Map createRocksDBStoreConfig(SamzaPipelineOptions options) { final ImmutableMap.Builder configBuilder = ImmutableMap.builder() - .put( - BEAM_STORE_FACTORY, - "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory") + .put(BEAM_STORE_FACTORY, RocksDbKeyValueStorageEngineFactory.class.getName()) .put("stores.beamStore.rocksdb.compression", "lz4"); if (options.getStateDurable()) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java index fc28821544b9f..be653991e6d31 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java @@ -17,10 +17,9 @@ */ package org.apache.beam.runners.samza.translation; -import java.util.HashMap; import java.util.Map; +import java.util.Set; import org.apache.beam.runners.samza.SamzaPipelineOptions; -import org.apache.beam.runners.samza.util.StoreIdUtils; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; @@ -35,17 +34,13 @@ public class ConfigContext { private final Map idMap; private AppliedPTransform currentTransform; private final SamzaPipelineOptions options; - private final Map usedStateIdMap; - private final Map stateIdsToRewrite; + private final Set nonUniqueStateIds; public ConfigContext( - Map idMap, - SamzaPipelineOptions options, - Map multiParDoStateIdMap) { + Map idMap, Set nonUniqueStateIds, SamzaPipelineOptions options) { this.idMap = idMap; + this.nonUniqueStateIds = nonUniqueStateIds; this.options = options; - this.usedStateIdMap = new HashMap<>(); - this.stateIdsToRewrite = multiParDoStateIdMap; } public void 
setCurrentTransform(AppliedPTransform currentTransform) { @@ -69,39 +64,8 @@ public SamzaPipelineOptions getPipelineOptions() { return this.options; } - /** Helper to keep track of used stateIds and return unique store id. */ - public String getUniqueStoreId(String stateId, String parDoName) { - // Update a map of used state id with parDo name. - if (!usedStateIdMap.containsKey(stateId)) { - usedStateIdMap.put(stateId, parDoName); - return stateId; - } else { - // Same state id identified for the first time - if (!stateIdsToRewrite.containsKey(stateId)) { - final String prevParDoName = usedStateIdMap.get(stateId); - final String prevMultiParDoStateId = - StoreIdUtils.toMultiParDoStoreId(stateId, prevParDoName); - usedStateIdMap.put(prevMultiParDoStateId, prevParDoName); - // Store the stateId with previous parDo name which will be used for config rewriting - stateIdsToRewrite.put(stateId, prevParDoName); - } - // Compose a new store id with state id and parDo name (eg) "stateId-parDoName" - final String multiParDoStateId = StoreIdUtils.toMultiParDoStoreId(stateId, parDoName); - ; - // Leveraging framework which enforces unique parDo name. 
- // If the framework logic changes, this is a safeguard to throw exception to avoid storeId - // collision - if (usedStateIdMap.containsKey(multiParDoStateId)) { - throw new IllegalStateException( - "Same stateId " - + stateId - + " with the same parDoName " - + parDoName - + " found in multiple ParDo."); - } - usedStateIdMap.put(multiParDoStateId, parDoName); - return multiParDoStateId; - } + public boolean isUniqueStateId(String stateId) { + return !nonUniqueStateIds.contains(stateId); } private String getIdForPValue(PValue pvalue) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java index d5bc828f50f68..e7079f3cbe8bf 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.ServiceLoader; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.RunnerApi; @@ -46,6 +45,7 @@ import org.apache.beam.runners.samza.runtime.SamzaDoFnInvokerRegistrar; import org.apache.beam.runners.samza.util.SamzaPipelineTranslatorUtils; import org.apache.beam.runners.samza.util.StateUtils; +import org.apache.beam.runners.samza.util.StoreIdUtils; import org.apache.beam.runners.samza.util.WindowUtils; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; @@ -73,6 +73,7 @@ import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.WatermarkFunction; +import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory; import org.joda.time.Instant; /** @@ -162,16 
+163,6 @@ private static void doTranslate( Map> sideInputMapping = ParDoTranslation.getSideInputMapping(ctx.getCurrentTransform()); - final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); - final Map userStateIdToStoreIdMap; - if (DoFnSignatures.isStateful(transform.getFn())) { - final Set stateIds = signature.stateDeclarations().keySet(); - final String escapedParDoName = - SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName()); - userStateIdToStoreIdMap = ctx.getStateIdToStoreIdMap(stateIds, escapedParDoName); - } else { - userStateIdToStoreIdMap = Collections.emptyMap(); - } final DoFnOp op = new DoFnOp<>( transform.getMainOutputTag(), @@ -193,8 +184,7 @@ private static void doTranslate( null, Collections.emptyMap(), doFnSchemaInformation, - sideInputMapping, - userStateIdToStoreIdMap); + sideInputMapping); final MessageStream> mergedStreams; if (sideInputStreams.isEmpty()) { @@ -345,9 +335,7 @@ private static void doTranslatePortable( ctx.getJobInfo(), idToTupleTagMap, doFnSchemaInformation, - sideInputMapping, - // TODO: populate this for portable - Collections.emptyMap()); + sideInputMapping); final MessageStream> mergedStreams; if (sideInputStreams.isEmpty()) { @@ -391,21 +379,23 @@ public Map createConfig( if (signature.usesState()) { // set up user state configs for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) { - final String userStateId = state.id(); - final String escapedParDoName = - SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName()); - final String uniqueStoreId = ctx.getUniqueStoreId(userStateId, escapedParDoName); + final String stateId = state.id(); + String storeId = stateId; + if (!ctx.isUniqueStateId(stateId)) { + final String escapedName = + SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName()); + storeId = StoreIdUtils.toUniqueStoreId(stateId, escapedName); + } config.put( - "stores." 
+ uniqueStoreId + ".factory", - "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"); - config.put("stores." + uniqueStoreId + ".key.serde", "byteArraySerde"); - config.put("stores." + uniqueStoreId + ".msg.serde", "stateValueSerde"); - config.put("stores." + uniqueStoreId + ".rocksdb.compression", "lz4"); + "stores." + storeId + ".factory", RocksDbKeyValueStorageEngineFactory.class.getName()); + config.put("stores." + storeId + ".key.serde", "byteArraySerde"); + config.put("stores." + storeId + ".msg.serde", "stateValueSerde"); + config.put("stores." + storeId + ".rocksdb.compression", "lz4"); if (options.getStateDurable()) { config.put( - "stores." + uniqueStoreId + ".changelog", - ConfigBuilder.getChangelogTopic(options, uniqueStoreId)); + "stores." + storeId + ".changelog", + ConfigBuilder.getChangelogTopic(options, storeId)); } } } @@ -440,8 +430,7 @@ public Map createPortableConfig( final String storeId = stateId.getLocalName(); config.put( - "stores." + storeId + ".factory", - "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"); + "stores." + storeId + ".factory", RocksDbKeyValueStorageEngineFactory.class.getName()); config.put("stores." + storeId + ".key.serde", "byteArraySerde"); config.put("stores." + storeId + ".msg.serde", "stateValueSerde"); config.put("stores." 
+ storeId + ".rocksdb.compression", "lz4"); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java index 2195489ca13fb..776ee80878d1e 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java @@ -50,7 +50,7 @@ public class PortableTranslationContext extends TranslationContext { public PortableTranslationContext( StreamApplicationDescriptor appDescriptor, SamzaPipelineOptions options, JobInfo jobInfo) { - super(appDescriptor, Collections.emptyMap(), Collections.emptySet(), options); + super(appDescriptor, Collections.emptyMap(), options); this.jobInfo = jobInfo; } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java index 76070713fe37d..75409b07201da 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java @@ -23,11 +23,11 @@ import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; +import java.util.Set; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; import org.apache.beam.runners.core.construction.graph.ExecutableStage; import org.apache.beam.runners.samza.SamzaPipelineOptions; -import org.apache.beam.runners.samza.util.StoreIdUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.Combine; @@ -79,9 +79,9 @@ public 
static void createConfig( Pipeline pipeline, SamzaPipelineOptions options, Map idMap, - Map multiParDoStateIdMap, + Set nonUniqueStateIds, ConfigBuilder configBuilder) { - final ConfigContext ctx = new ConfigContext(idMap, options, multiParDoStateIdMap); + final ConfigContext ctx = new ConfigContext(idMap, nonUniqueStateIds, options); final TransformVisitorFn configFn = new TransformVisitorFn() { @@ -107,46 +107,6 @@ public static void createConfig( pipeline.traverseTopologically(visitor); } - /** - * Rewrite user store configs if there exists same state ids used in multiple ParDos. For each - * entry of a stateId to escaped PTransform name of first occurrence in topological traversal, - * rewrite RocksDB configs with the new mapping enforced from stateId to storeId. - * (eg) @StateId("foo") used in two ParDos fn, fn2: .apply("First Stateful ParDo with same - * stateId", ParDo.of(fn)) .apply("Second Stateful ParDo with same stateId", ParDo.of(fn2)) Map = - * ("foo", "First_Stateful_ParDo_with_same_stateId") storeId = - * "foo-First_Stateful_ParDo_with_same_stateId" - */ - public static void rewriteConfigWithMultiParDoStateId( - SamzaPipelineOptions options, - Map multiParDoStateIdMap, - ConfigBuilder configBuilder) { - multiParDoStateIdMap.forEach( - (stateId, value) -> { - // rewrite single ParDo store configs with multiple ParDo storeId - String multiParDoStoreId = StoreIdUtils.toMultiParDoStoreId(stateId, value); - // replace old single ParDo store configs with new storeId mapping appended with parDo - // name - configBuilder.remove("stores." + stateId + ".factory"); - configBuilder.remove("stores." + stateId + ".key.serde"); - configBuilder.remove("stores." + stateId + ".msg.serde"); - configBuilder.remove("stores." + stateId + ".rocksdb.compression"); - // put new config with multi pardo config - configBuilder.put( - "stores." + multiParDoStoreId + ".factory", - "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"); - configBuilder.put("stores." 
+ multiParDoStoreId + ".key.serde", "byteArraySerde"); - configBuilder.put("stores." + multiParDoStoreId + ".msg.serde", "stateValueSerde"); - configBuilder.put("stores." + multiParDoStoreId + ".rocksdb.compression", "lz4"); - - if (options.getStateDurable()) { - configBuilder.remove("stores." + stateId + ".changelog"); - configBuilder.put( - "stores." + multiParDoStoreId + ".changelog", - ConfigBuilder.getChangelogTopic(options, multiParDoStoreId)); - } - }); - } - private interface TransformVisitorFn { > void apply( T transform, diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/StateIdParser.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/StateIdParser.java new file mode 100644 index 0000000000000..05135a4d97dd6 --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/StateIdParser.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.samza.translation; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.runners.samza.util.StateUtils; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; + +/** + * This class identifies the set of non-unique state ids by scanning the BEAM {@link Pipeline} with + * a topological traversal. + */ +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class StateIdParser extends Pipeline.PipelineVisitor.Defaults { + private final Set nonUniqueStateIds = new HashSet<>(); + private final Set usedStateIds = new HashSet<>(); + + public static Set scan(Pipeline pipeline) { + final StateIdParser parser = new StateIdParser(); + pipeline.traverseTopologically(parser); + return parser.getNonUniqueStateIds(); + } + + private StateIdParser() {} + + @Override + public void visitPrimitiveTransform(TransformHierarchy.Node node) { + if (node.getTransform() instanceof ParDo.MultiOutput) { + final DoFn doFn = ((ParDo.MultiOutput) node.getTransform()).getFn(); + if (StateUtils.isStateful(doFn)) { + final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); + for (String stateId : signature.stateDeclarations().keySet()) { + if (!usedStateIds.add(stateId)) { + nonUniqueStateIds.add(stateId); + } + } + } + } + } + + public Set getNonUniqueStateIds() { + return Collections.unmodifiableSet(nonUniqueStateIds); + } +} diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java 
index 75a160a41251e..8488594504922 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java @@ -30,7 +30,6 @@ import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.runtime.OpMessage; import org.apache.beam.runners.samza.util.HashIdGenerator; -import org.apache.beam.runners.samza.util.StoreIdUtils; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -84,7 +83,6 @@ public class TranslationContext { private final Map idMap; private final Map registeredInputStreams = new HashMap<>(); private final Map registeredTables = new HashMap<>(); - private final Set multiParDoStateIds; private final SamzaPipelineOptions options; private final HashIdGenerator idGenerator = new HashIdGenerator(); @@ -93,11 +91,9 @@ public class TranslationContext { public TranslationContext( StreamApplicationDescriptor appDescriptor, Map idMap, - Set multiParDoStateIds, SamzaPipelineOptions options) { this.appDescriptor = appDescriptor; this.idMap = idMap; - this.multiParDoStateIds = multiParDoStateIds; this.options = options; } @@ -253,19 +249,6 @@ public String getTransformId() { return idGenerator.getId(getTransformFullName()); } - /** Given a set of user stateIds and parDo name, return a stateId to storeId map. */ - public Map getStateIdToStoreIdMap(Set stateIds, String escapedParDoName) { - final Map storeIds = new HashMap<>(); - stateIds.forEach( - stateId -> - storeIds.put( - stateId, - multiParDoStateIds.contains(stateId) - ? StoreIdUtils.toMultiParDoStoreId(stateId, escapedParDoName) - : stateId)); - return storeIds; - } - /** The dummy stream created will only be used in Beam tests. 
*/ private static InputDescriptor, ?> createDummyStreamDescriptor(String id) { final GenericSystemDescriptor dummySystem = diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java index b1e09ad4d809a..d2723509579a8 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java @@ -19,11 +19,8 @@ public class StoreIdUtils { - /** - * Join stateId and escaped PTransform name, used for RocksDB storeId of stateIds with multiple - * ParDos. - */ - public static String toMultiParDoStoreId(String stateId, String escapedPTransformName) { + /** Join stateId and escaped PTransform name to uniquify storeId. */ + public static String toUniqueStoreId(String stateId, String escapedPTransformName) { return String.join("-", stateId, escapedPTransformName); } } diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java index 801e84c825a1c..9c6c00e86b88d 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java @@ -21,9 +21,9 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Set; import org.apache.beam.runners.samza.SamzaExecutionEnvironment; import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.SamzaRunner; @@ -74,10 +74,10 @@ public void testStatefulBeamStoreConfig() { pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides()); final Map idMap = 
PViewToIdMapper.buildIdMap(pipeline); - final Map multiParDoStateIdMap = new HashMap<>(); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPipelineTranslator.createConfig( - pipeline, options, idMap, multiParDoStateIdMap, configBuilder); + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config = configBuilder.build(); assertEquals( @@ -88,7 +88,8 @@ public void testStatefulBeamStoreConfig() { assertNull(config.get("stores.beamStore.changelog")); options.setStateDurable(true); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config2 = configBuilder.build(); assertEquals( "TestStoreConfig-1-beamStore-changelog", config2.get("stores.beamStore.changelog")); @@ -106,8 +107,10 @@ public void testStatelessBeamStoreConfig() { pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides()); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config = configBuilder.build(); assertEquals( @@ -118,7 +121,8 @@ public void testStatelessBeamStoreConfig() { assertNull(config.get("stores.beamStore.changelog")); options.setStateDurable(true); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config2 = configBuilder.build(); // For stateless jobs, ignore state durable pipeline option. 
assertNull(config2.get("stores.beamStore.changelog")); @@ -137,8 +141,10 @@ public void testSamzaLocalExecutionEnvironmentConfig() { pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides()); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config = configBuilder.build(); assertTrue( @@ -165,8 +171,10 @@ public void testSamzaYarnExecutionEnvironmentConfig() { pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides()); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); try { Config config = configBuilder.build(); assertEquals(config.get(APP_RUNNER_CLASS), RemoteApplicationRunner.class.getName()); @@ -195,8 +203,10 @@ public void testSamzaStandAloneExecutionEnvironmentConfig() { pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides()); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); try { Config config = configBuilder.build(); assertEquals(config.get(APP_RUNNER_CLASS), LocalApplicationRunner.class.getName()); @@ -236,8 +246,10 @@ public void 
processElement( })); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config = configBuilder.build(); assertEquals( @@ -248,7 +260,8 @@ public void processElement( assertNull(config.get("stores.testState.changelog")); options.setStateDurable(true); - SamzaPipelineTranslator.createConfig(pipeline, options, idMap, new HashMap<>(), configBuilder); + SamzaPipelineTranslator.createConfig( + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config2 = configBuilder.build(); assertEquals( "TestStoreConfig-1-testState-changelog", config2.get("stores.testState.changelog")); @@ -294,12 +307,10 @@ public void processElement( })); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - final Map multiParDoStateIdMap = new HashMap<>(); SamzaPipelineTranslator.createConfig( - pipeline, options, idMap, multiParDoStateIdMap, configBuilder); - SamzaPipelineTranslator.rewriteConfigWithMultiParDoStateId( - options, multiParDoStateIdMap, configBuilder); + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config = configBuilder.build(); assertEquals( @@ -317,11 +328,8 @@ public void processElement( assertNull(config.get("stores.testState-Second_stateful_ParDo.changelog")); options.setStateDurable(true); - Map multiParDoStateIdMap2 = new HashMap<>(); SamzaPipelineTranslator.createConfig( - pipeline, options, idMap, multiParDoStateIdMap2, configBuilder); - SamzaPipelineTranslator.rewriteConfigWithMultiParDoStateId( - options, multiParDoStateIdMap2, configBuilder); + pipeline, 
options, idMap, nonUniqueStateIds, configBuilder); final Config config2 = configBuilder.build(); assertEquals( "TestStoreConfig-1-testState-First_stateful_ParDo-changelog", @@ -371,12 +379,10 @@ public void processElement( })); final Map idMap = PViewToIdMapper.buildIdMap(pipeline); + final Set nonUniqueStateIds = StateIdParser.scan(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); - final Map multiParDoStateIdMap = new HashMap<>(); SamzaPipelineTranslator.createConfig( - pipeline, options, idMap, multiParDoStateIdMap, configBuilder); - SamzaPipelineTranslator.rewriteConfigWithMultiParDoStateId( - options, multiParDoStateIdMap, configBuilder); + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config = configBuilder.build(); assertEquals( @@ -398,11 +404,8 @@ public void processElement( assertNull(config.get("stores.testState-Same_stateful_ParDo_Name2.changelog")); options.setStateDurable(true); - Map multiParDoStateIdMap2 = new HashMap<>(); SamzaPipelineTranslator.createConfig( - pipeline, options, idMap, multiParDoStateIdMap2, configBuilder); - SamzaPipelineTranslator.rewriteConfigWithMultiParDoStateId( - options, multiParDoStateIdMap2, configBuilder); + pipeline, options, idMap, nonUniqueStateIds, configBuilder); final Config config2 = configBuilder.build(); assertEquals( "TestStoreConfig-1-testState-Same_stateful_ParDo_Name-changelog", diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java index b8055796ca193..8827f1e5622c0 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.samza.translation; -import static org.junit.Assert.assertEquals; import 
static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.mock; @@ -25,13 +24,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.runtime.OpMessage; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.apache.samza.application.descriptors.StreamApplicationDescriptor; import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; @@ -54,8 +51,6 @@ public class TranslationContextTest { MapFunction keyFn = m -> m.toString(); MapFunction valueFn = m -> m; private final String streamName = "testStream"; - private static final String SINGLE_PARDO_STATE_ID = "stateId1"; - private static final String MULTI_PARDO_STATE_ID = "multiParDoStateId1"; KVSerde serde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()); StreamApplicationDescriptor streamApplicationDescriptor = new StreamApplicationDescriptorImpl( @@ -65,10 +60,8 @@ public class TranslationContextTest { }, getConfig()); Map idMap = new HashMap<>(); - Set multiParDoStateIds = ImmutableSet.of(MULTI_PARDO_STATE_ID); TranslationContext translationContext = - new TranslationContext( - streamApplicationDescriptor, idMap, multiParDoStateIds, mock(SamzaPipelineOptions.class)); + new TranslationContext(streamApplicationDescriptor, idMap, mock(SamzaPipelineOptions.class)); @Test public void testRegisterInputMessageStreams() { @@ -84,16 +77,6 @@ public void testRegisterInputMessageStreams() { assertNotNull(translationContext.getMessageStream(output)); } - @Test - public void testGetStateIdToStoreIdMap() { - final Set input = ImmutableSet.of(SINGLE_PARDO_STATE_ID, MULTI_PARDO_STATE_ID); - final String escapedParDoName = "mockParDoName"; - Map output = 
translationContext.getStateIdToStoreIdMap(input, escapedParDoName); - assertEquals(SINGLE_PARDO_STATE_ID, output.get(SINGLE_PARDO_STATE_ID)); - assertEquals( - String.join("-", MULTI_PARDO_STATE_ID, escapedParDoName), output.get(MULTI_PARDO_STATE_ID)); - } - public GenericInputDescriptor>> createSamzaInputDescriptor( String systemName, String streamId) { final Serde>> kvSerde = From 87703c990fbf36ced3c273d1a1e143c68c2b2ca5 Mon Sep 17 00:00:00 2001 From: Seung Jin An Date: Fri, 2 Dec 2022 13:20:34 -0800 Subject: [PATCH 4/5] Keep state id to store mapping --- .../beam/runners/samza/SamzaRunner.java | 2 +- .../beam/runners/samza/runtime/DoFnOp.java | 6 ++- .../samza/runtime/SamzaDoFnRunners.java | 10 +++-- .../runtime/SamzaStoreStateInternals.java | 40 +++++++------------ .../ParDoBoundMultiTranslator.java | 17 +++++++- .../PortableTranslationContext.java | 2 +- .../samza/translation/TranslationContext.java | 7 ++++ .../translation/TranslationContextTest.java | 6 ++- 8 files changed, 55 insertions(+), 35 deletions(-) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index d354e75c4b823..cb8df86058d4b 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -166,7 +166,7 @@ public SamzaPipelineResult run(Pipeline pipeline) { appDescriptor.withMetricsReporterFactories(reporterFactories); SamzaPipelineTranslator.translate( - pipeline, new TranslationContext(appDescriptor, idMap, options)); + pipeline, new TranslationContext(appDescriptor, idMap, nonUniqueStateIds, options)); }; // perform a final round of validation for the pipeline options now that all configs are diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java index 
735ec62cd3259..35661ae86fe19 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java @@ -127,6 +127,7 @@ public class DoFnOp implements Op { private final DoFnSchemaInformation doFnSchemaInformation; private final Map> sideInputMapping; + private final Map stateIdToStoreMapping; public DoFnOp( TupleTag mainOutputTag, @@ -148,7 +149,8 @@ public DoFnOp( JobInfo jobInfo, Map> idToTupleTagMap, DoFnSchemaInformation doFnSchemaInformation, - Map> sideInputMapping) { + Map> sideInputMapping, + Map stateIdToStoreMapping) { this.mainOutputTag = mainOutputTag; this.doFn = doFn; this.sideInputs = sideInputs; @@ -171,6 +173,7 @@ public DoFnOp( this.bundleStateId = "_samza_bundle_" + transformId; this.doFnSchemaInformation = doFnSchemaInformation; this.sideInputMapping = sideInputMapping; + this.stateIdToStoreMapping = stateIdToStoreMapping; } @Override @@ -260,6 +263,7 @@ public void open( outputCoders, doFnSchemaInformation, (Map>) sideInputMapping, + stateIdToStoreMapping, emitter, outputFutureCollector); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java index ec1a9f3650908..41fe8190dec1a 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java @@ -52,8 +52,6 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import 
org.apache.beam.sdk.util.WindowedValue; @@ -91,15 +89,19 @@ public static DoFnRunner create( Map, Coder> outputCoders, DoFnSchemaInformation doFnSchemaInformation, Map> sideInputMapping, + Map stateIdToStoreIdMapping, OpEmitter emitter, FutureCollector futureCollector) { final KeyedInternals keyedInternals; final TimerInternals timerInternals; final StateInternals stateInternals; - final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); final SamzaStoreStateInternals.Factory stateInternalsFactory = SamzaStoreStateInternals.createStateInternalsFactory( - transformId, keyCoder, context.getTaskContext(), pipelineOptions, signature); + transformId, + keyCoder, + context.getTaskContext(), + pipelineOptions, + stateIdToStoreIdMapping); final SamzaExecutionContext executionContext = (SamzaExecutionContext) context.getApplicationContainerContext(); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java index 7df48fb4cad5e..faa2d4addb25f 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java @@ -26,14 +26,12 @@ import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -66,7 +64,6 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import 
org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; @@ -129,18 +126,7 @@ static KeyValueStore> getBeamStore(TaskContext context) */ static Factory createNonKeyedStateInternalsFactory( String id, TaskContext context, SamzaPipelineOptions pipelineOptions) { - return createStateInternalsFactory(id, null, context, pipelineOptions, Collections.emptySet()); - } - - static Factory createStateInternalsFactory( - String id, - Coder keyCoder, - TaskContext context, - SamzaPipelineOptions pipelineOptions, - DoFnSignature signature) { - - return createStateInternalsFactory( - id, keyCoder, context, pipelineOptions, signature.stateDeclarations().keySet()); + return createStateInternalsFactory(id, null, context, pipelineOptions, Collections.emptyMap()); } static Factory createStateInternalsFactory( @@ -150,31 +136,35 @@ static Factory createStateInternalsFactory( SamzaPipelineOptions pipelineOptions, ExecutableStage executableStage) { - Set stateIds = + Map stateIdToStoreMap = executableStage.getUserStates().stream() - .map(UserStateReference::localName) - .collect(Collectors.toSet()); + .collect( + Collectors.toMap(UserStateReference::localName, UserStateReference::localName)); - return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIds); + return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIdToStoreMap); } @SuppressWarnings("unchecked") - private static Factory createStateInternalsFactory( + static Factory createStateInternalsFactory( String id, @Nullable Coder keyCoder, TaskContext context, SamzaPipelineOptions pipelineOptions, - Collection stateIds) { + Map stateIdToStoreMap) { final int batchGetSize = pipelineOptions.getStoreBatchGetSize(); final Map>> stores = new HashMap<>(); stores.put(BEAM_STORE, getBeamStore(context)); final Coder 
stateKeyCoder; if (keyCoder != null) { - stateIds.forEach( - stateId -> - stores.put( - stateId, (KeyValueStore>) context.getStore(stateId))); + stateIdToStoreMap + .keySet() + .forEach( + stateId -> + stores.put( + stateId, + (KeyValueStore>) + context.getStore(stateIdToStoreMap.get(stateId)))); stateKeyCoder = keyCoder; } else { stateKeyCoder = (Coder) VoidCoder.of(); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java index e7079f3cbe8bf..a805f36da610b 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java @@ -163,6 +163,17 @@ private static void doTranslate( Map> sideInputMapping = ParDoTranslation.getSideInputMapping(ctx.getCurrentTransform()); + final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + final Map stateIdToStoreMapping = new HashMap<>(); + for (String stateId : signature.stateDeclarations().keySet()) { + String storeId = stateId; + if (!ctx.isUniqueStateId(stateId)) { + final String escapedName = + SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName()); + storeId = StoreIdUtils.toUniqueStoreId(stateId, escapedName); + } + stateIdToStoreMapping.put(stateId, storeId); + } final DoFnOp op = new DoFnOp<>( transform.getMainOutputTag(), @@ -184,7 +195,8 @@ private static void doTranslate( null, Collections.emptyMap(), doFnSchemaInformation, - sideInputMapping); + sideInputMapping, + stateIdToStoreMapping); final MessageStream> mergedStreams; if (sideInputStreams.isEmpty()) { @@ -335,7 +347,8 @@ private static void doTranslatePortable( ctx.getJobInfo(), idToTupleTagMap, doFnSchemaInformation, - sideInputMapping); + sideInputMapping, + Collections.emptyMap()); final 
MessageStream> mergedStreams; if (sideInputStreams.isEmpty()) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java index 776ee80878d1e..2195489ca13fb 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java @@ -50,7 +50,7 @@ public class PortableTranslationContext extends TranslationContext { public PortableTranslationContext( StreamApplicationDescriptor appDescriptor, SamzaPipelineOptions options, JobInfo jobInfo) { - super(appDescriptor, Collections.emptyMap(), options); + super(appDescriptor, Collections.emptyMap(), Collections.emptySet(), options); this.jobInfo = jobInfo; } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java index 8488594504922..f33f75bc1867c 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java @@ -83,6 +83,7 @@ public class TranslationContext { private final Map idMap; private final Map registeredInputStreams = new HashMap<>(); private final Map registeredTables = new HashMap<>(); + private final Set nonUniqueStateIds; private final SamzaPipelineOptions options; private final HashIdGenerator idGenerator = new HashIdGenerator(); @@ -91,9 +92,11 @@ public class TranslationContext { public TranslationContext( StreamApplicationDescriptor appDescriptor, Map idMap, + Set nonUniqueStateIds, SamzaPipelineOptions options) { this.appDescriptor = appDescriptor; this.idMap = idMap; + this.nonUniqueStateIds = nonUniqueStateIds; 
this.options = options; } @@ -241,6 +244,10 @@ public String getIdForPValue(PValue pvalue) { return id; } + public boolean isUniqueStateId(String stateId) { + return !nonUniqueStateIds.contains(stateId); + } + public String getTransformFullName() { return currentTransform.getFullName(); } diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java index 8827f1e5622c0..bb357dd6aced8 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java @@ -22,8 +22,10 @@ import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.runtime.OpMessage; @@ -60,8 +62,10 @@ public class TranslationContextTest { }, getConfig()); Map idMap = new HashMap<>(); + Set nonUniqueStateIds = new HashSet<>(); TranslationContext translationContext = - new TranslationContext(streamApplicationDescriptor, idMap, mock(SamzaPipelineOptions.class)); + new TranslationContext( + streamApplicationDescriptor, idMap, nonUniqueStateIds, mock(SamzaPipelineOptions.class)); @Test public void testRegisterInputMessageStreams() { From f975430d6b61b04e00a1ba0756d7abcf5902843d Mon Sep 17 00:00:00 2001 From: Seung Jin An Date: Tue, 6 Dec 2022 14:59:31 -0800 Subject: [PATCH 5/5] Add StoreIdGenerator which encapsulates common logic used in ParDoBoundMultiTranslator within ConfigContext/TranslationContext --- .../samza/translation/ConfigContext.java | 9 ++-- .../ParDoBoundMultiTranslator.java | 20 ++------ .../samza/translation/TranslationContext.java | 13 ++--- 
.../runners/samza/util/StoreIdGenerator.java | 48 +++++++++++++++++++ .../beam/runners/samza/util/StoreIdUtils.java | 26 ---------- 5 files changed, 65 insertions(+), 51 deletions(-) create mode 100644 runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdGenerator.java delete mode 100644 runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java index be653991e6d31..3fc405d15a5b8 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Set; import org.apache.beam.runners.samza.SamzaPipelineOptions; +import org.apache.beam.runners.samza.util.StoreIdGenerator; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; @@ -34,13 +35,13 @@ public class ConfigContext { private final Map idMap; private AppliedPTransform currentTransform; private final SamzaPipelineOptions options; - private final Set nonUniqueStateIds; + private final StoreIdGenerator storeIdGenerator; public ConfigContext( Map idMap, Set nonUniqueStateIds, SamzaPipelineOptions options) { this.idMap = idMap; - this.nonUniqueStateIds = nonUniqueStateIds; this.options = options; + this.storeIdGenerator = new StoreIdGenerator(nonUniqueStateIds); } public void setCurrentTransform(AppliedPTransform currentTransform) { @@ -64,8 +65,8 @@ public SamzaPipelineOptions getPipelineOptions() { return this.options; } - public boolean isUniqueStateId(String stateId) { - return !nonUniqueStateIds.contains(stateId); + public StoreIdGenerator getStoreIdGenerator() { + return storeIdGenerator; } private 
String getIdForPValue(PValue pvalue) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java index a805f36da610b..ac7fe50042f36 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java @@ -45,7 +45,6 @@ import org.apache.beam.runners.samza.runtime.SamzaDoFnInvokerRegistrar; import org.apache.beam.runners.samza.util.SamzaPipelineTranslatorUtils; import org.apache.beam.runners.samza.util.StateUtils; -import org.apache.beam.runners.samza.util.StoreIdUtils; import org.apache.beam.runners.samza.util.WindowUtils; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; @@ -166,12 +165,8 @@ private static void doTranslate( final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); final Map stateIdToStoreMapping = new HashMap<>(); for (String stateId : signature.stateDeclarations().keySet()) { - String storeId = stateId; - if (!ctx.isUniqueStateId(stateId)) { - final String escapedName = - SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName()); - storeId = StoreIdUtils.toUniqueStoreId(stateId, escapedName); - } + final String transformFullName = node.getEnclosingNode().getFullName(); + final String storeId = ctx.getStoreIdGenerator().getId(stateId, transformFullName); stateIdToStoreMapping.put(stateId, storeId); } final DoFnOp op = @@ -391,14 +386,9 @@ public Map createConfig( if (signature.usesState()) { // set up user state configs - for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) { - final String stateId = state.id(); - String storeId = stateId; - if (!ctx.isUniqueStateId(stateId)) { - final String escapedName = - 
SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName()); - storeId = StoreIdUtils.toUniqueStoreId(stateId, escapedName); - } + for (String stateId : signature.stateDeclarations().keySet()) { + final String transformFullName = node.getEnclosingNode().getFullName(); + final String storeId = ctx.getStoreIdGenerator().getId(stateId, transformFullName); config.put( "stores." + storeId + ".factory", RocksDbKeyValueStorageEngineFactory.class.getName()); config.put("stores." + storeId + ".key.serde", "byteArraySerde"); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java index f33f75bc1867c..a1cf67befe07f 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.runtime.OpMessage; import org.apache.beam.runners.samza.util.HashIdGenerator; +import org.apache.beam.runners.samza.util.StoreIdGenerator; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -83,9 +84,9 @@ public class TranslationContext { private final Map idMap; private final Map registeredInputStreams = new HashMap<>(); private final Map registeredTables = new HashMap<>(); - private final Set nonUniqueStateIds; private final SamzaPipelineOptions options; private final HashIdGenerator idGenerator = new HashIdGenerator(); + private final StoreIdGenerator storeIdGenerator; private AppliedPTransform currentTransform; @@ -96,8 +97,8 @@ public TranslationContext( SamzaPipelineOptions options) { this.appDescriptor = appDescriptor; this.idMap = idMap; - this.nonUniqueStateIds = 
nonUniqueStateIds; this.options = options; + this.storeIdGenerator = new StoreIdGenerator(nonUniqueStateIds); } public void registerInputMessageStream( @@ -244,10 +245,6 @@ public String getIdForPValue(PValue pvalue) { return id; } - public boolean isUniqueStateId(String stateId) { - return !nonUniqueStateIds.contains(stateId); - } - public String getTransformFullName() { return currentTransform.getFullName(); } @@ -256,6 +253,10 @@ public String getTransformId() { return idGenerator.getId(getTransformFullName()); } + public StoreIdGenerator getStoreIdGenerator() { + return storeIdGenerator; + } + /** The dummy stream created will only be used in Beam tests. */ private static InputDescriptor, ?> createDummyStreamDescriptor(String id) { final GenericSystemDescriptor dummySystem = diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdGenerator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdGenerator.java new file mode 100644 index 0000000000000..aefba0186c8d4 --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdGenerator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.samza.util; + +import java.util.Set; + +/** + * This class encapsulates the logic to generate unique store id. For unique state ids across the + * Beam pipeline, store id is the same as the state id. For non-unique state ids, join the state id + * with an escaped transform name to generate a unique store id. + */ +public class StoreIdGenerator { + + private final Set nonUniqueStateIds; + + public StoreIdGenerator(Set nonUniqueStateId) { + this.nonUniqueStateIds = nonUniqueStateId; + } + + public String getId(String stateId, String transformFullName) { + String storeId = stateId; + if (nonUniqueStateIds.contains(stateId)) { + final String escapedName = SamzaPipelineTranslatorUtils.escape(transformFullName); + storeId = toUniqueStoreId(stateId, escapedName); + } + return storeId; + } + + /** Join state id and escaped PTransform name to uniquify store id. */ + private static String toUniqueStoreId(String stateId, String escapedPTransformName) { + return String.join("-", stateId, escapedPTransformName); + } +} diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java deleted file mode 100644 index d2723509579a8..0000000000000 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.samza.util; - -public class StoreIdUtils { - - /** Join stateId and escaped PTransform name to uniquify storeId. */ - public static String toUniqueStoreId(String stateId, String escapedPTransformName) { - return String.join("-", stateId, escapedPTransformName); - } -}