added beam_PreCommit_Java_Examples_Dataflow job
added pull_request_target event

added required steps

improved if conditions

Bump github.com/aws/aws-sdk-go-v2/feature/s3/manager in /sdks (apache#27161)

Bumps [github.com/aws/aws-sdk-go-v2/feature/s3/manager](https://github.com/aws/aws-sdk-go-v2) from 1.11.67 to 1.11.70.
- [Release notes](https://github.com/aws/aws-sdk-go-v2/releases)
- [Changelog](https://github.com/aws/aws-sdk-go-v2/blob/main/CHANGELOG.md)
- [Commits](aws/aws-sdk-go-v2@feature/s3/manager/v1.11.67...feature/s3/manager/v1.11.70)

---
updated-dependencies:
- dependency-name: github.com/aws/aws-sdk-go-v2/feature/s3/manager
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

Using capabilities instead of the container name to set use_single_core_per_container

Add Exception metadata to the data sampling protocol

Add apache#27000 as a known issue released in Beam 2.48.0 (apache#27235)

Python data sampling optimization  (apache#27157)

* Python optimization work

* Exception Sampling perf tests

* add better element sampling microbenchmark

* slowly move towards plumbing the samplers to the bundle processors

* cleaned up

* starting clean up and more testing

* finish tests

* fix unused data_sampler args and comments

* yapf, comments, and simplifications

* linter

* lint and mypy

* linter

* run tests

* address review comments

* run tests

---------

Co-authored-by: Sam Rohde <srohde@google.com>

Improve autoscaler throughput estimates and account for heartbeats (apache#27056)

The throughput estimates are improved by storing the estimates per partition (in StreamProgress) rather than per DoFn instance. This required some refactoring of ThroughputEstimator/SizeEstimator; throughput (per second) is now estimated from the last ReadChangeStream request.
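The per-partition idea can be sketched as follows. This is a minimal illustration only, with hypothetical class and method names — it is not Beam's actual ThroughputEstimator/SizeEstimator API: each partition keeps its own last-observation timestamp, so the reported rate reflects that partition's most recent read rather than an average across a whole DoFn instance.

```python
import time
from collections import defaultdict


class PartitionThroughputEstimator:
    """Hypothetical sketch: tracks a bytes/sec estimate per partition,
    computed from the most recent report for that partition."""
    def __init__(self):
        self._last_update = {}  # partition id -> timestamp of last report
        self._throughput = defaultdict(float)  # partition id -> bytes/sec

    def report(self, partition, num_bytes, now=None):
        """Records bytes read for a partition; updates its rate estimate."""
        now = time.time() if now is None else now
        last = self._last_update.get(partition)
        if last is not None and now > last:
            # Rate is based only on this partition's last interval.
            self._throughput[partition] = num_bytes / (now - last)
        self._last_update[partition] = now

    def throughput(self, partition):
        """Returns the latest bytes/sec estimate (0.0 before two reports)."""
        return self._throughput[partition]
```

Keying the estimate by partition means an autoscaler can sum per-partition rates for a total, while a slow partition no longer drags down (or hides behind) the average of an entire worker.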

fix(typescript): Publish types correctly, don't publish tests

fix(typescript): added more exports

Basic error handling for yaml. (apache#27145)

Updates Python ExternalTransform to use the transform service when needed (apache#27228)

* Updates Python ExternalTransform to use the transform service when needed

* Addressing reviewer comments

* Fix yapf

* Fix lint

* Fix yapf

Add blog post for Managing Beam dependencies in Java

Vertex AI Remote Model Handler (apache#27091)

* Move WIP code to new branch at HEAD

* Add client-side throttling

* Various linting issues

* More linting, add dependency to setup.py

* Fix Docstring

* new flags, types, framework for eventual file read

* Route to correct tutorial

* Better flag name

* Align type hints, pylint cleanup

* Move off of constant, preprocess image

* Change to keyed version for output

* Yapf

* Pylint

* Clean up ToDo

* Add TODOs for file globs

* Fix cast, remove unnecessary assert

* whitespace

* More whitespace

* Import order

* More import ordering

* Amend comment

[Playground] [Frontend] Brand colors in Playground flutter_code_editor (apache#27218)

* brand colors in PG FCE

* link to website

* restart ci

* restart ci

---------

Co-authored-by: darkhan.nausharipov <darkhan.nausharipov@kzn.akvelon.com>

Bump go.mongodb.org/mongo-driver from 1.11.7 to 1.12.0 in /sdks (apache#27215)

Bumps [go.mongodb.org/mongo-driver](https://github.com/mongodb/mongo-go-driver) from 1.11.7 to 1.12.0.
- [Release notes](https://github.com/mongodb/mongo-go-driver/releases)
- [Commits](mongodb/mongo-go-driver@v1.11.7...v1.12.0)

---
updated-dependencies:
- dependency-name: go.mongodb.org/mongo-driver
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

update readme with Beam Playground, Tour of Beam (apache#27243)

Co-authored-by: xqhu <xqhu@google.com>

runner image and arc changes (apache#27118)

* runner image and arc changes

* add state bucket comment in env file for reference

* add readme for manual image push and sync beam environment

---------

Co-authored-by: vdjerek <Vlado Djerek>

Fix whitespace (apache#27247)

[Tour of Beam] Learning content for "Final challenge" module (apache#26861)

* add final challenge

* correct examples

* change

* correct golang

* correct example hint

* correct examples and add golang example

* delete whitespace

* delete whitespace

* add file tag

* correct final challenge

* fixing incorrect tags and names

* minor formatting

* fixing example formatting

* correct challenge

* correct whitespace

* change final-challenge-2

* correct imports

* fix splittable unit id

* formatting

* format change

* formatting

* fixing template

* temp fix of url

* backend urls fix

* restore backend urls

* change

* remove window

---------

Co-authored-by: mende1esmende1es <mende1esmende1es@gmail.cp>
Co-authored-by: Oleh Borysevych <oleg.borisevich@akvelon.com>

Bump transformers (apache#27108)

Bumps [transformers](https://github.com/huggingface/transformers) from 4.21.1 to 4.30.0.
- [Release notes](https://github.com/huggingface/transformers/releases)
- [Commits](huggingface/transformers@v4.21.1...v4.30.0)

---
updated-dependencies:
- dependency-name: transformers
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

Code completion plugin: add plugin tests workflow (apache#27212)

* Code completion plugin: add plugin tests workflow

* Testing changing directory in code_completion_plugin_tests.yml

* Making gradlew executable

* Fix gradlew intellij plugin import

* change gradlew premissions

* code-completion-plugin: fixing GA tests workflow

* Revert "change gradlew premissions"

This reverts commit ee396be.

* Revert "code-completion-plugin: fixing GA tests workflow"

This reverts commit 9baa351.

* fix code_completion_plugin_tests.yml

* Making gradlew executable

* code-completion-plugin-fix intellij plugin version in build.gradle.kts

* code-completion-plugin: change java version (17 -> 11) in code_completion_plugin_tests.yml

* test code completion plugin CI

* Add env vars to code_completion_plugin_test.yml

* code-completion-plugin: test ci

* code-completion-plugin: clone intellij-community to CI runner

---------

Co-authored-by: Pablo <pabloem@users.noreply.github.com>
Co-authored-by: Pablo E <pabloem@apache.org>

add job rerun action (apache#27210)

* add job rerun action

* Update .github/actions/rerun-job-action/action.yml

Typo

Co-authored-by: Danny McCormick <dannymccormick@google.com>

* header and a comment explaining the logic for going into re-run flow

* deduplicate API calls

* added some explanation within the action file

* Update .github/actions/rerun-job-action/action.yml

Typo

Co-authored-by: Danny McCormick <dannymccormick@google.com>

---------

Co-authored-by: vdjerek <Vlado Djerek>
Co-authored-by: Danny McCormick <dannymccormick@google.com>

Code completion plugin: Add Java SDK Transform Completions (Hard Coded) (apache#27168)

* [Code Completion Plugin] Define Element Pattern for java sdk

* Code Completion Plugin: Add java sdk transform completions

* Verifying that tests are running as expected :) Sorry!

* Update BeamCompletionContributorTestCase.java

---------

Co-authored-by: Pablo <pabloem@users.noreply.github.com>

[Website] add hsbc case study (apache#27200)

[Tour of Beam] add work example (apache#27080)

* add work example

* correct

* correct tags for bigquery examples

* correct read-query

* correct read-query tag

* correct imports

* remove package

* correct

* fixed example name

---------

Co-authored-by: mende1esmende1es <mende1esmende1es@gmail.cp>
Co-authored-by: Oleh Borysevych <oleg.borisevich@akvelon.com>

Remove py37 references (apache#27252)

Handling issue where keystore file is empty for MongoDB SSL (apache#27250)

Bump worker image dependency (apache#27253)

[apache#22737] Add line about timer support for Go (apache#27263)

Bump actions/checkout from 2 to 3 (apache#27259)

Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 3.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](actions/checkout@v2...v3)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

Add Tour of Beam Page (apache#27244)

* add tour of beam page to redirect.

* add tour of beam page to redirect.

* add tour of beam page to redirect.
andreydevyatkin committed Jul 7, 2023
1 parent 71cb3ca commit 4167be2
Showing 3 changed files with 347 additions and 0 deletions.
97 changes: 97 additions & 0 deletions .github/workflows/beam_PreCommit_Java_Examples_Dataflow.yml
@@ -0,0 +1,97 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: PreCommit Java Examples Dataflow

on:
  push:
    tags: ['v*']
    branches: ['master', 'release-*']
    paths: ['model/**', 'sdks/java/**', 'runners/google-cloud-dataflow-java/**', 'examples/java/**', 'examples/kotlin/**', 'release/**']
  pull_request_target:
    branches: ['master', 'release-*']
    paths: ['model/**', 'sdks/java/**', 'runners/google-cloud-dataflow-java/**', 'examples/java/**', 'examples/kotlin/**', 'release/**']
  issue_comment:
    types: [created]
  schedule:
    - cron: '* */6 * * *'

permissions: read-all

jobs:
  check_gcp_variables:
    timeout-minutes: 5
    name: check_gcp_variables
    runs-on: [self-hosted, ubuntu-20.04]
    outputs:
      gcp-variables-set: ${{ steps.check_gcp_variables.outputs.gcp-variables-set }}
    steps:
      - name: Check out repository code
        uses: actions/checkout@v3
        with:
          ref: ${{ github.event.pull_request.head.sha }}
      - name: Check are GCP variables set
        run: "./scripts/ci/ci_check_are_gcp_variables_set.sh"
        id: check_gcp_variables
        env:
          GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
          GCP_SA_EMAIL: ${{ secrets.GCP_SA_EMAIL }}
          GCP_SA_KEY: ${{ secrets.GCP_SA_KEY }}
          GCP_TESTING_BUCKET: ${{ secrets.GCP_TESTING_BUCKET }}
          GCP_REGION: "not-needed-here"
          GCP_PYTHON_WHEELS_BUCKET: "not-needed-here"

  beam_PreCommit_Java_Examples_Dataflow:
    name: beam_PreCommit_Java_Examples_Dataflow
    needs:
      - check_gcp_variables
    runs-on: [self-hosted, ubuntu-20.04]
    if: |
      github.event_name == 'push' ||
      github.event_name == 'pull_request_target' ||
      github.event_name == 'schedule' ||
      github.event.comment.body == 'Run Java_Examples_Dataflow PreCommit'
    steps:
      - name: Check out repository code
        uses: actions/checkout@v3
        with:
          ref: ${{ github.event.pull_request.head.sha }}
      - name: Setup self-hosted
        uses: ./.github/actions/setup-self-hosted-action
        with:
          requires-py-38: false
          requires-py-39: false
          requires-go: false
      - name: Setup Gradle
        uses: gradle/gradle-build-action@v2
        with:
          cache-read-only: false
      - name: Authenticate on GCP
        uses: google-github-actions/setup-gcloud@v0
        with:
          service_account_email: ${{ secrets.GCP_SA_EMAIL }}
          service_account_key: ${{ secrets.GCP_SA_KEY }}
          project_id: ${{ secrets.GCP_PROJECT_ID }}
          export_default_credentials: true
      - name: run javaExamplesDataflowPrecommit script
        run: ./gradlew :javaExamplesDataflowPrecommit
          -PdisableSpotlessCheck=true
          -PdisableCheckStyle=true
      - name: Upload test report
        uses: actions/upload-artifact@v3
        with:
          name: java-code-coverage-report
          path: "**/build/test-results/**/*.xml"
@@ -18,6 +18,7 @@
"""Unit tests for bundle processing."""
# pytype: skip-file

import time
import unittest
from typing import Dict
from typing import List
249 changes: 249 additions & 0 deletions sdks/python/apache_beam/runners/worker/data_sampler_test.py
@@ -343,6 +343,255 @@ def test_can_sample_exceptions(self):
    self.assertGreater(len(samples.element_samples), 0)


class DataSamplerTest(unittest.TestCase):
  def make_test_descriptor(
      self,
      outputs: Optional[List[str]] = None,
      transforms: Optional[List[str]] = None
  ) -> beam_fn_api_pb2.ProcessBundleDescriptor:
    outputs = outputs or [MAIN_PCOLLECTION_ID]
    transforms = transforms or [MAIN_TRANSFORM_ID]

    descriptor = beam_fn_api_pb2.ProcessBundleDescriptor()
    for transform_id in transforms:
      transform = descriptor.transforms[transform_id]
      for output in outputs:
        transform.outputs[output] = output

    return descriptor

  def setUp(self):
    self.data_sampler = DataSampler(sample_every_sec=0.1)

  def tearDown(self):
    self.data_sampler.stop()

  def wait_for_samples(
      self, data_sampler: DataSampler,
      pcollection_ids: List[str]) -> Dict[str, List[bytes]]:
    """Waits for samples to exist for the given PCollections."""
    now = time.time()
    end = now + 30

    samples = {}
    while now < end:
      time.sleep(0.1)
      now = time.time()
      samples.update(data_sampler.samples(pcollection_ids))

      if not samples:
        continue

      has_all = all(pcoll_id in samples for pcoll_id in pcollection_ids)
      if has_all:
        return samples

    self.assertLess(
        now,
        end,
        'Timed out waiting for samples for {}'.format(pcollection_ids))
    return {}

  def primitives_coder_factory(self, _):
    return PRIMITIVES_CODER

  def gen_sample(
      self,
      data_sampler: DataSampler,
      element: Any,
      output_index: int,
      transform_id: str = MAIN_TRANSFORM_ID):
    """Generates a sample for the given transform's output."""
    element_sampler = self.data_sampler.sampler_for_output(
        transform_id, output_index)
    element_sampler.el = element
    element_sampler.has_element = True

  def test_single_output(self):
    """Simple test for a single sample."""
    descriptor = self.make_test_descriptor()
    self.data_sampler.initialize_samplers(
        MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)

    self.gen_sample(self.data_sampler, 'a', output_index=0)

    expected_sample = {
        MAIN_PCOLLECTION_ID: [PRIMITIVES_CODER.encode_nested('a')]
    }
    samples = self.wait_for_samples(self.data_sampler, [MAIN_PCOLLECTION_ID])
    self.assertEqual(samples, expected_sample)

  def test_not_initialized(self):
    """Tests that transforms fail gracefully if not properly initialized."""
    with self.assertLogs() as cm:
      self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0)
    self.assertRegex(cm.output[0], 'Out-of-bounds access.*')

  def map_outputs_to_indices(
      self, outputs, descriptor, transform_id=MAIN_TRANSFORM_ID):
    tag_list = list(descriptor.transforms[transform_id].outputs)
    return {output: tag_list.index(output) for output in outputs}

  def test_sampler_mapping(self):
    """Tests that the ElementSamplers are created for the correct output."""
    # Initialize the DataSampler with the following outputs. The order here may
    # get shuffled when inserting into the descriptor.
    pcollection_ids = ['o0', 'o1', 'o2']
    descriptor = self.make_test_descriptor(outputs=pcollection_ids)
    samplers = self.data_sampler.initialize_samplers(
        MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)

    # Create a map from the PCollection id to the index into the transform
    # output. This mirrors what happens when operators are created. The index of
    # an output is where in the PTransform.outputs it is located (when the map
    # is converted to a list).
    outputs = self.map_outputs_to_indices(pcollection_ids, descriptor)

    # Assert that the mapping is correct, i.e. that we can go from the
    # PCollection id -> output index and that this is the same as the created
    # samplers.
    index = outputs['o0']
    self.assertEqual(
        self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, index),
        samplers[index])

    index = outputs['o1']
    self.assertEqual(
        self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, index),
        samplers[index])

    index = outputs['o2']
    self.assertEqual(
        self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, index),
        samplers[index])

  def test_multiple_outputs(self):
    """Tests that multiple PCollections have their own sampler."""
    pcollection_ids = ['o0', 'o1', 'o2']
    descriptor = self.make_test_descriptor(outputs=pcollection_ids)
    outputs = self.map_outputs_to_indices(pcollection_ids, descriptor)

    self.data_sampler.initialize_samplers(
        MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)

    self.gen_sample(self.data_sampler, 'a', output_index=outputs['o0'])
    self.gen_sample(self.data_sampler, 'b', output_index=outputs['o1'])
    self.gen_sample(self.data_sampler, 'c', output_index=outputs['o2'])

    samples = self.wait_for_samples(self.data_sampler, ['o0', 'o1', 'o2'])
    expected_samples = {
        'o0': [PRIMITIVES_CODER.encode_nested('a')],
        'o1': [PRIMITIVES_CODER.encode_nested('b')],
        'o2': [PRIMITIVES_CODER.encode_nested('c')],
    }
    self.assertEqual(samples, expected_samples)

  def test_multiple_transforms(self):
    """Test that multiple transforms with the same PCollections can be sampled.
    """
    # Initialize two transforms, both with the same two outputs.
    pcollection_ids = ['o0', 'o1']
    descriptor = self.make_test_descriptor(
        outputs=pcollection_ids, transforms=['t0', 't1'])
    t0_outputs = self.map_outputs_to_indices(
        pcollection_ids, descriptor, transform_id='t0')
    t1_outputs = self.map_outputs_to_indices(
        pcollection_ids, descriptor, transform_id='t1')

    self.data_sampler.initialize_samplers(
        't0', descriptor, self.primitives_coder_factory)

    self.data_sampler.initialize_samplers(
        't1', descriptor, self.primitives_coder_factory)

    # The OutputSampler is on a different thread so we don't test the same
    # PCollections to ensure that no data race occurs.
    self.gen_sample(
        self.data_sampler,
        'a',
        output_index=t0_outputs['o0'],
        transform_id='t0')
    self.gen_sample(
        self.data_sampler,
        'd',
        output_index=t1_outputs['o1'],
        transform_id='t1')
    expected_samples = {
        'o0': [PRIMITIVES_CODER.encode_nested('a')],
        'o1': [PRIMITIVES_CODER.encode_nested('d')],
    }
    samples = self.wait_for_samples(self.data_sampler, ['o0', 'o1'])
    self.assertEqual(samples, expected_samples)

    self.gen_sample(
        self.data_sampler,
        'b',
        output_index=t0_outputs['o1'],
        transform_id='t0')
    self.gen_sample(
        self.data_sampler,
        'c',
        output_index=t1_outputs['o0'],
        transform_id='t1')
    expected_samples = {
        'o0': [PRIMITIVES_CODER.encode_nested('c')],
        'o1': [PRIMITIVES_CODER.encode_nested('b')],
    }
    samples = self.wait_for_samples(self.data_sampler, ['o0', 'o1'])
    self.assertEqual(samples, expected_samples)

  def test_sample_filters_single_pcollection_ids(self):
    """Tests that samples can be filtered based on a single PCollection id."""
    pcollection_ids = ['o0', 'o1', 'o2']
    descriptor = self.make_test_descriptor(outputs=pcollection_ids)
    outputs = self.map_outputs_to_indices(pcollection_ids, descriptor)

    self.data_sampler.initialize_samplers(
        MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)

    self.gen_sample(self.data_sampler, 'a', output_index=outputs['o0'])
    self.gen_sample(self.data_sampler, 'b', output_index=outputs['o1'])
    self.gen_sample(self.data_sampler, 'c', output_index=outputs['o2'])

    samples = self.wait_for_samples(self.data_sampler, ['o0'])
    expected_samples = {
        'o0': [PRIMITIVES_CODER.encode_nested('a')],
    }
    self.assertEqual(samples, expected_samples)

    samples = self.wait_for_samples(self.data_sampler, ['o1'])
    expected_samples = {
        'o1': [PRIMITIVES_CODER.encode_nested('b')],
    }
    self.assertEqual(samples, expected_samples)

    samples = self.wait_for_samples(self.data_sampler, ['o2'])
    expected_samples = {
        'o2': [PRIMITIVES_CODER.encode_nested('c')],
    }
    self.assertEqual(samples, expected_samples)

  def test_sample_filters_multiple_pcollection_ids(self):
    """Tests that samples can be filtered based on multiple PCollection ids."""
    pcollection_ids = ['o0', 'o1', 'o2']
    descriptor = self.make_test_descriptor(outputs=pcollection_ids)
    outputs = self.map_outputs_to_indices(pcollection_ids, descriptor)

    self.data_sampler.initialize_samplers(
        MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)

    self.gen_sample(self.data_sampler, 'a', output_index=outputs['o0'])
    self.gen_sample(self.data_sampler, 'b', output_index=outputs['o1'])
    self.gen_sample(self.data_sampler, 'c', output_index=outputs['o2'])

    samples = self.wait_for_samples(self.data_sampler, ['o0', 'o2'])
    expected_samples = {
        'o0': [PRIMITIVES_CODER.encode_nested('a')],
        'o2': [PRIMITIVES_CODER.encode_nested('c')],
    }
    self.assertEqual(samples, expected_samples)


class OutputSamplerTest(unittest.TestCase):
  def tearDown(self):
    self.sampler.stop()