From 4167be2344e8624f14a942744c823317007ec727 Mon Sep 17 00:00:00 2001
From: Andrey Devyatkin
Date: Fri, 23 Jun 2023 17:38:11 +0200
Subject: [PATCH] added beam_PreCommit_Java_Examples_Dataflow job

added pull_request_target event

added required steps

improved if conditions

Bump github.com/aws/aws-sdk-go-v2/feature/s3/manager in /sdks (#27161)

Bumps [github.com/aws/aws-sdk-go-v2/feature/s3/manager](https://github.com/aws/aws-sdk-go-v2) from 1.11.67 to 1.11.70.
- [Release notes](https://github.com/aws/aws-sdk-go-v2/releases)
- [Changelog](https://github.com/aws/aws-sdk-go-v2/blob/main/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go-v2/compare/feature/s3/manager/v1.11.67...feature/s3/manager/v1.11.70)

---
updated-dependencies:
- dependency-name: github.com/aws/aws-sdk-go-v2/feature/s3/manager
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

Using capabilities instead of the container name to set use_single_core_per_container

Add Exception metadata to the data sampling protocol

Add https://github.com/apache/beam/issues/27000 as a known issue released in Beam 2.48.0 (#27235)

Python data sampling optimization (#27157)

* Python optimization work
* Exception Sampling perf tests
* add better element sampling microbenchmark
* slowly move towards plumbing the samplers to the bundle processors
* cleaned up
* starting clean up and more testing
* finish tests
* fix unused data_sampler args and comments
* yapf, comments, and simplifications
* linter
* lint and mypy
* linter
* run tests
* address review comments
* run tests

---------

Co-authored-by: Sam Rohde

Improve autoscaler throughput estimates and account for heartbeats (#27056)

The throughput estimates are improved by storing the estimates per partition (in StreamProgress) rather than per DoFn instance. This required some refactoring of ThroughputEstimator/SizeEstimator. It now estimates the throughput (per second) of the last ReadChangeStream request.
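
A toy sketch of the per-partition idea described above; the names below are hypothetical and simplified, not the actual ThroughputEstimator/SizeEstimator API touched by this change:

    from typing import Dict, Tuple


    class PerPartitionThroughputEstimator:
      """Keeps one throughput estimate per partition token, rather than one
      shared estimate per DoFn instance (illustrative only)."""
      def __init__(self):
        # partition token -> (bytes read, seconds taken) of the last
        # ReadChangeStream request for that partition.
        self._last_read: Dict[str, Tuple[int, float]] = {}

      def update(self, partition: str, num_bytes: int, seconds: float) -> None:
        self._last_read[partition] = (num_bytes, seconds)

      def bytes_per_second(self, partition: str) -> float:
        num_bytes, seconds = self._last_read.get(partition, (0, 0.0))
        return num_bytes / seconds if seconds > 0 else 0.0


    # Example: a 1 MB response that took 0.5s -> 2 MB/s for that partition.
    est = PerPartitionThroughputEstimator()
    est.update('partition-1', 1_000_000, 0.5)
    assert est.bytes_per_second('partition-1') == 2_000_000.0
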
fix(typescript): Publish types correctly, don't publish tests

fix(typescript): added more exports

Basic error handling for yaml. (#27145)

Updates Python ExternalTransform to use the transform service when needed (#27228)

* Updates Python ExternalTransform to use the transform service when needed
* Addressing reviewer comments
* Fix yapf
* Fix lint
* Fix yapf

Add blog post for Managing Beam dependencies in Java

Vertex AI Remote Model Handler (#27091)

* Move WIP code to new branch at HEAD
* Add client-side throttling
* Various linting issues
* More linting, add dependency to setup.py
* Fix Docstring
* new flags, types, framework for eventual file read
* Route to correct tutorial
* Better flag name
* Align type hints, pylint cleanup
* Move off of constant, preprocess image
* Change to keyed version for output
* Yapf
* Pylint
* Clean up ToDo
* Add TODOs for file globs
* Fix cast, remove unneccessary assert
* whitespace
* More whitespace
* Import order
* More import ordering
* Amend comment

[Playground] [Frontend] Brand colors in Playground flutter_code_editor (#27218)

* brand colors in PG FCE
* link to website
* restart ci
* restart ci

---------

Co-authored-by: darkhan.nausharipov

Bump go.mongodb.org/mongo-driver from 1.11.7 to 1.12.0 in /sdks (#27215)

Bumps [go.mongodb.org/mongo-driver](https://github.com/mongodb/mongo-go-driver) from 1.11.7 to 1.12.0.
- [Release notes](https://github.com/mongodb/mongo-go-driver/releases)
- [Commits](https://github.com/mongodb/mongo-go-driver/compare/v1.11.7...v1.12.0)

---
updated-dependencies:
- dependency-name: go.mongodb.org/mongo-driver
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

update readme with Beam Playground, Tour of Beam (#27243)

Co-authored-by: xqhu

runner image and arc changes (#27118)

* runner image and arc changes
* add state bucket comment in env file for reference
* add readme for manual image push and sync beam environment

---------

Co-authored-by: vdjerek

Fix whitespace (#27247)

[Tour of Beam] Learning content for "Final challenge" module (#26861)

* add final challenge
* correct examples
* changge
* correct golang
* correct example hint
* correct examples and add golang example
* delete whitespace
* delete whitespace
* add file tag
* correct final challenge
* fixing incorrect tags and names
* minor formatting
* fixing example formatting
* correct challenge
* correct whitespace
* change final-challenge-2
* correct imports
* fix splittable unit id
* formatting
* format change
* formatting
* fixing template
* temp fix of url
* backend urls fix
* restore backend urls
* change
* remove window

---------

Co-authored-by: mende1esmende1es
Co-authored-by: Oleh Borysevych

Bump transformers (#27108)

Bumps [transformers](https://github.com/huggingface/transformers) from 4.21.1 to 4.30.0.
- [Release notes](https://github.com/huggingface/transformers/releases)
- [Commits](https://github.com/huggingface/transformers/compare/v4.21.1...v4.30.0)

---
updated-dependencies:
- dependency-name: transformers
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

Code completion plugin: add plugin tests workflow (#27212)

* Code completion plugin: add plugin tests workflow
* Testing changing directory in code_completion_plugin_tests.yml
* Making gradlew executable
* Fix gradlew intellij plugin import
* change gradlew premissions
* code-completion-plugin: fixing GA tests workflow
* Revert "change gradlew premissions"

This reverts commit ee396be64ec3bccb883d687fc0c201c6c0d35562.

* Revert "code-completion-plugin: fixing GA tests workflow"

This reverts commit 9baa351a003d30980ff0bcbe582a80c5853e0fd0.
* fix code_completion_plugin_tests.yml
* Making gradlew executable
* code-completion-plugin-fix intellij plugin version in build.gradle.kts
* code-completion-plugin: change java version (17 -> 11) in code_completion_plugin_tests.yml
* test code completion plugin CI
* Add env vars to code_completion_plugin_test.yml
* code-completion-plugin: test ci
* code-completion-plugin: clone intellij-community to CI runner

---------

Co-authored-by: Pablo
Co-authored-by: Pablo E

add job rerun action (#27210)

* add job rerun action
* Update .github/actions/rerun-job-action/action.yml

Typo

Co-authored-by: Danny McCormick

* header and a comment explaining the logic for going into re-run flow
* deduplicate API calls
* added some explanation within the action file
* Update .github/actions/rerun-job-action/action.yml

Typo

Co-authored-by: Danny McCormick

---------

Co-authored-by: vdjerek
Co-authored-by: Danny McCormick

Code completion plugin: Add Java SDK Transform Completions (Hard Coded) (#27168)

* [Code Completion Plugin] Define Element Pattern for java sdk
* Code Completion Plugin: Add java sdk transform completions
* Verifying that tests are running as expected :) Sorry!
* Update BeamCompletionContributorTestCase.java

---------

Co-authored-by: Pablo

[Website] add hsbc case study (#27200)

[Tour of Beam] add work example (#27080)

* add work example
* correct
* correct tags for bigquery examples
* correct read-query
* correct read-query tag
* correct imports
* remove package
* correct
* fixed example name

---------

Co-authored-by: mende1esmende1es
Co-authored-by: Oleh Borysevych

Remove py37 references (#27252)

Handling issue where keystore file is empty for MongoDB SSL (#27250)

Bump worker image dependency (#27253)

[#22737] Add line about timer support for Go (#27263)

Bump actions/checkout from 2 to 3 (#27259)

Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 3.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v2...v3)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

Add Tour of Beam Page (#27244)

* add tour of beam page to redirect.
* add tour of beam page to redirect.
* add tour of beam page to redirect.
---
 .../beam_PreCommit_Java_Examples_Dataflow.yml |  97 +++++++
 .../runners/worker/bundle_processor_test.py   |   1 +
 .../runners/worker/data_sampler_test.py       | 249 ++++++++++++++++++
 3 files changed, 347 insertions(+)
 create mode 100644 .github/workflows/beam_PreCommit_Java_Examples_Dataflow.yml

diff --git a/.github/workflows/beam_PreCommit_Java_Examples_Dataflow.yml b/.github/workflows/beam_PreCommit_Java_Examples_Dataflow.yml
new file mode 100644
index 0000000000000..6bbed5ed32b45
--- /dev/null
+++ b/.github/workflows/beam_PreCommit_Java_Examples_Dataflow.yml
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: PreCommit Java Examples Dataflow
+
+on:
+  push:
+    tags: ['v*']
+    branches: ['master', 'release-*']
+    paths: ['model/**', 'sdks/java/**', 'runners/google-cloud-dataflow-java/**', 'examples/java/**', 'examples/kotlin/**', 'release/**']
+  pull_request_target:
+    branches: ['master', 'release-*']
+    paths: ['model/**', 'sdks/java/**', 'runners/google-cloud-dataflow-java/**', 'examples/java/**', 'examples/kotlin/**', 'release/**']
+  issue_comment:
+    types: [created]
+  schedule:
+    - cron: '0 */6 * * *'
+
+permissions: read-all
+jobs:
+  check_gcp_variables:
+    timeout-minutes: 5
+    name: check_gcp_variables
+    runs-on: [self-hosted, ubuntu-20.04]
+    outputs:
+      gcp-variables-set: ${{ steps.check_gcp_variables.outputs.gcp-variables-set }}
+    steps:
+      - name: Check out repository code
+        uses: actions/checkout@v3
+        with:
+          ref: ${{ github.event.pull_request.head.sha }}
+      - name: Check are GCP variables set
+        run: "./scripts/ci/ci_check_are_gcp_variables_set.sh"
+        id: check_gcp_variables
+        env:
+          GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
+          GCP_SA_EMAIL: ${{ secrets.GCP_SA_EMAIL }}
+          GCP_SA_KEY: ${{ secrets.GCP_SA_KEY }}
+          GCP_TESTING_BUCKET: ${{ secrets.GCP_TESTING_BUCKET }}
+          GCP_REGION: "not-needed-here"
+          GCP_PYTHON_WHEELS_BUCKET: "not-needed-here"
+
+  beam_PreCommit_Java_Examples_Dataflow:
+    name: beam_PreCommit_Java_Examples_Dataflow
+    needs:
+      - check_gcp_variables
+    runs-on: [self-hosted, ubuntu-20.04]
+    if: |
+      github.event_name == 'push' ||
+      github.event_name == 'pull_request_target' ||
+      github.event_name == 'schedule' ||
+      github.event.comment.body == 'Run Java_Examples_Dataflow PreCommit'
+    steps:
+      - name: Check out repository code
+        uses: actions/checkout@v3
+        with:
+          ref: ${{ github.event.pull_request.head.sha }}
+      - name: Setup self-hosted
+        uses: ./.github/actions/setup-self-hosted-action
+        with:
+          requires-py-38: false
+          requires-py-39: false
+          requires-go: false
+      - name: Setup Gradle
+        uses: gradle/gradle-build-action@v2
+        with:
+          cache-read-only: false
+      - name: Authenticate on GCP
+        uses: google-github-actions/setup-gcloud@v0
+        with:
+          service_account_email: ${{ secrets.GCP_SA_EMAIL }}
+          service_account_key: ${{ secrets.GCP_SA_KEY }}
+          project_id: ${{ secrets.GCP_PROJECT_ID }}
+          export_default_credentials: true
+      - name: run javaExamplesDataflowPrecommit script
+        run: ./gradlew :javaExamplesDataflowPrecommit
+          -PdisableSpotlessCheck=true
+          -PdisableCheckStyle=true
+      - name: Upload test report
+        uses: actions/upload-artifact@v3
+        with:
+          name: java-code-coverage-report
+          path: "**/build/test-results/**/*.xml"
\ No newline at end of file
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
index 8b81c9f17ac61..2efc60c54c9c6 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
@@ -18,6 +18,7 @@
 """Unit tests for bundle processing."""
 # pytype: skip-file
 
+import time
 import unittest
 from typing import Dict
 from typing import List
diff --git a/sdks/python/apache_beam/runners/worker/data_sampler_test.py b/sdks/python/apache_beam/runners/worker/data_sampler_test.py
index b679361218306..655c0107504f0 100644
--- a/sdks/python/apache_beam/runners/worker/data_sampler_test.py
+++ b/sdks/python/apache_beam/runners/worker/data_sampler_test.py
@@ -343,6 +343,255 @@ def test_can_sample_exceptions(self):
     self.assertGreater(len(samples.element_samples), 0)
 
 
+class DataSamplerTest(unittest.TestCase):
+  def make_test_descriptor(
+      self,
+      outputs: Optional[List[str]] = None,
+      transforms: Optional[List[str]] = None
+  ) -> beam_fn_api_pb2.ProcessBundleDescriptor:
+    outputs = outputs or [MAIN_PCOLLECTION_ID]
+    transforms = transforms or [MAIN_TRANSFORM_ID]
+
+    descriptor = beam_fn_api_pb2.ProcessBundleDescriptor()
+    for transform_id in transforms:
+      transform = descriptor.transforms[transform_id]
+      for output in outputs:
+        transform.outputs[output] = output
+
+    return descriptor
+
+  def setUp(self):
+    self.data_sampler = DataSampler(sample_every_sec=0.1)
+
+  def tearDown(self):
+    self.data_sampler.stop()
+
+  def wait_for_samples(
+      self, data_sampler: DataSampler,
+      pcollection_ids: List[str]) -> Dict[str, List[bytes]]:
+    """Waits for samples to exist for the given PCollections."""
+    now = time.time()
+    end = now + 30
+
+    samples = {}
+    while now < end:
+      time.sleep(0.1)
+      now = time.time()
+      samples.update(data_sampler.samples(pcollection_ids))
+
+      if not samples:
+        continue
+
+      has_all = all(pcoll_id in samples for pcoll_id in pcollection_ids)
+      if has_all:
+        return samples
+
+    self.assertLess(
+        now,
+        end,
+        'Timed out waiting for samples for {}'.format(pcollection_ids))
+    return {}
+
+  def primitives_coder_factory(self, _):
+    return PRIMITIVES_CODER
+
+  def gen_sample(
+      self,
+      data_sampler: DataSampler,
+      element: Any,
+      output_index: int,
+      transform_id: str = MAIN_TRANSFORM_ID):
+    """Generates a sample for the given transform's output."""
+    element_sampler = self.data_sampler.sampler_for_output(
+        transform_id, output_index)
+    element_sampler.el = element
+    element_sampler.has_element = True
+
+  def test_single_output(self):
+    """Simple test for a single sample."""
+    descriptor = self.make_test_descriptor()
+    self.data_sampler.initialize_samplers(
+        MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)
+
+    self.gen_sample(self.data_sampler, 'a', output_index=0)
+
+    expected_sample = {
+        MAIN_PCOLLECTION_ID: [PRIMITIVES_CODER.encode_nested('a')]
+    }
+    samples = self.wait_for_samples(self.data_sampler, [MAIN_PCOLLECTION_ID])
+    self.assertEqual(samples, expected_sample)
+
+  def test_not_initialized(self):
+    """Tests that transforms fail gracefully if not properly initialized."""
+    with self.assertLogs() as cm:
+      self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0)
+    self.assertRegex(cm.output[0], 'Out-of-bounds access.*')
+
+  def map_outputs_to_indices(
+      self, outputs, descriptor, transform_id=MAIN_TRANSFORM_ID):
+    tag_list = list(descriptor.transforms[transform_id].outputs)
+    return {output: tag_list.index(output) for output in outputs}
+
+  def test_sampler_mapping(self):
+    """Tests that the ElementSamplers are created for the correct output."""
+    # Initialize the DataSampler with the following outputs. The order here may
+    # get shuffled when inserting into the descriptor.
+    pcollection_ids = ['o0', 'o1', 'o2']
+    descriptor = self.make_test_descriptor(outputs=pcollection_ids)
+    samplers = self.data_sampler.initialize_samplers(
+        MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)
+
+    # Create a map from the PCollection id to the index into the transform
+    # output. This mirrors what happens when operators are created. The index
+    # of an output is where in the PTransform.outputs it is located (when the
+    # map is converted to a list).
+    outputs = self.map_outputs_to_indices(pcollection_ids, descriptor)
+
+    # Assert that the mapping is correct, i.e. that we can go from the
+    # PCollection id -> output index and that this is the same as the created
+    # samplers.
+    index = outputs['o0']
+    self.assertEqual(
+        self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, index),
+        samplers[index])
+
+    index = outputs['o1']
+    self.assertEqual(
+        self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, index),
+        samplers[index])
+
+    index = outputs['o2']
+    self.assertEqual(
+        self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, index),
+        samplers[index])
+
+  def test_multiple_outputs(self):
+    """Tests that multiple PCollections have their own sampler."""
+    pcollection_ids = ['o0', 'o1', 'o2']
+    descriptor = self.make_test_descriptor(outputs=pcollection_ids)
+    outputs = self.map_outputs_to_indices(pcollection_ids, descriptor)
+
+    self.data_sampler.initialize_samplers(
+        MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)
+
+    self.gen_sample(self.data_sampler, 'a', output_index=outputs['o0'])
+    self.gen_sample(self.data_sampler, 'b', output_index=outputs['o1'])
+    self.gen_sample(self.data_sampler, 'c', output_index=outputs['o2'])
+
+    samples = self.wait_for_samples(self.data_sampler, ['o0', 'o1', 'o2'])
+    expected_samples = {
+        'o0': [PRIMITIVES_CODER.encode_nested('a')],
+        'o1': [PRIMITIVES_CODER.encode_nested('b')],
+        'o2': [PRIMITIVES_CODER.encode_nested('c')],
+    }
+    self.assertEqual(samples, expected_samples)
+
+  def test_multiple_transforms(self):
+    """Tests that multiple transforms with the same PCollections can be
+    sampled."""
+    # Initialize two transforms, both with the same two outputs.
+    pcollection_ids = ['o0', 'o1']
+    descriptor = self.make_test_descriptor(
+        outputs=pcollection_ids, transforms=['t0', 't1'])
+    t0_outputs = self.map_outputs_to_indices(
+        pcollection_ids, descriptor, transform_id='t0')
+    t1_outputs = self.map_outputs_to_indices(
+        pcollection_ids, descriptor, transform_id='t1')
+
+    self.data_sampler.initialize_samplers(
+        't0', descriptor, self.primitives_coder_factory)
+
+    self.data_sampler.initialize_samplers(
+        't1', descriptor, self.primitives_coder_factory)
+
+    # The OutputSampler is on a different thread, so we don't test the same
+    # PCollections, to ensure that no data race occurs.
+    self.gen_sample(
+        self.data_sampler,
+        'a',
+        output_index=t0_outputs['o0'],
+        transform_id='t0')
+    self.gen_sample(
+        self.data_sampler,
+        'd',
+        output_index=t1_outputs['o1'],
+        transform_id='t1')
+    expected_samples = {
+        'o0': [PRIMITIVES_CODER.encode_nested('a')],
+        'o1': [PRIMITIVES_CODER.encode_nested('d')],
+    }
+    samples = self.wait_for_samples(self.data_sampler, ['o0', 'o1'])
+    self.assertEqual(samples, expected_samples)
+
+    self.gen_sample(
+        self.data_sampler,
+        'b',
+        output_index=t0_outputs['o1'],
+        transform_id='t0')
+    self.gen_sample(
+        self.data_sampler,
+        'c',
+        output_index=t1_outputs['o0'],
+        transform_id='t1')
+    expected_samples = {
+        'o0': [PRIMITIVES_CODER.encode_nested('c')],
+        'o1': [PRIMITIVES_CODER.encode_nested('b')],
+    }
+    samples = self.wait_for_samples(self.data_sampler, ['o0', 'o1'])
+    self.assertEqual(samples, expected_samples)
+
+  def test_sample_filters_single_pcollection_ids(self):
+    """Tests that samples can be filtered based on a single PCollection id."""
+    pcollection_ids = ['o0', 'o1', 'o2']
+    descriptor = self.make_test_descriptor(outputs=pcollection_ids)
+    outputs = self.map_outputs_to_indices(pcollection_ids, descriptor)
+
+    self.data_sampler.initialize_samplers(
+        MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)
+
+    self.gen_sample(self.data_sampler, 'a', output_index=outputs['o0'])
+    self.gen_sample(self.data_sampler, 'b', output_index=outputs['o1'])
+    self.gen_sample(self.data_sampler, 'c', output_index=outputs['o2'])
+
+    samples = self.wait_for_samples(self.data_sampler, ['o0'])
+    expected_samples = {
+        'o0': [PRIMITIVES_CODER.encode_nested('a')],
+    }
+    self.assertEqual(samples, expected_samples)
+
+    samples = self.wait_for_samples(self.data_sampler, ['o1'])
+    expected_samples = {
+        'o1': [PRIMITIVES_CODER.encode_nested('b')],
+    }
+    self.assertEqual(samples, expected_samples)
+
+    samples = self.wait_for_samples(self.data_sampler, ['o2'])
+    expected_samples = {
+        'o2': [PRIMITIVES_CODER.encode_nested('c')],
+    }
+    self.assertEqual(samples, expected_samples)
+
+  def test_sample_filters_multiple_pcollection_ids(self):
+    """Tests that samples can be filtered based on multiple PCollection ids."""
+    pcollection_ids = ['o0', 'o1', 'o2']
+    descriptor = self.make_test_descriptor(outputs=pcollection_ids)
+    outputs = self.map_outputs_to_indices(pcollection_ids, descriptor)
+
+    self.data_sampler.initialize_samplers(
+        MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)
+
+    self.gen_sample(self.data_sampler, 'a', output_index=outputs['o0'])
+    self.gen_sample(self.data_sampler, 'b', output_index=outputs['o1'])
+    self.gen_sample(self.data_sampler, 'c', output_index=outputs['o2'])
+
+    samples = self.wait_for_samples(self.data_sampler, ['o0', 'o2'])
+    expected_samples = {
+        'o0': [PRIMITIVES_CODER.encode_nested('a')],
+        'o2': [PRIMITIVES_CODER.encode_nested('c')],
+    }
+    self.assertEqual(samples, expected_samples)
+
+
 class OutputSamplerTest(unittest.TestCase):
   def tearDown(self):
     self.sampler.stop()
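
The OutputSampler publishes samples from a background thread, so the tests above cannot read them synchronously; wait_for_samples is a poll-until-present loop with a deadline. A minimal, self-contained sketch of that pattern follows, assuming only an object that exposes the samples() method used in the tests (the _FakeSampler below is a hypothetical stand-in for the real DataSampler):

    import time
    from typing import Dict, List


    class _FakeSampler:
      """Hypothetical stand-in for DataSampler: publishes samples only after
      a short delay, mimicking the asynchronous sampling thread."""
      def __init__(self):
        self._t0 = time.time()

      def samples(self, pcollection_ids: List[str]) -> Dict[str, List[bytes]]:
        if time.time() - self._t0 < 0.3:
          return {}
        return {pcoll: [b'element'] for pcoll in pcollection_ids}


    def wait_for_samples(sampler, pcollection_ids: List[str],
                         timeout_sec: float = 30.0) -> Dict[str, List[bytes]]:
      # Poll until every requested PCollection id has at least one sample;
      # the sampler publishes asynchronously, hence the sleep between polls.
      deadline = time.time() + timeout_sec
      collected: Dict[str, List[bytes]] = {}
      while time.time() < deadline:
        time.sleep(0.1)
        collected.update(sampler.samples(pcollection_ids))
        if all(pcoll in collected for pcoll in pcollection_ids):
          return collected
      raise AssertionError(
          'Timed out waiting for samples for {}'.format(pcollection_ids))


    print(wait_for_samples(_FakeSampler(), ['o0', 'o1']))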