diff --git a/.github/trigger_files/beam_PostCommit_Python_Examples_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Examples_Direct.json deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json index 8b137891791fe..e69de29bb2d1d 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json @@ -1 +0,0 @@ - diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/.github/workflows/README.md b/.github/workflows/README.md index 42eaaaafdaced..f882553ef9bf3 100644 --- a/.github/workflows/README.md +++ b/.github/workflows/README.md @@ -271,7 +271,6 @@ PreCommit Jobs run in a schedule and also get triggered in a PR if relevant sour | [ PreCommit Website ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml) | N/A |`Run Website PreCommit`| [![.github/workflows/beam_PreCommit_Website.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml?query=event%3Aschedule) | | [ PreCommit Website Stage GCS ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml) | N/A |`Run Website_Stage_GCS PreCommit`| [![.github/workflows/beam_PreCommit_Website_Stage_GCS.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml?query=event%3Aschedule) | | [ PreCommit Whitespace ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Whitespace.yml) | N/A |`Run Whitespace PreCommit`| [![.github/workflows/beam_PreCommit_Whitespace.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Whitespace.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Whitespace.yml?query=event%3Aschedule) | -| [ PreCommit Xlang Generated Transforms ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml) | N/A |`Run Xlang_Generated_Transforms PreCommit`| [![.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml?query=event%3Aschedule) | ### PostCommit Jobs diff --git a/.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml b/.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml deleted file mode 100644 index f8d64eb5d4a66..0000000000000 --- a/.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml +++ /dev/null @@ -1,114 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name: PreCommit Xlang Generated Transforms - -on: - push: - tags: ['v*'] - branches: ['master', 'release-*'] - paths: - - 'model/**' - - 'sdks/python/**' - - 'sdks/java/expansion-service/**' - - 'sdks/java/core/**' - - 'sdks/java/io/**' - - 'sdks/java/extensions/sql/**' - - 'release/**' - - '.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml' - pull_request_target: - branches: ['master', 'release-*'] - paths: - - 'model/**' - - 'sdks/python/**' - - 'sdks/java/expansion-service/**' - - 'sdks/java/core/**' - - 'sdks/java/io/**' - - 'sdks/java/extensions/sql/**' - - 'release/**' - - 'release/trigger_all_tests.json' - - '.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml' - issue_comment: - types: [created] - schedule: - - cron: '30 2/6 * * *' - workflow_dispatch: - -#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event -permissions: - actions: write - pull-requests: read - checks: read - contents: read - deployments: read - id-token: none - issues: read - discussions: read - packages: read - pages: read - repository-projects: read - security-events: read - statuses: read - -# This allows a subsequently queued workflow run to interrupt previous runs -concurrency: - group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}' - cancel-in-progress: true - -env: - GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} - GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} - GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} - -jobs: - beam_PreCommit_Xlang_Generated_Transforms: - name: ${{ matrix.job_name }} (${{ matrix.job_phrase }} ${{ matrix.python_version }}) - timeout-minutes: 120 - runs-on: ['self-hosted', ubuntu-20.04, main] - strategy: - fail-fast: false - matrix: - job_name: ['beam_PreCommit_Xlang_Generated_Transforms'] - job_phrase: ['Run Xlang_Generated_Transforms PreCommit'] - python_version: ['3.8'] - if: | - github.event_name == 'push' || - github.event_name == 'workflow_dispatch' || - github.event_name == 'pull_request_target' || - (github.event_name == 'schedule' && github.repository == 'apache/beam') || - startsWith(github.event.comment.body, 'Run Xlang_Generated_Transforms PreCommit') - steps: - - uses: actions/checkout@v4 - - name: Setup repository - uses: ./.github/actions/setup-action - with: - comment_phrase: ${{ matrix.job_phrase }} - github_token: ${{ secrets.GITHUB_TOKEN }} - github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }} - - name: Setup environment - uses: ./.github/actions/setup-environment-action - with: - java-version: 8 - python-version: ${{ matrix.python_version }} - - name: Set PY_VER_CLEAN - id: set_py_ver_clean - run: | - PY_VER=${{ matrix.python_version }} - PY_VER_CLEAN=${PY_VER//.} - echo "py_ver_clean=$PY_VER_CLEAN" >> $GITHUB_OUTPUT - - name: run Cross-Language Wrapper Validation script - uses: ./.github/actions/gradle-command-self-hosted-action - with: - gradle-command: :sdks:python:test-suites:direct:crossLanguageWrapperValidationPreCommit \ No newline at end of file diff --git a/.gitignore b/.gitignore index c76441078f3d4..3f7d7af44a8f7 100644 --- a/.gitignore +++ b/.gitignore @@ -52,7 +52,6 @@ sdks/python/**/*.egg sdks/python/LICENSE sdks/python/NOTICE sdks/python/README.md -sdks/python/apache_beam/transforms/xlang/* sdks/python/apache_beam/portability/api/* sdks/python/apache_beam/yaml/docs/* sdks/python/nosetests*.xml diff --git a/CHANGES.md b/CHANGES.md index 57d729ac5f5db..08744ee9ac191 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -58,7 +58,6 @@ * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). -* The Python SDK will now include automatically generated wrappers for external Java transforms! ([#29834](https://github.com/apache/beam/pull/29834)) ## I/Os diff --git a/build.gradle.kts b/build.gradle.kts index 7d0371a7282ab..b44c79ab7bfee 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -647,11 +647,6 @@ tasks.register("checkSetup") { dependsOn(":examples:java:wordCount") } -// Generates external transform config -project.tasks.register("generateExternalTransformsConfig") { - dependsOn(":sdks:python:generateExternalTransformsConfig") -} - // Configure the release plugin to do only local work; the release manager determines what, if // anything, to push. On failure, the release manager can reset the branch without pushing. release { diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 6434746fd3ab3..5b73940b99dd5 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -321,17 +321,19 @@ class BeamModulePlugin implements Plugin { // A class defining the common properties in a given suite of cross-language tests // Properties are shared across runners and are used when creating a CrossLanguageUsingJavaExpansionConfiguration object - static class CrossLanguageTask { + static class CrossLanguageTaskCommon { // Used as the task name for cross-language String name - // List of project paths for required expansion services - List expansionProjectPaths + // The expansion service's project path (required) + String expansionProjectPath // Collect Python pipeline tests with this marker String collectMarker - // Additional environment variables to set before running tests + // Job server startup task. + TaskProvider startJobServer + // Job server cleanup task. + TaskProvider cleanupJobServer + // any additional environment variables specific to the suite of tests Map additionalEnvs - // Additional Python dependencies to install before running tests - List additionalDeps } // A class defining the configuration for CrossLanguageUsingJavaExpansion. @@ -347,16 +349,18 @@ class BeamModulePlugin implements Plugin { ] // Additional pytest options List pytestOptions = [] + // Job server startup task. + TaskProvider startJobServer + // Job server cleanup task. + TaskProvider cleanupJobServer // Number of parallel test runs. Integer numParallelTests = 1 - // List of project paths for required expansion services - List expansionProjectPaths + // Project path for the expansion service to start up + String expansionProjectPath // Collect Python pipeline tests with this marker String collectMarker // any additional environment variables to be exported Map additionalEnvs - // Additional Python dependencies to install before running tests - List additionalDeps } // A class defining the configuration for CrossLanguageValidatesRunner. @@ -2572,7 +2576,7 @@ class BeamModulePlugin implements Plugin { /** ***********************************************************************************************/ // Method to create the createCrossLanguageUsingJavaExpansionTask. // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter. - // This method creates a task that runs Python SDK test-suites that use external Java transforms + // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service project.ext.createCrossLanguageUsingJavaExpansionTask = { // This task won't work if the python build file doesn't exist. if (!project.project(":sdks:python").buildFile.exists()) { @@ -2582,29 +2586,49 @@ class BeamModulePlugin implements Plugin { def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration() project.evaluationDependsOn(":sdks:python") - for (path in config.expansionProjectPaths) { - project.evaluationDependsOn(path) - } + project.evaluationDependsOn(config.expansionProjectPath) project.evaluationDependsOn(":sdks:java:extensions:python") + // Setting up args to launch the expansion service def pythonDir = project.project(":sdks:python").projectDir + def javaExpansionPort = -1 // will be populated in setupTask + def expansionJar = project.project(config.expansionProjectPath).shadowJar.archivePath + def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath() + def expansionServiceOpts = [ + "group_id": project.name, + "java_expansion_service_jar": expansionJar, + "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile, + ] def usesDataflowRunner = config.pythonPipelineOptions.contains("--runner=TestDataflowRunner") || config.pythonPipelineOptions.contains("--runner=DataflowRunner") def javaContainerSuffix = getSupportedJavaVersion() - // Sets up, collects, and runs Python pipeline tests - project.tasks.register(config.name+"PythonUsingJava") { - group = "Verification" - description = "Runs Python SDK pipeline tests that use a Java expansion service" - // Each expansion service we use needs to be built before running these tests - // The built jars will be started up automatically using the BeamJarExpansionService utility - for (path in config.expansionProjectPaths) { - dependsOn project.project(path).shadowJar.getPath() - } - dependsOn ":sdks:java:container:$javaContainerSuffix:docker" - dependsOn "installGcpTest" + // 1. Builds the chosen expansion service jar and launches it + def setupTask = project.tasks.register(config.name+"Setup") { + dependsOn ':sdks:java:container:' + javaContainerSuffix + ':docker' + dependsOn project.project(config.expansionProjectPath).shadowJar.getPath() + dependsOn 'installGcpTest' if (usesDataflowRunner) { dependsOn ":sdks:python:test-suites:dataflow:py${project.ext.pythonVersion.replace('.', '')}:initializeForDataflowJob" } + doLast { + project.exec { + // Prepare a port to use for the expansion service + javaExpansionPort = getRandomPort() + expansionServiceOpts.put("java_port", javaExpansionPort) + // setup test env + def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts) + executable 'sh' + args '-c', ". ${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name} && $pythonDir/scripts/run_expansion_services.sh start $serviceArgs" + } + } + } + + // 2. Sets up, collects, and runs Python pipeline tests + def pythonTask = project.tasks.register(config.name+"PythonUsingJava") { + group = "Verification" + description = "Runs Python SDK pipeline tests that use a Java expansion service" + dependsOn setupTask + dependsOn config.startJobServer doLast { def beamPythonTestPipelineOptions = [ "pipeline_opts": config.pythonPipelineOptions + (usesDataflowRunner ? [ @@ -2617,19 +2641,29 @@ class BeamModulePlugin implements Plugin { def cmdArgs = project.project(':sdks:python').mapToArgString(beamPythonTestPipelineOptions) project.exec { - // environment variable to indicate that jars have been built - environment "EXPANSION_JARS", config.expansionProjectPaths - String additionalDependencyCmd = "" - if (config.additionalDeps != null && !config.additionalDeps.isEmpty()){ - additionalDependencyCmd = "&& pip install ${config.additionalDeps.join(' ')} " + environment "EXPANSION_JAR", expansionJar + environment "EXPANSION_PORT", javaExpansionPort + for (envs in config.additionalEnvs){ + environment envs.getKey(), envs.getValue() } executable 'sh' - args '-c', ". ${project.ext.envdir}/bin/activate " + - additionalDependencyCmd + - "&& cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs" + args '-c', ". ${project.ext.envdir}/bin/activate && cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs" } } } + + // 3. Shuts down the expansion service + def cleanupTask = project.tasks.register(config.name+'Cleanup', Exec) { + // teardown test env + executable 'sh' + args '-c', ". ${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name}" + } + + setupTask.configure {finalizedBy cleanupTask} + config.startJobServer.configure {finalizedBy config.cleanupJobServer} + + cleanupTask.configure{mustRunAfter pythonTask} + config.cleanupJobServer.configure{mustRunAfter pythonTask} } /** ***********************************************************************************************/ diff --git a/scripts/ci/release/test/resources/mass_comment.txt b/scripts/ci/release/test/resources/mass_comment.txt index 1f6f340eb0b73..93468b0c961b4 100644 --- a/scripts/ci/release/test/resources/mass_comment.txt +++ b/scripts/ci/release/test/resources/mass_comment.txt @@ -79,7 +79,6 @@ Run PythonDocs PreCommit Run PythonFormatter PreCommit Run PythonLint PreCommit Run Python_PVR_Flink PreCommit -Run Python_Xlang_Gcp_Direct PostCommit Run RAT PreCommit Run SQL PostCommit Run SQL PreCommit @@ -95,7 +94,6 @@ Run Twister2 ValidatesRunner Run Typescript PreCommit Run ULR Loopback ValidatesRunner Run Whitespace PreCommit -Run Xlang_Generated_Transforms PreCommit Run XVR_Direct PostCommit Run XVR_Flink PostCommit Run XVR_JavaUsingPython_Dataflow PostCommit diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java index d9dfc2a90bd87..4b693f883fb7c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java @@ -66,7 +66,7 @@ public List outputCollectionNames() { public String description() { return String.format( "Outputs a PCollection of Beam Rows, each containing a single INT64 " - + "number called \"value\". The count is produced from the given \"start\" " + + "number called \"value\". The count is produced from the given \"start\"" + "value and either up to the given \"end\" or until 2^63 - 1.%n" + "To produce an unbounded PCollection, simply do not specify an \"end\" value. " + "Unbounded sequences can specify a \"rate\" for output elements.%n" diff --git a/sdks/python/MANIFEST.in b/sdks/python/MANIFEST.in index d40273a9340de..60b0989d9af7d 100644 --- a/sdks/python/MANIFEST.in +++ b/sdks/python/MANIFEST.in @@ -16,7 +16,6 @@ # include gen_protos.py -include gen_xlang_wrappers.py include README.md include NOTICE include LICENSE diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py index 83d45d81a5a10..4945da97d90f3 100644 --- a/sdks/python/apache_beam/io/__init__.py +++ b/sdks/python/apache_beam/io/__init__.py @@ -36,7 +36,6 @@ from apache_beam.io.gcp.bigquery import * from apache_beam.io.gcp.pubsub import * from apache_beam.io.gcp import gcsio - from apache_beam.transforms.xlang.io import * except ImportError: pass # pylint: enable=wrong-import-order, wrong-import-position diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index cfbb411b4e5f1..c1e9754526e8f 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -53,10 +53,6 @@ @pytest.mark.uses_gcp_java_expansion_service -@unittest.skipUnless( - os.environ.get('EXPANSION_JARS'), - "EXPANSION_JARS environment var is not provided, " - "indicating that jars have not been built") class BigQueryXlangStorageWriteIT(unittest.TestCase): BIGQUERY_DATASET = 'python_xlang_storage_write' @@ -116,6 +112,10 @@ def setUp(self): _LOGGER.info( "Created dataset %s in project %s", self.dataset_id, self.project) + self.assertTrue( + os.environ.get('EXPANSION_PORT'), "Expansion service port not found!") + self.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT')) + def tearDown(self): try: _LOGGER.info( @@ -162,7 +162,8 @@ def run_storage_write_test( table=table_id, method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, schema=schema, - use_at_least_once=use_at_least_once)) + use_at_least_once=use_at_least_once, + expansion_service=self.expansion_service)) hamcrest_assert(p, bq_matcher) def test_all_types(self): @@ -242,7 +243,8 @@ def test_write_with_beam_rows(self): _ = ( p | beam.Create(row_elements) - | StorageWriteToBigQuery(table=table_id)) + | StorageWriteToBigQuery( + table=table_id, expansion_service=self.expansion_service)) hamcrest_assert(p, bq_matcher) def test_write_to_dynamic_destinations(self): @@ -266,7 +268,8 @@ def test_write_to_dynamic_destinations(self): table=lambda record: spec_with_project + str(record['int']), method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, schema=self.ALL_TYPES_SCHEMA, - use_at_least_once=False)) + use_at_least_once=False, + expansion_service=self.expansion_service)) hamcrest_assert(p, all_of(*bq_matchers)) def test_write_to_dynamic_destinations_with_beam_rows(self): @@ -300,7 +303,8 @@ def test_write_to_dynamic_destinations_with_beam_rows(self): | beam.io.WriteToBigQuery( table=lambda record: spec_with_project + str(record.my_int), method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - use_at_least_once=False)) + use_at_least_once=False, + expansion_service=self.expansion_service)) hamcrest_assert(p, all_of(*bq_matchers)) def run_streaming(self, table_name, num_streams=0, use_at_least_once=False): @@ -331,7 +335,8 @@ def run_streaming(self, table_name, num_streams=0, use_at_least_once=False): triggering_frequency=1, with_auto_sharding=auto_sharding, num_storage_api_streams=num_streams, - use_at_least_once=use_at_least_once)) + use_at_least_once=use_at_least_once, + expansion_service=self.expansion_service)) hamcrest_assert(p, bq_matcher) def skip_if_not_dataflow_runner(self) -> bool: diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py index 13909cded1ff2..867dca9a5e7eb 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py @@ -58,11 +58,10 @@ def instance_prefix(instance): @pytest.mark.uses_gcp_java_expansion_service @pytest.mark.uses_transform_service -@unittest.skipIf(client is None, 'Bigtable dependencies are not installed') @unittest.skipUnless( - os.environ.get('EXPANSION_JARS'), - "EXPANSION_JARS environment var is not provided, " - "indicating that jars have not been built") + os.environ.get('EXPANSION_PORT'), + "EXPANSION_PORT environment var is not provided.") +@unittest.skipIf(client is None, 'Bigtable dependencies are not installed') class TestReadFromBigTableIT(unittest.TestCase): INSTANCE = "bt-read-tests" TABLE_ID = "test-table" @@ -71,6 +70,7 @@ def setUp(self): self.test_pipeline = TestPipeline(is_integration_test=True) self.args = self.test_pipeline.get_full_options_as_args() self.project = self.test_pipeline.get_option('project') + self.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT')) instance_id = instance_prefix(self.INSTANCE) @@ -141,7 +141,8 @@ def test_read_xlang(self): | bigtableio.ReadFromBigtable( project_id=self.project, instance_id=self.instance.instance_id, - table_id=self.table.table_id) + table_id=self.table.table_id, + expansion_service=self.expansion_service) | "Extract cells" >> beam.Map(lambda row: row._cells)) assert_that(cells, equal_to(expected_cells)) @@ -149,11 +150,10 @@ def test_read_xlang(self): @pytest.mark.uses_gcp_java_expansion_service @pytest.mark.uses_transform_service -@unittest.skipIf(client is None, 'Bigtable dependencies are not installed') @unittest.skipUnless( - os.environ.get('EXPANSION_JARS'), - "EXPANSION_JARS environment var is not provided, " - "indicating that jars have not been built") + os.environ.get('EXPANSION_PORT'), + "EXPANSION_PORT environment var is not provided.") +@unittest.skipIf(client is None, 'Bigtable dependencies are not installed') class TestWriteToBigtableXlangIT(unittest.TestCase): # These are integration tests for the cross-language write transform. INSTANCE = "bt-write-xlang" @@ -164,6 +164,7 @@ def setUpClass(cls): cls.test_pipeline = TestPipeline(is_integration_test=True) cls.project = cls.test_pipeline.get_option('project') cls.args = cls.test_pipeline.get_full_options_as_args() + cls.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT')) instance_id = instance_prefix(cls.INSTANCE) @@ -214,7 +215,8 @@ def run_pipeline(self, rows): project_id=self.project, instance_id=self.instance.instance_id, table_id=self.table.table_id, - use_cross_language=True)) + use_cross_language=True, + expansion_service=self.expansion_service)) def test_set_mutation(self): row1: DirectRow = DirectRow('key-1') diff --git a/sdks/python/apache_beam/transforms/external_transform_provider.py b/sdks/python/apache_beam/transforms/external_transform_provider.py index 2799bd1b9e935..26cc31471e69a 100644 --- a/sdks/python/apache_beam/transforms/external_transform_provider.py +++ b/sdks/python/apache_beam/transforms/external_transform_provider.py @@ -144,7 +144,7 @@ class ExternalTransformProvider: A :class:`ExternalTransform` subclass is generated for each external transform, and is named based on what can be inferred from the URN - (see the `urn_pattern` parameter). + (see :param urn_pattern). These classes are generated when :class:`ExternalTransformProvider` is initialized. We need to give it one or more expansion service addresses that @@ -256,7 +256,7 @@ def _create_wrappers(self): if skipped_urns: logging.info( - "Skipped URN(s) in %s that don't follow the pattern \"%s\": %s", + "Skipped URN(s) in %s that don't follow the pattern [%s]: %s", target, self._urn_pattern, skipped_urns) @@ -268,10 +268,6 @@ def get_available(self) -> List[Tuple[str, str]]: """Get a list of available ExternalTransform names and identifiers""" return list(self._name_to_urn.items()) - def get_all(self) -> Dict[str, ExternalTransform]: - """Get all ExternalTransform""" - return self._transforms - def get(self, name) -> ExternalTransform: """Get an ExternalTransform by its inferred class name""" return self._transforms[self._name_to_urn[name]] diff --git a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py deleted file mode 100644 index a53001c85fd39..0000000000000 --- a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py +++ /dev/null @@ -1,413 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import importlib -import logging -import os -import secrets -import shutil -import time -import typing -import unittest -from os.path import dirname - -import numpy -import pytest -import yaml - -import apache_beam as beam -from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to -from apache_beam.transforms.external import BeamJarExpansionService -from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN -from apache_beam.transforms.external_transform_provider import ExternalTransform -from apache_beam.transforms.external_transform_provider import ExternalTransformProvider -from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case -from apache_beam.transforms.external_transform_provider import infer_name_from_identifier -from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case -from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case -from apache_beam.transforms.xlang.io import GenerateSequence - - -class NameAndTypeUtilsTest(unittest.TestCase): - def test_snake_case_to_upper_camel_case(self): - test_cases = [("", ""), ("test", "Test"), ("test_name", "TestName"), - ("test_double_underscore", "TestDoubleUnderscore"), - ("TEST_CAPITALIZED", "TestCapitalized"), - ("_prepended_underscore", "PrependedUnderscore"), - ("appended_underscore_", "AppendedUnderscore")] - for case in test_cases: - self.assertEqual(case[1], snake_case_to_upper_camel_case(case[0])) - - def test_snake_case_to_lower_camel_case(self): - test_cases = [("", ""), ("test", "test"), ("test_name", "testName"), - ("test_double_underscore", "testDoubleUnderscore"), - ("TEST_CAPITALIZED", "testCapitalized"), - ("_prepended_underscore", "prependedUnderscore"), - ("appended_underscore_", "appendedUnderscore")] - for case in test_cases: - self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0])) - - def test_camel_case_to_snake_case(self): - test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"), - ("TestDoubleUnderscore", - "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"), - ("BEGINNINGAllCaps", - "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"), - ("AllCapsMIDDLEWord", "all_caps_middle_word"), - ("lowerCamelCase", "lower_camel_case")] - for case in test_cases: - self.assertEqual(case[1], camel_case_to_snake_case(case[0])) - - def test_infer_name_from_identifier(self): - standard_test_cases = [ - ("beam:schematransform:org.apache.beam:transform:v1", "Transform"), - ("beam:schematransform:org.apache.beam:my_transform:v1", - "MyTransform"), ( - "beam:schematransform:org.apache.beam:my_transform:v2", - "MyTransformV2"), - ("beam:schematransform:org.apache.beam:fe_fi_fo_fum:v2", "FeFiFoFumV2"), - ("beam:schematransform:bad_match:my_transform:v1", None) - ] - for case in standard_test_cases: - self.assertEqual( - case[1], infer_name_from_identifier(case[0], STANDARD_URN_PATTERN)) - - custom_pattern_cases = [ - # (, , ) - ( - r"^custom:transform:([\w-]+):(\w+)$", - "custom:transform:my_transform:v1", - "MyTransformV1"), - ( - r"^org.user:([\w-]+):([\w-]+):([\w-]+):external$", - "org.user:some:custom_transform:we_made:external", - "SomeCustomTransformWeMade"), - ( - r"^([\w-]+):user.transforms", - "my_eXTErnal:user.transforms", - "MyExternal"), - (r"^([\w-]+):user.transforms", "my_external:badinput.transforms", None), - ] - for case in custom_pattern_cases: - self.assertEqual(case[2], infer_name_from_identifier(case[1], case[0])) - - -@pytest.mark.uses_io_java_expansion_service -@unittest.skipUnless( - os.environ.get('EXPANSION_JARS'), - "EXPANSION_JARS environment var is not provided, " - "indicating that jars have not been built") -class ExternalTransformProviderIT(unittest.TestCase): - def test_generate_sequence_config_schema_and_description(self): - provider = ExternalTransformProvider( - BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar")) - - self.assertTrue(( - 'GenerateSequence', - 'beam:schematransform:org.apache.beam:generate_sequence:v1' - ) in provider.get_available()) - - GenerateSequence = provider.get('GenerateSequence') - config_schema = GenerateSequence.configuration_schema - for param in ['start', 'end', 'rate']: - self.assertTrue(param in config_schema) - - description_substring = ( - "Outputs a PCollection of Beam Rows, each " - "containing a single INT64") - self.assertTrue(description_substring in GenerateSequence.description) - - def test_run_generate_sequence(self): - provider = ExternalTransformProvider( - BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar")) - - with beam.Pipeline() as p: - numbers = p | provider.GenerateSequence( - start=0, end=10) | beam.Map(lambda row: row.value) - - assert_that(numbers, equal_to([i for i in range(10)])) - - -@pytest.mark.xlang_wrapper_generation -@unittest.skipUnless( - os.environ.get('EXPANSION_JARS'), - "EXPANSION_JARS environment var is not provided, " - "indicating that jars have not been built") -class AutoGenerationScriptIT(unittest.TestCase): - """ - This class tests the generation and regeneration operations in - `gen_xlang_wrappers.py`. - """ - - # tests cases will use GenerateSequence - GEN_SEQ_IDENTIFIER = \ - 'beam:schematransform:org.apache.beam:generate_sequence:v1' - - def setUp(self): - # import script from top-level sdks/python directory - self.sdk_dir = os.path.abspath(dirname(dirname(dirname(__file__)))) - spec = importlib.util.spec_from_file_location( - 'gen_xlang_wrappers', - os.path.join(self.sdk_dir, 'gen_xlang_wrappers.py')) - self.script = importlib.util.module_from_spec(spec) - spec.loader.exec_module(self.script) - args = TestPipeline(is_integration_test=True).get_full_options_as_args() - runner = PipelineOptions(args).get_all_options()['runner'] - if runner and "direct" not in runner.lower(): - self.skipTest( - "It is sufficient to run this test in the DirectRunner " - "test suite only.") - - self.test_dir_name = 'test_gen_script_%d_%s' % ( - int(time.time()), secrets.token_hex(3)) - self.test_dir = os.path.join( - os.path.abspath(dirname(__file__)), self.test_dir_name) - self.service_config_path = os.path.join( - self.test_dir, "test_expansion_service_config.yaml") - self.transform_config_path = os.path.join( - self.test_dir, "test_transform_config.yaml") - os.mkdir(self.test_dir) - - def tearDown(self): - shutil.rmtree(self.test_dir, ignore_errors=False) - - def delete_and_validate(self): - self.script.delete_generated_files(self.test_dir) - self.assertEqual(len(os.listdir(self.test_dir)), 0) - - def test_script_fails_with_invalid_destinations(self): - expansion_service_config = { - "gradle_target": 'sdks:java:io:expansion-service:shadowJar', - 'destinations': { - 'python': 'apache_beam/some_nonexistent_dir' - } - } - with self.assertRaises(ValueError): - self.create_and_check_transforms_config_exists(expansion_service_config) - - def test_pretty_types(self): - types = [ - typing.Optional[typing.List[str]], - numpy.int16, - str, - typing.Dict[str, numpy.float64], - typing.Optional[typing.Dict[str, typing.List[numpy.int64]]], - typing.Dict[int, typing.Optional[str]] - ] - - expected_type_names = [('List[str]', True), ('numpy.int16', False), - ('str', False), ('Dict[str, numpy.float64]', False), - ('Dict[str, List[numpy.int64]]', True), - ('Dict[int, Union[str, NoneType]]', False)] - - for i in range(len(types)): - self.assertEqual( - self.script.pretty_type(types[i]), expected_type_names[i]) - - def create_and_check_transforms_config_exists(self, expansion_service_config): - with open(self.service_config_path, 'w') as f: - yaml.dump([expansion_service_config], f) - - self.script.generate_transforms_config( - self.service_config_path, self.transform_config_path) - self.assertTrue(os.path.exists(self.transform_config_path)) - - def create_and_validate_transforms_config( - self, expansion_service_config, expected_name, expected_destination): - self.create_and_check_transforms_config_exists(expansion_service_config) - - with open(self.transform_config_path) as f: - configs = yaml.safe_load(f) - gen_seq_config = None - for config in configs: - if config['identifier'] == self.GEN_SEQ_IDENTIFIER: - gen_seq_config = config - self.assertIsNotNone(gen_seq_config) - self.assertEqual( - gen_seq_config['default_service'], - expansion_service_config['gradle_target']) - self.assertEqual(gen_seq_config['name'], expected_name) - self.assertEqual( - gen_seq_config['destinations']['python'], expected_destination) - self.assertIn("end", gen_seq_config['fields']) - self.assertIn("start", gen_seq_config['fields']) - self.assertIn("rate", gen_seq_config['fields']) - - def get_module(self, dest): - module_name = dest.replace('apache_beam/', '').replace('/', '_') - module = 'apache_beam.transforms.%s.%s' % (self.test_dir_name, module_name) - return importlib.import_module(module) - - def write_wrappers_to_destinations_and_validate( - self, destinations: typing.List[str]): - """ - Generate wrappers from the config path and validate all destinations are - included. - Then write wrappers to destinations and validate all destination paths - exist. - - :return: Generated wrappers grouped by destination - """ - grouped_wrappers = self.script.get_wrappers_from_transform_configs( - self.transform_config_path) - for dest in destinations: - self.assertIn(dest, grouped_wrappers) - - # write to our test directory to avoid messing with other files - self.script.write_wrappers_to_destinations( - grouped_wrappers, self.test_dir, format_code=False) - - for dest in destinations: - self.assertTrue( - os.path.exists( - os.path.join( - self.test_dir, - dest.replace('apache_beam/', '').replace('/', '_') + ".py"))) - return grouped_wrappers - - def test_script_workflow(self): - expected_destination = 'apache_beam/transforms' - expansion_service_config = { - "gradle_target": 'sdks:java:io:expansion-service:shadowJar', - 'destinations': { - 'python': expected_destination - } - } - - self.create_and_validate_transforms_config( - expansion_service_config, 'GenerateSequence', expected_destination) - grouped_wrappers = self.write_wrappers_to_destinations_and_validate( - [expected_destination]) - # at least the GenerateSequence wrapper is set to this destination - self.assertGreaterEqual(len(grouped_wrappers[expected_destination]), 1) - - # check the wrapper exists in this destination and has correct properties - output_module = self.get_module(expected_destination) - self.assertTrue(hasattr(output_module, 'GenerateSequence')) - # Since our config isn't skipping any transforms, - # it should include these two Kafka IOs as well - self.assertTrue(hasattr(output_module, 'KafkaWrite')) - self.assertTrue(hasattr(output_module, 'KafkaRead')) - self.assertTrue( - isinstance(output_module.GenerateSequence(start=0), ExternalTransform)) - self.assertEqual( - output_module.GenerateSequence.identifier, self.GEN_SEQ_IDENTIFIER) - - self.delete_and_validate() - - def test_script_workflow_with_modified_transforms(self): - modified_name = 'ModifiedSequence' - modified_dest = 'apache_beam/io/gcp' - expansion_service_config = { - "gradle_target": 'sdks:java:io:expansion-service:shadowJar', - 'destinations': { - 'python': 'apache_beam/transforms' - }, - 'transforms': { - 'beam:schematransform:org.apache.beam:generate_sequence:v1': { - 'name': modified_name, - 'destinations': { - 'python': modified_dest - } - } - } - } - - self.create_and_validate_transforms_config( - expansion_service_config, modified_name, modified_dest) - - grouped_wrappers = self.write_wrappers_to_destinations_and_validate( - [modified_dest]) - self.assertIn(modified_name, grouped_wrappers[modified_dest][0]) - self.assertEqual(len(grouped_wrappers[modified_dest]), 1) - - # check the modified wrapper exists in the modified destination - # and check it has the correct properties - output_module = self.get_module(modified_dest) - self.assertTrue( - isinstance(output_module.ModifiedSequence(start=0), ExternalTransform)) - self.assertEqual( - output_module.ModifiedSequence.identifier, self.GEN_SEQ_IDENTIFIER) - - self.delete_and_validate() - - def test_script_workflow_with_skipped_transform(self): - expansion_service_config = { - "gradle_target": 'sdks:java:io:expansion-service:shadowJar', - 'destinations': { - 'python': f'apache_beam/transforms/{self.test_dir_name}' - }, - 'skip_transforms': [ - 'beam:schematransform:org.apache.beam:generate_sequence:v1' - ] - } - - with open(self.service_config_path, 'w') as f: - yaml.dump([expansion_service_config], f) - - self.script.generate_transforms_config( - self.service_config_path, self.transform_config_path) - - # gen sequence shouldn't exist in the transform config - with open(self.transform_config_path) as f: - transforms = yaml.safe_load(f) - gen_seq_config = None - for transform in transforms: - if transform['identifier'] == self.GEN_SEQ_IDENTIFIER: - gen_seq_config = transform - self.assertIsNone(gen_seq_config) - - def test_run_pipeline_with_generated_transform(self): - with beam.Pipeline() as p: - numbers = ( - p | GenerateSequence(start=0, end=10) - | beam.Map(lambda row: row.value)) - assert_that(numbers, equal_to([i for i in range(10)])) - - def test_check_standard_external_transforms_config_in_sync(self): - """ - This test creates a transforms config file and checks it against - `sdks/standard_external_transforms.yaml`. Fails if the two configs don't - match. - - Fix by running `./gradlew generateExternalTransformsConfig` and - committing the changes. - """ - sdks_dir = os.path.abspath(dirname(self.sdk_dir)) - self.script.generate_transforms_config( - os.path.join(sdks_dir, 'standard_expansion_services.yaml'), - self.transform_config_path) - with open(self.transform_config_path) as f: - test_config = yaml.safe_load(f) - with open(os.path.join(sdks_dir, 'standard_external_transforms.yaml'), - 'r') as f: - standard_config = yaml.safe_load(f) - - self.assertEqual( - test_config, - standard_config, - "The standard xlang transforms config file " - "\"standard_external_transforms.yaml\" is out of sync! Please update" - "by running './gradlew generateExternalTransformsConfig'" - "and committing the changes.") - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() diff --git a/sdks/python/apache_beam/transforms/external_transform_provider_test.py b/sdks/python/apache_beam/transforms/external_transform_provider_test.py new file mode 100644 index 0000000000000..36fe9b5c4bd62 --- /dev/null +++ b/sdks/python/apache_beam/transforms/external_transform_provider_test.py @@ -0,0 +1,140 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import logging +import os +import unittest + +import pytest + +import apache_beam as beam +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN +from apache_beam.transforms.external_transform_provider import ExternalTransformProvider +from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case +from apache_beam.transforms.external_transform_provider import infer_name_from_identifier +from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case +from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case + + +class NameUtilsTest(unittest.TestCase): + def test_snake_case_to_upper_camel_case(self): + test_cases = [("", ""), ("test", "Test"), ("test_name", "TestName"), + ("test_double_underscore", "TestDoubleUnderscore"), + ("TEST_CAPITALIZED", "TestCapitalized"), + ("_prepended_underscore", "PrependedUnderscore"), + ("appended_underscore_", "AppendedUnderscore")] + for case in test_cases: + self.assertEqual(case[1], snake_case_to_upper_camel_case(case[0])) + + def test_snake_case_to_lower_camel_case(self): + test_cases = [("", ""), ("test", "test"), ("test_name", "testName"), + ("test_double_underscore", "testDoubleUnderscore"), + ("TEST_CAPITALIZED", "testCapitalized"), + ("_prepended_underscore", "prependedUnderscore"), + ("appended_underscore_", "appendedUnderscore")] + for case in test_cases: + self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0])) + + def test_camel_case_to_snake_case(self): + test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"), + ("TestDoubleUnderscore", + "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"), + ("BEGINNINGAllCaps", + "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"), + ("AllCapsMIDDLEWord", "all_caps_middle_word"), + ("lowerCamelCase", "lower_camel_case")] + for case in test_cases: + self.assertEqual(case[1], camel_case_to_snake_case(case[0])) + + def test_infer_name_from_identifier(self): + standard_test_cases = [ + ("beam:schematransform:org.apache.beam:transform:v1", "Transform"), + ("beam:schematransform:org.apache.beam:my_transform:v1", + "MyTransform"), ( + "beam:schematransform:org.apache.beam:my_transform:v2", + "MyTransformV2"), + ("beam:schematransform:org.apache.beam:fe_fi_fo_fum:v2", "FeFiFoFumV2"), + ("beam:schematransform:bad_match:my_transform:v1", None) + ] + for case in standard_test_cases: + self.assertEqual( + case[1], infer_name_from_identifier(case[0], STANDARD_URN_PATTERN)) + + custom_pattern_cases = [ + # (, , ) + ( + r"^custom:transform:([\w-]+):(\w+)$", + "custom:transform:my_transform:v1", + "MyTransformV1"), + ( + r"^org.user:([\w-]+):([\w-]+):([\w-]+):external$", + "org.user:some:custom_transform:we_made:external", + "SomeCustomTransformWeMade"), + ( + r"^([\w-]+):user.transforms", + "my_eXTErnal:user.transforms", + "MyExternal"), + (r"^([\w-]+):user.transforms", "my_external:badinput.transforms", None), + ] + for case in custom_pattern_cases: + self.assertEqual(case[2], infer_name_from_identifier(case[1], case[0])) + + +@pytest.mark.uses_io_java_expansion_service +@unittest.skipUnless( + os.environ.get('EXPANSION_PORT'), + "EXPANSION_PORT environment var is not provided.") +class ExternalTransformProviderTest(unittest.TestCase): + def setUp(self): + self.test_pipeline = TestPipeline(is_integration_test=True) + + def test_generate_sequence_config_schema_and_description(self): + provider = ExternalTransformProvider( + BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar")) + + self.assertTrue(( + 'GenerateSequence', + 'beam:schematransform:org.apache.beam:generate_sequence:v1' + ) in provider.get_available()) + + GenerateSequence = provider.get('GenerateSequence') + config_schema = GenerateSequence.configuration_schema + for param in ['start', 'end', 'rate']: + self.assertTrue(param in config_schema) + + description_substring = ( + "Outputs a PCollection of Beam Rows, each " + "containing a single INT64") + self.assertTrue(description_substring in GenerateSequence.description) + + def test_run_generate_sequence(self): + provider = ExternalTransformProvider( + BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar")) + + with beam.Pipeline() as p: + numbers = p | provider.GenerateSequence( + start=0, end=10) | beam.Map(lambda row: row.value) + + assert_that(numbers, equal_to([i for i in range(10)])) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/transforms/xlang/__init__.py b/sdks/python/apache_beam/transforms/xlang/__init__.py deleted file mode 100644 index ba896615cbc4e..0000000000000 --- a/sdks/python/apache_beam/transforms/xlang/__init__.py +++ /dev/null @@ -1,27 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -This package contains autogenerated Python wrappers for cross-language -transforms available in other Beam SDKs. - -For documentation on creating and using cross-language transforms, see: -https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms - -For more information on transform wrapper generation, see the top level script: -`gen_xlang_wrappers.py` -""" diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle index 85b26ca3a464a..7f2bc7f5d4236 100644 --- a/sdks/python/build.gradle +++ b/sdks/python/build.gradle @@ -66,25 +66,6 @@ artifacts { distTarBall file: file("${buildDir}/${tarball}"), builtBy: sdist } -tasks.register("generateExternalTransformsConfig") { - description "Discovers external transforms and regenerates the config at sdks/standard_expansion_services.yaml" - - dependsOn buildPython - // Need to build all expansion services listed in sdks/standard_expansion_services.yaml - dependsOn ":sdks:java:io:google-cloud-platform:expansion-service:build" - dependsOn ":sdks:java:io:expansion-service:build" - // Keep this in-sync with pyproject.toml - def PyYaml = "'pyyaml>=3.12,<7.0.0'" - - doLast { - exec { - executable 'sh' - args '-c', "pip install $PyYaml && " + - "python gen_xlang_wrappers.py --cleanup --generate-config-only" - } - } -} - // Create Python wheels for given platform and Python version // build identifiers for cibuildwheel def platform_identifiers_map = [ diff --git a/sdks/python/gen_xlang_wrappers.py b/sdks/python/gen_xlang_wrappers.py deleted file mode 100644 index a75fc05cba733..0000000000000 --- a/sdks/python/gen_xlang_wrappers.py +++ /dev/null @@ -1,447 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -Generates Python wrappers for external transforms (specifically, -SchemaTransforms) -""" - -import argparse -import datetime -import logging -import os -import shutil -import subprocess -import typing -from typing import Any -from typing import Dict -from typing import List -from typing import Union - -import yaml - -from gen_protos import LICENSE_HEADER -from gen_protos import PROJECT_ROOT -from gen_protos import PYTHON_SDK_ROOT - -SUPPORTED_SDK_DESTINATIONS = ['python'] -PYTHON_SUFFIX = "_et.py" -PY_WRAPPER_OUTPUT_DIR = os.path.join( - PYTHON_SDK_ROOT, 'apache_beam', 'transforms', 'xlang') - - -def generate_transforms_config(input_services, output_file): - """ - Generates a YAML file containing a list of transform configurations. - - Takes an input YAML file containing a list of expansion service gradle - targets. Each service must provide a `destinations` field that specifies the - default package (relative path) that generated wrappers should be imported - to. A default destination package is specified for each SDK, like so:: - - - gradle_target: 'sdks:java:io:expansion-service:shadowJar' - destinations: - python: 'apache_beam/io' - - We use :class:`ExternalTransformProvider` to discover external - transforms. Then, we extract the necessary details of each transform and - compile them into a new YAML file, which is later used to generate wrappers. - - Importing generated transforms to an existing package - ----------------------------------------------------- - When running the script on the config above, a new module will be created at - `apache_beam/transforms/xlang/io.py`. This contains all - generated wrappers that are set to destination 'apache_beam/io'. Finally, - to make these available to the `apache_beam.io` package (or any package - really), just add the following line to the package's `__init__.py` file:: - from apache_beam.transforms.xlang.io import * - - Modifying a transform's name and destination - -------------------------------------------- - Each service may also specify modifications for particular transform. - Currently, one can modify the generated wrapper's **name** and - **destination** package: - - By default, the transform's identifier is used to generate the wrapper - class name. This can be overriden by manually providing a name. - - By default, generated wrappers are made available to the package provided - by their respective expansion service. This can be overridden by - providing a relative path to a different package. - - See the following example for what such modifications can look like:: - - - gradle_target: 'sdks:java:io:expansion-service:shadowJar' - destinations: - python: 'apache_beam/io' - transforms: - 'beam:schematransform:org.apache.beam:my_transform:v1': - name: 'MyCustomTransformName' - destinations: - python: 'apache_beam/io/gcp' - - For the above example, we would take the transform with identifier - `beam:schematransform:org.apache.beam:my_transform:v1` and by default infer - a wrapper class name of `MyTransform` then write it to the module - `apache_beam/transforms/xlang/io.py`. With these modifications - however, we instead use the provided name `MyCustomTransformName` and write - it to `apache_beam/transforms/xlang/io_gcp.py`. - Similar to above, this can be made available by importing it in the - `__init__.py` file like so:: - from apache_beam.transforms.xlang.io_gcp import * - - Skipping transforms - ------------------- - To skip a particular transform, simply list its identifier in the - `skip_transforms` field, like so:: - - - gradle_target: 'sdks:java:io:expansion-service:shadowJar' - destinations: - python: 'apache_beam/io' - skip_transforms: - - 'beam:schematransform:org.apache.beam:some_transform:v1' - """ - from apache_beam.transforms.external import BeamJarExpansionService - from apache_beam.transforms.external_transform_provider import ExternalTransform - from apache_beam.transforms.external_transform_provider import ExternalTransformProvider - - transform_list: List[Dict[str, Any]] = [] - - with open(input_services) as f: - services = yaml.safe_load(f) - for service in services: - target = service['gradle_target'] - - if "destinations" not in service: - raise ValueError( - f"Expansion service with target '{target}' does not " - "specify any default destinations.") - service_destinations: Dict[str, str] = service['destinations'] - for sdk, dest in service_destinations.items(): - validate_sdks_destinations(sdk, dest, target) - - transforms_to_skip = service.get('skip_transforms', []) - - # use dynamic provider to discover and populate wrapper details - provider = ExternalTransformProvider(BeamJarExpansionService(target)) - discovered: Dict[str, ExternalTransform] = provider.get_all() - for identifier, wrapper in discovered.items(): - if identifier in transforms_to_skip: - continue - - transform_destinations = service_destinations.copy() - - # apply any modifications - modified_transform = {} - if 'transforms' in service and identifier in service['transforms']: - modified_transform = service['transforms'][identifier] - for sdk, dest in modified_transform.get('destinations', {}).items(): - validate_sdks_destinations(sdk, dest, target, identifier) - transform_destinations[sdk] = dest # override the destination - name = modified_transform.get('name', wrapper.__name__) - - fields = {} - for param in wrapper.configuration_schema.values(): - (tp, nullable) = pretty_type(param.type) - field_info = { - 'type': str(tp), - 'description': param.description, - 'nullable': nullable - } - fields[param.original_name] = field_info - - transform = { - 'identifier': identifier, - 'name': name, - 'destinations': transform_destinations, - 'default_service': target, - 'fields': fields, - 'description': wrapper.description - } - transform_list.append(transform) - - with open(output_file, 'w') as f: - f.write(LICENSE_HEADER.lstrip()) - f.write( - "# NOTE: This file is autogenerated and should " - "not be edited by hand.\n") - f.write( - "# Configs are generated based on the expansion service\n" - f"# configuration in {input_services.replace(PROJECT_ROOT, '')}.\n") - f.write("# Refer to gen_xlang_wrappers.py for more info.\n") - dt = datetime.datetime.now().date() - f.write(f"#\n# Last updated on: {dt}\n\n") - yaml.dump(transform_list, f) - logging.info("Successfully wrote transform configs to file: %s", output_file) - - -def validate_sdks_destinations(sdk, dest, service, identifier=None): - if identifier: - message = f"Identifier '{identifier}'" - else: - message = f"Service '{service}'" - if sdk not in SUPPORTED_SDK_DESTINATIONS: - raise ValueError( - message + " specifies a destination for an invalid SDK:" - f" '{sdk}'. The supported SDKs are {SUPPORTED_SDK_DESTINATIONS}") - if not os.path.isdir(os.path.join(PYTHON_SDK_ROOT, *dest.split('/'))): - raise ValueError( - message + f" specifies an invalid destination '{dest}'." - " Please make sure the destination is an existing directory.") - - -def pretty_type(tp): - """ - Takes a type and returns a tuple containing a pretty string representing it - and a bool signifying if it is nullable or not. - - For optional types, the contained type is unwrapped and returned. This does - not recurse however, so inner Optional types are not affected. - E.g. the input typing.Optional[typing.Dict[int, typing.Optional[str]]] will - return (Dict[int, Union[str, NoneType]], True) - """ - nullable = False - if (typing.get_origin(tp) is Union and type(None) in typing.get_args(tp)): - nullable = True - # only unwrap if it's a single nullable type. if the type is truly a union - # of multiple types, leave it alone. - args = typing.get_args(tp) - if len(args) == 2: - tp = list(filter(lambda t: not isinstance(t, type(None)), args))[0] - - # TODO(ahmedabu98): Make this more generic to support other remote SDKs - # Potentially use Runner API types - if tp.__module__ == 'builtins': - tp = tp.__name__ - elif tp.__module__ == 'typing': - tp = str(tp).replace("typing.", "") - elif tp.__module__ == 'numpy': - tp = "%s.%s" % (tp.__module__, tp.__name__) - - return (tp, nullable) - - -def camel_case_to_snake_case(string): - """Convert camelCase to snake_case""" - arr = [] - word = [] - for i, n in enumerate(string): - # If seeing an upper letter after a lower letter, we just witnessed a word - # If seeing an upper letter and the next letter is lower, we may have just - # witnessed an all caps word - if n.isupper() and ((i > 0 and string[i - 1].islower()) or - (i + 1 < len(string) and string[i + 1].islower())): - arr.append(''.join(word)) - word = [n.lower()] - else: - word.append(n.lower()) - arr.append(''.join(word)) - return '_'.join(arr).strip('_') - - -def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]: - """ - Generates code for external transform wrapper classes (subclasses of - :class:`ExternalTransform`). - - Takes a YAML file containing a list of SchemaTransform configurations. For - each configuration, the code for a wrapper class is generated, along with any - documentation that may be included. - - Each configuration must include a destination file that the generated class - will be written to. - - Returns the generated classes, grouped by destination. - """ - from jinja2 import Environment - from jinja2 import FileSystemLoader - - env = Environment(loader=FileSystemLoader(PYTHON_SDK_ROOT)) - python_wrapper_template = env.get_template("python_xlang_wrapper.template") - - # maintain a list of wrappers to write in each file. if modified destinations - # are used, we may end up with multiple wrappers in one file. - destinations: Dict[str, List[str]] = {} - - with open(config_file) as f: - transforms = yaml.safe_load(f) - for config in transforms: - default_service = config['default_service'] - description = config['description'] - destination = config['destinations']['python'] - name = config['name'] - fields = config['fields'] - identifier = config['identifier'] - - parameters = [] - for param, info in fields.items(): - pythonic_name = camel_case_to_snake_case(param) - param_details = { - "name": pythonic_name, - "type": info['type'], - "description": info['description'], - } - - if info['nullable']: - param_details["default"] = None - parameters.append(param_details) - - # Python syntax requires function definitions to have - # non-default parameters first - parameters = sorted(parameters, key=lambda p: 'default' in p) - default_service = f"BeamJarExpansionService(\"{default_service}\")" - - python_wrapper_class = python_wrapper_template.render( - class_name=name, - identifier=identifier, - parameters=parameters, - description=description, - default_expansion_service=default_service) - - if destination not in destinations: - destinations[destination] = [] - destinations[destination].append(python_wrapper_class) - - return destinations - - -def write_wrappers_to_destinations( - grouped_wrappers: Dict[str, List[str]], - output_dir=PY_WRAPPER_OUTPUT_DIR, - format_code=True): - """ - Takes a dictionary of generated wrapper code, grouped by destination. - For each destination, create a new file containing the respective wrapper - classes. Each file includes the Apache License header and relevant imports. - Note: the Jinja template should already follow linting and formatting rules. - """ - written_files = [] - for dest, wrappers in grouped_wrappers.items(): - module_name = dest.replace('apache_beam/', '').replace('/', '_') - module_path = os.path.join(output_dir, module_name) + ".py" - with open(module_path, "w") as file: - file.write(LICENSE_HEADER.lstrip()) - file.write( - "\n# NOTE: This file contains autogenerated external transform(s)\n" - "# and should not be edited by hand.\n" - "# Refer to gen_xlang_wrappers.py for more info.\n\n") - file.write( - "\"\"\"" - "Cross-language transforms in this module can be imported from the\n" - f":py:mod:`{dest.replace('/', '.')}` package." - "\"\"\"\n\n") - file.write( - "# pylint:disable=line-too-long\n\n" - "from apache_beam.transforms.external import " - "BeamJarExpansionService\n" - "from apache_beam.transforms.external_transform_provider " - "import ExternalTransform\n") - for wrapper in wrappers: - file.write(wrapper + "\n") - written_files.append(module_path) - - logging.info("Created external transform wrapper modules: %s", written_files) - - if format_code: - formatting_cmd = ['yapf', '--in-place', *written_files] - subprocess.run(formatting_cmd, capture_output=True, check=True) - - -def delete_generated_files(root_dir): - """Scans for and deletes generated wrapper files.""" - logging.info("Deleting external transform wrappers from dir %s", root_dir) - deleted_files = os.listdir(root_dir) - for file in deleted_files: - if file == '__init__.py': - deleted_files.remove(file) - continue - path = os.path.join(root_dir, file) - if os.path.isfile(path) or os.path.islink(path): - os.unlink(os.path.join(root_dir, file)) - else: - shutil.rmtree(path) - logging.info("Successfully deleted files: %s", deleted_files) - - -def run_script( - cleanup, - generate_config_only, - input_expansion_services, - transforms_config_source): - # Cleanup first if requested. This is needed to remove outdated wrappers. - if cleanup: - delete_generated_files(PY_WRAPPER_OUTPUT_DIR) - - # This step requires the expansion service. - # Only generate a transforms config file if none are provided - if not transforms_config_source: - output_transforms_config = os.path.join( - PROJECT_ROOT, 'sdks', 'standard_external_transforms.yaml') - generate_transforms_config( - input_services=input_expansion_services, - output_file=output_transforms_config) - - transforms_config_source = output_transforms_config - else: - if not os.path.exists(transforms_config_source): - raise RuntimeError( - "Could not find the provided transforms config " - f"source: {transforms_config_source}") - - if generate_config_only: - return - - wrappers_grouped_by_destination = get_wrappers_from_transform_configs( - transforms_config_source) - - write_wrappers_to_destinations(wrappers_grouped_by_destination) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument( - '--cleanup', - dest='cleanup', - action='store_true', - help="Whether to cleanup existing generated wrappers first.") - parser.add_argument( - '--generate-config-only', - dest='generate_config_only', - action='store_true', - help="If set, will generate the transform config only without generating" - "any wrappers.") - parser.add_argument( - '--input-expansion-services', - dest='input_expansion_services', - default=os.path.join( - PROJECT_ROOT, 'sdks', 'standard_expansion_services.yaml'), - help=( - "Absolute path to the input YAML file that contains " - "expansion service configs. Ignored if a transforms config" - "source is provided.")) - parser.add_argument( - '--transforms-config-source', - dest='transforms_config_source', - help=( - "Absolute path to a source transforms config YAML file to " - "generate wrapper modules from. If not provided, one will be " - "created by this script.")) - args = parser.parse_args() - - run_script( - args.cleanup, - args.generate_config_only, - args.input_expansion_services, - args.transforms_config_source) diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 9829671ccb72e..f1a65c842d531 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -29,12 +29,6 @@ requires = [ "numpy>=1.14.3,<1.27", # Update setup.py as well. # having cython here will create wheels that are platform dependent. "cython==0.29.36", - ## deps for generating external transform wrappers: - # also update PyYaml bounds in sdks:python:generateExternalTransformsConfig - 'pyyaml>=3.12,<7.0.0', - # also update Jinja2 bounds in test-suites/xlang/build.gradle (look for xlangWrapperValidation task) - "jinja2>=2.7.1,<4.0.0", - 'yapf==0.29.0' ] diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini index e78697169bb03..c95aa5974da71 100644 --- a/sdks/python/pytest.ini +++ b/sdks/python/pytest.ini @@ -34,7 +34,6 @@ markers = uses_java_expansion_service: collect Cross Language Java transforms test runs uses_python_expansion_service: collect Cross Language Python transforms test runs uses_io_java_expansion_service: collect Cross Language IO Java transform test runs (with Kafka bootstrap server) - xlang_wrapper_generation: collect tests that validate Cross Language wrapper generation uses_transform_service: collect Cross Language test runs that uses the Transform Service xlang_sql_expansion_service: collect for Cross Language with SQL expansion service test runs it_postcommit: collect for post-commit integration test runs diff --git a/sdks/python/python_xlang_wrapper.template b/sdks/python/python_xlang_wrapper.template deleted file mode 100644 index f3d3728aaceb7..0000000000000 --- a/sdks/python/python_xlang_wrapper.template +++ /dev/null @@ -1,36 +0,0 @@ -{# - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -#} - -class {{ class_name }}(ExternalTransform): -{% if description %} """ - {{ description | wordwrap(78) | replace('\n', '\n ') }} - """{% endif %} - identifier = "{{ identifier }}" - - def __init__( - self,{% if parameters %}{% for param in parameters%} - {{ param.name }}{% if 'default' in param %}={{ param.default }}{% endif %},{% endfor %}{% endif %} - expansion_service=None): - {% if parameters %}"""{% for param in parameters %} - :param {{ param.name }}: ({{ param.type }}){% if param.description %} - {{ param.description | wordwrap(72) | replace('\n', '\n ') }} {% endif %}{% endfor %} - """{% endif %} - self.default_expansion_service = {{ default_expansion_service }} - super().__init__( - {% for param in parameters %}{{ param.name }}={{ param.name }}, - {% endfor %}expansion_service=expansion_service) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 409951cbc4151..a5a14c035dc2f 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -16,8 +16,8 @@ # """Apache Beam SDK for Python setup file.""" + import glob -import logging import os import shutil import subprocess @@ -227,50 +227,6 @@ def copy_tests_from_docs(): f'Could not locate yaml docs in {docs_src} or {docs_dest}.') -def generate_external_transform_wrappers(): - try: - sdk_dir = os.path.abspath(os.path.dirname(__file__)) - script_exists = os.path.exists( - os.path.join(sdk_dir, 'gen_xlang_wrappers.py')) - config_exists = os.path.exists( - os.path.join(os.path.dirname(sdk_dir), - 'standard_external_transforms.yaml')) - # we need both the script and the standard transforms config file. - # at build time, we don't have access to apache_beam to discover and - # retrieve external transforms, so the config file has to already exist - if not script_exists or not config_exists: - generated_transforms_dir = os.path.join( - sdk_dir, 'apache_beam', 'transforms', 'xlang') - - # if exists, this directory will have at least its __init__.py file - if (not os.path.exists(generated_transforms_dir) or - len(os.listdir(generated_transforms_dir)) <= 1): - message = 'External transform wrappers have not been generated ' - if not script_exists: - message += 'and the generation script `gen_xlang_wrappers.py`' - if not config_exists: - message += 'and the standard external transforms config' - message += ' could not be found' - raise RuntimeError(message) - else: - logging.info( - 'Skipping external transform wrapper generation as they ' - 'are already generated.') - return - subprocess.run([ - sys.executable, - os.path.join(sdk_dir, 'gen_xlang_wrappers.py'), - '--cleanup', - '--transforms-config-source', - os.path.join(os.path.dirname(sdk_dir), - 'standard_external_transforms.yaml') - ], capture_output=True, check=True) - except subprocess.CalledProcessError as err: - raise RuntimeError( - 'Could not generate external transform wrappers due to ' - 'error: %s', err.stderr) - - def get_portability_package_data(): files = [] portability_dir = Path(__file__).parent / 'apache_beam' / \ @@ -297,8 +253,6 @@ def get_portability_package_data(): # executes below. generate_protos_first() - generate_external_transform_wrappers() - # These data files live elsewhere in the full Beam repository. copy_tests_from_docs() @@ -431,6 +385,7 @@ def get_portability_package_data(): 'testcontainers[mysql]>=3.0.3,<4.0.0', 'cryptography>=41.0.2', 'hypothesis>5.0.0,<=7.0.0', + 'pyyaml>=3.12,<7.0.0', ], 'gcp': [ 'cachetools>=3.1.0,<6', diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index cadf3a6ae2c6c..d29d85af30111 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -536,8 +536,10 @@ def dataflowRegion = project.findProperty('dataflowRegion') ?: 'us-central1' project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata -> createCrossLanguageUsingJavaExpansionTask( name: taskMetadata.name, - expansionProjectPaths: taskMetadata.expansionProjectPaths, + expansionProjectPath: taskMetadata.expansionProjectPath, collectMarker: taskMetadata.collectMarker, + startJobServer: taskMetadata.startJobServer, + cleanupJobServer: taskMetadata.cleanupJobServer, pythonPipelineOptions: [ "--runner=TestDataflowRunner", "--project=${dataflowProject}", @@ -546,7 +548,6 @@ project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata -> "--sdk_harness_container_image_overrides=.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java8_sdk:latest" ], pytestOptions: basicPytestOpts, - additionalDeps: taskMetadata.additionalDeps, additionalEnvs: taskMetadata.additionalEnvs ) } diff --git a/sdks/python/test-suites/direct/build.gradle b/sdks/python/test-suites/direct/build.gradle index 4b10253439858..ea643c3303aa1 100644 --- a/sdks/python/test-suites/direct/build.gradle +++ b/sdks/python/test-suites/direct/build.gradle @@ -42,10 +42,3 @@ task ioCrossLanguagePostCommit { dependsOn.add(":sdks:python:test-suites:direct:py${getVersionSuffix(it)}:ioCrossLanguagePythonUsingJava") } } - -task crossLanguageWrapperValidationPreCommit { - // Different python versions may output types that look different and lead to - // false failures. To be consistent, we test on the lowest version only - def lowestSupportedVersion = getVersionsAsList('cross_language_validates_py_versions')[0] - dependsOn.add(":sdks:python:test-suites:direct:py${getVersionSuffix(lowestSupportedVersion)}:xlangWrapperValidationPythonUsingJava") -} diff --git a/sdks/python/test-suites/direct/common.gradle b/sdks/python/test-suites/direct/common.gradle index b5680c2e1e9a8..657f7adf801d8 100644 --- a/sdks/python/test-suites/direct/common.gradle +++ b/sdks/python/test-suites/direct/common.gradle @@ -412,8 +412,10 @@ def gcpProject = project.findProperty('dataflowProject') ?: 'apache-beam-testing project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata -> createCrossLanguageUsingJavaExpansionTask( name: taskMetadata.name, - expansionProjectPaths: taskMetadata.expansionProjectPaths, + expansionProjectPath: taskMetadata.expansionProjectPath, collectMarker: taskMetadata.collectMarker, + startJobServer: taskMetadata.startJobServer, + cleanupJobServer: taskMetadata.cleanupJobServer, numParallelTests: 1, pythonPipelineOptions: [ "--runner=TestDirectRunner", @@ -424,8 +426,6 @@ project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata -> "--timeout=4500", // timeout of whole command execution "--color=yes", // console color "--log-cli-level=INFO" //log level info - ], - additionalDeps: taskMetadata.additionalDeps, - additionalEnvs: taskMetadata.additionalEnvs + ] ) } diff --git a/sdks/python/test-suites/xlang/build.gradle b/sdks/python/test-suites/xlang/build.gradle index 3065ad8377e31..5a124ac20ce2a 100644 --- a/sdks/python/test-suites/xlang/build.gradle +++ b/sdks/python/test-suites/xlang/build.gradle @@ -16,43 +16,57 @@ * limitations under the License. */ // This is a base file to set up cross language tests for different runners -import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTask +import org.apache.beam.gradle.BeamModulePlugin +import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon project.evaluationDependsOn(":sdks:python") +// Set up cross language tests +def envDir = project.project(":sdks:python").envdir +def jobPort = BeamModulePlugin.getRandomPort() +def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp" +def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid" + +def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) { + dependsOn ':sdks:python:installGcpTest' + + executable 'sh' + args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --job_port ${jobPort} --pid_file ${pidFile} --background --stdout_file ${tmpDir}/beam-fnapi-job-server.log" +} + +def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) { + executable 'sh' + args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile} --stop" +} + +// List of objects representing task metadata to create cross-language tasks from. +// Each object contains the minimum relevant metadata. +def xlangTasks = [] + // ******** Java GCP expansion service ******** // Note: this only runs cross-language tests that use the Java GCP expansion service -// To run tests that use another expansion service, create a new CrossLanguageTask with the +// To run tests that use another expansion service, create a new CrossLanguageTaskCommon with the // relevant fields as done here, then add it to `xlangTasks`. -def gcpExpansionPath = project.project(':sdks:java:io:google-cloud-platform:expansion-service').getPath() -def ioExpansionPath = project.project(':sdks:java:io:expansion-service').getPath() +def gcpExpansionProject = project.project(':sdks:java:io:google-cloud-platform:expansion-service') // Properties that are common across runners. // Used to launch the expansion service, collect the right tests, and cleanup afterwards -def gcpXlang = new CrossLanguageTask().tap { +def gcpXlangCommon = new CrossLanguageTaskCommon().tap { name = "gcpCrossLanguage" - expansionProjectPaths = [gcpExpansionPath] + expansionProjectPath = gcpExpansionProject.getPath() collectMarker = "uses_gcp_java_expansion_service" + startJobServer = setupTask + cleanupJobServer = cleanupTask } -def ioXlang = new CrossLanguageTask().tap { +def ioXlangCommon = new CrossLanguageTaskCommon().tap { name = "ioCrossLanguage" - expansionProjectPaths = [ioExpansionPath] + expansionProjectPath = project.project(':sdks:java:io:expansion-service').getPath() collectMarker = "uses_io_java_expansion_service" + startJobServer = setupTask + cleanupJobServer = cleanupTask //See .test-infra/kafka/bitnami/README.md for setup instructions additionalEnvs = ["KAFKA_BOOTSTRAP_SERVER":project.findProperty('kafkaBootstrapServer')] } -// This list should include all expansion service targets in sdks/python/standard_expansion_services.yaml -def servicesToGenerateFrom = [ioExpansionPath, gcpExpansionPath] -def xlangWrapperValidation = new CrossLanguageTask().tap { - name = "xlangWrapperValidation" - expansionProjectPaths = servicesToGenerateFrom - collectMarker = "xlang_wrapper_generation" - // update Jinja2 bounds in pyproject.toml as well - additionalDeps = ['\"Jinja2>=2.7.1,<4.0.0\"'] -} - -// List of task metadata objects to create cross-language tasks from. -// Each object contains the minimum relevant metadata. -def xlangTasks = [gcpXlang, ioXlang, xlangWrapperValidation] +xlangTasks.addAll(gcpXlangCommon, ioXlangCommon) ext.xlangTasks = xlangTasks \ No newline at end of file diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 917eb0bd522cb..275bad26472b9 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -220,7 +220,7 @@ commands = # --azure_managed_identity_client_id "abc123" [testenv:py3-yapf] -# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml and pyproject.toml +# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml deps = yapf==0.29.0 commands = @@ -228,7 +228,7 @@ commands = time yapf --in-place --parallel --recursive apache_beam [testenv:py3-yapf-check] -# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml and pyproject.toml +# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml deps = yapf==0.29.0 commands = diff --git a/sdks/standard_expansion_services.yaml b/sdks/standard_expansion_services.yaml deleted file mode 100644 index e9e6871be82f5..0000000000000 --- a/sdks/standard_expansion_services.yaml +++ /dev/null @@ -1,77 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This file enumerates the standard Apache Beam expansion services. -# Each service must specify a package destination for each supported SDK, which -# is where generated wrappers will go by default. -# -# Individual transforms can modify their destination module as well as their -# generated wrapper class name. -# -# Transform identifiers listed in the `skip_transforms` field will be skipped. -# -# Any new gradle targets added here should also be added to: -# - sdks/python/build.gradle (as a dependency in the 'generateExternalTransformsConfig' task) -# - sdks/python/test-suites/xlang/build.gradle (look for 'servicesToGenerateFrom') -# -# Refer to sdks/python/gen_xlang_wrappers.py for more info. - -- gradle_target: 'sdks:java:io:expansion-service:shadowJar' - destinations: - python: 'apache_beam/io' - transforms: - 'beam:schematransform:org.apache.beam:kafka_write:v1': - name: 'WriteToKafka' - 'beam:schematransform:org.apache.beam:kafka_read:v1': - name: 'ReadFromKafka' - skip_transforms: - # Handwritten Kafka wrappers already exist in apache_beam/io/kafka.py - - 'beam:schematransform:org.apache.beam:kafka_write:v1' - - 'beam:schematransform:org.apache.beam:kafka_read:v1' - -# TODO(ahmedabu98): Enable this service in a future PR -#- gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' -# destinations: -# python: 'apache_beam/io/gcp' -# transforms: -# 'beam:schematransform:org.apache.beam:spanner_cdc_read:v1': -# name: 'ReadFromSpannerChangeStreams' -# skip_transforms: -# # generate_sequence is already included in the Java IO expansion service -# - 'beam:schematransform:org.apache.beam:generate_sequence:v1' -# # Handwritten wrappers exist in apache_beam/io/gcp/pubsublite/ -# - 'beam:schematransform:org.apache.beam:pubsublite_read:v1' -# - 'beam:schematransform:org.apache.beam:pubsublite_write:v1' -# # Handwritten wrapper exists in apache_beam/io/gcp/spanner.py -# - 'beam:schematransform:org.apache.beam:spanner_write:v1' -# # Native IO exists in apache_beam/io/gcp/pubsub.py -# - 'beam:schematransform:org.apache.beam:pubsub_read:v1' -# - 'beam:schematransform:org.apache.beam:pubsub_write:v1' -# # Native IO exists in apache_beam/io/gcp/bigquery.py -# - 'beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1' -# - 'beam:schematransform:org.apache.beam:bigquery_export_read:v1' -# - 'beam:schematransform:org.apache.beam:bigquery_storage_write:v2' -# - 'beam:schematransform:org.apache.beam:bigquery_storage_read:v1' -## - 'beam:schematransform:org.apache.beam:bigquery_storage_write:v2' -# # Handwritten wrappers exists in apache_beam/io/jdbc.py -# - 'beam:schematransform:org.apache.beam:jdbc_write:v1' -# - 'beam:schematransform:org.apache.beam:jdbc_read:v1' -# # Handwritten wrappers exist in apache_beam/io/gcp/bigtableio.py -# - 'beam:schematransform:org.apache.beam:bigtable_write:v1' -# - 'beam:schematransform:org.apache.beam:bigtable_read:v1' diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml deleted file mode 100644 index b43e93ab4919d..0000000000000 --- a/sdks/standard_external_transforms.yaml +++ /dev/null @@ -1,52 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# NOTE: This file is autogenerated and should not be edited by hand. -# Configs are generated based on the expansion service -# configuration in /sdks/standard_expansion_services.yaml. -# Refer to gen_xlang_wrappers.py for more info. -# -# Last updated on: 2024-02-22 - -- default_service: sdks:java:io:expansion-service:shadowJar - description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 - number called "value". The count is produced from the given "start" value and - either up to the given "end" or until 2^63 - 1. - - To produce an unbounded PCollection, simply do not specify an "end" value. Unbounded - sequences can specify a "rate" for output elements. - - In all cases, the sequence of numbers is generated in parallel, so there is no - inherent ordering between the generated values' - destinations: - python: apache_beam/io - fields: - end: - description: The maximum number to generate (exclusive). Will be an unbounded - sequence if left unspecified. - nullable: true - type: numpy.int64 - rate: - description: Specifies the rate to generate a given number of elements per a - given number of seconds. Applicable only to unbounded sequences. - nullable: true - type: Row(seconds=typing.Union[numpy.int64, NoneType], elements=) - start: - description: The minimum number to generate (inclusive). - nullable: false - type: numpy.int64 - identifier: beam:schematransform:org.apache.beam:generate_sequence:v1 - name: GenerateSequence