diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/NativeTransforms.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/NativeTransforms.java new file mode 100644 index 0000000000000..730a5ca561f51 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/NativeTransforms.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import com.google.auto.service.AutoService; +import java.util.Iterator; +import java.util.ServiceLoader; +import java.util.function.Predicate; +import org.apache.beam.model.pipeline.v1.RunnerApi; + +/** + * An extension point for users to define their own native transforms for usage with specific + * runners. This extension point enables shared libraries within the Apache Beam codebase to treat + * the native transform as a primitive transforms that the runner implicitly understands. + * + *

Warning:Usage of native transforms within pipelines will prevent users from migrating + * between runners as there is no expectation that the transform will be understood by all runners. + * Note that for some use cases this can be a way to test out a new type of transform on a limited + * set of runners and promote its adoption as a primitive within the Apache Beam model. + * + *

Note that users are required to ensure that translation and execution for the native transform + * is supported by their runner. + * + *

Automatic registration occurs by creating a {@link ServiceLoader} entry and a concrete + * implementation of the {@link IsNativeTransform} interface. It is optional but recommended to use + * one of the many build time tools such as {@link AutoService} to generate the necessary META-INF + * files automatically. + */ +public class NativeTransforms { + /** + * Returns true if an only if the Runner understands this transform and can handle it directly. + */ + public static boolean isNative(RunnerApi.PTransform pTransform) { + Iterator matchers = ServiceLoader.load(IsNativeTransform.class).iterator(); + while (matchers.hasNext()) { + if (matchers.next().test(pTransform)) { + return true; + } + } + return false; + } + + /** A predicate which returns true if and only if the transform is a native transform. */ + public interface IsNativeTransform extends Predicate { + @Override + boolean test(RunnerApi.PTransform pTransform); + } +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java index 31cef9c14db99..3eed941f5a646 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java @@ -57,6 +57,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; import org.apache.beam.runners.core.construction.Environments; +import org.apache.beam.runners.core.construction.NativeTransforms; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; @@ -154,7 +155,7 @@ static Collection getPrimitiveTransformIds(RunnerApi.Components componen /** Returns true if the provided transform is a primitive. */ private static boolean isPrimitiveTransform(PTransform transform) { String urn = PTransformTranslation.urnForTransformOrNull(transform); - return PRIMITIVE_URNS.contains(urn); + return PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform); } private MutableNetwork buildNetwork( diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/NativeTransformsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/NativeTransformsTest.java new file mode 100644 index 0000000000000..aa3c0332676aa --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/NativeTransformsTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import com.google.auto.service.AutoService; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.NativeTransforms.IsNativeTransform; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link NativeTransforms}. */ +@RunWith(JUnit4.class) +public class NativeTransformsTest { + /** A test implementation of a {@link IsNativeTransform}. */ + @AutoService(IsNativeTransform.class) + public static class TestNativeTransform implements IsNativeTransform { + + @Override + public boolean test(RunnerApi.PTransform pTransform) { + return "test".equals(PTransformTranslation.urnForTransformOrNull(pTransform)); + } + } + + @Test + public void testMatch() { + NativeTransforms.isNative( + RunnerApi.PTransform.newBuilder() + .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("test").build()) + .build()); + } + + @Test + public void testNoMatch() { + NativeTransforms.isNative(RunnerApi.PTransform.getDefaultInstance()); + } +}