diff --git a/.github/workflows/playground_examples_cd.yml b/.github/workflows/playground_examples_cd.yml new file mode 100644 index 0000000000000..c028451a44bf3 --- /dev/null +++ b/.github/workflows/playground_examples_cd.yml @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Playground Examples CD + +on: + workflow_dispatch: +# Concurrency group for all deployment ops +concurrency: playground_production +jobs: + deploy_examples: + strategy: + matrix: + sdk: ["python", "go", "java"] + # run sequentially + max-parallel: 1 + uses: ./.github/workflows/playground_examples_cd_reusable.yml + with: + sdk: ${{ matrix.sdk }} + origin: PG_EXAMPLES + subdirs: "./learning/katas ./examples ./sdks" + secrets: + project_id: ${{ secrets.GCP_PLAYGROUND_PROJECT_ID }} + sa_key_content: ${{ secrets.GCP_PLAYGROUND_SA_KEY }} diff --git a/.github/workflows/playground_examples_cd_reusable.yml b/.github/workflows/playground_examples_cd_reusable.yml new file mode 100644 index 0000000000000..250143540be42 --- /dev/null +++ b/.github/workflows/playground_examples_cd_reusable.yml @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Playground Examples CD for a given SDK and origin + +on: + workflow_call: + inputs: + sdk: + type: string + required: true + subdirs: + type: string + required: true + origin: + type: string + required: true + secrets: + project_id: + required: true + sa_key_content: + required: true + +jobs: + cd: + name: CD ${{ inputs.sdk }} ${{ inputs.origin }} + runs-on: ubuntu-latest + env: + ORIGIN: ${{ inputs.origin }} + SDK: ${{ inputs.sdk }} + STEP: CD + SUBDIRS: ${{ inputs.subdirs }} + + GOOGLE_APPLICATION_CREDENTIALS: /tmp/gcp_access.json + GOOGLE_CLOUD_PROJECT: ${{ secrets.project_id }} + SA_KEY_CONTENT: ${{ secrets.sa_key_content }} + steps: + - name: Check out the repo + uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: '3.8' + - name: install deps + run: pip install -r requirements.txt + working-directory: playground/infrastructure + - name: Decode GCP credentials + run: | + echo "$SA_KEY_CONTENT" | base64 -d > $GOOGLE_APPLICATION_CREDENTIALS + + - name: Run ci_cd.py + run: | + python3 ci_cd.py \ + --step $STEP \ + --sdk SDK_${SDK^^} \ + --origin $ORIGIN \ + --subdirs $SUBDIRS + working-directory: playground/infrastructure + env: + BEAM_ROOT_DIR: "../../" + SDK_CONFIG: "../../playground/sdks.yaml" + BEAM_EXAMPLE_CATEGORIES: "../categories.yaml" + SERVER_ADDRESS: https://backend-${{ env.SDK }}-beta-dot-apache-beam-testing.appspot.com + BEAM_USE_WEBGRPC: yes + BEAM_CONCURRENCY: 4 diff --git a/.github/workflows/playground_examples_ci.yml b/.github/workflows/playground_examples_ci.yml index e75e4ff206472..b6bd5d7b86386 100644 --- a/.github/workflows/playground_examples_ci.yml +++ b/.github/workflows/playground_examples_ci.yml @@ -18,7 +18,8 @@ name: Playground Examples CI on: push: paths: - - .github/workflows/playground* + - .github/workflows/playground_examples_ci_reusable.yml + - .github/workflows/playground_examples_ci.yml - playground/backend/** - playground/infrastructure/** - learning/katas/** @@ -35,7 +36,7 @@ jobs: sdk: ["python", "go", "java"] # run sequentially max-parallel: 1 - uses: ./.github/workflows/playground_examples_reusable.yml + uses: ./.github/workflows/playground_examples_ci_reusable.yml with: sdk: ${{ matrix.sdk }} step: CI @@ -43,7 +44,7 @@ jobs: subdirs: ./learning/katas ./examples ./sdks # unfortunately, there's no input type for list allowlist: | - .github/workflows/playground_examples_reusable.yml \ + .github/workflows/playground_examples_ci_reusable.yml \ .github/workflows/playground_examples_ci.yml \ playground/backend \ - playground/infrastructure \ No newline at end of file + playground/infrastructure \ diff --git a/.github/workflows/playground_examples_reusable.yml b/.github/workflows/playground_examples_ci_reusable.yml similarity index 100% rename from .github/workflows/playground_examples_reusable.yml rename to .github/workflows/playground_examples_ci_reusable.yml diff --git a/.github/workflows/tour_of_beam_examples_ci.yml b/.github/workflows/tour_of_beam_examples_ci.yml index cbfe86a3adad5..43d2e5781eb8a 100644 --- a/.github/workflows/tour_of_beam_examples_ci.yml +++ b/.github/workflows/tour_of_beam_examples_ci.yml @@ -18,7 +18,7 @@ name: Tour Of Beam Examples CI on: push: paths: - - ./.github/workflows/playground_examples_reusable.yml + - ./.github/workflows/playground_examples_ci_reusable.yml - ./.github/workflows/tour_of_beam_examples_ci.yml - playground/backend/** - playground/infrastructure/** @@ -34,14 +34,14 @@ jobs: sdk: ["python", "go", "java"] # run sequentially max-parallel: 1 - uses: ./.github/workflows/playground_examples_reusable.yml + uses: ./.github/workflows/playground_examples_ci_reusable.yml with: sdk: ${{ matrix.sdk }} step: CI origin: TB_EXAMPLES subdirs: "./learning/tour-of-beam/learning-content" allowlist: | - .github/workflows/playground_examples_reusable.yml \ + .github/workflows/playground_examples_ci_reusable.yml \ .github/workflows/tour_of_beam_examples_ci.yml \ playground/backend \ - playground/infrastructure \ No newline at end of file + playground/infrastructure \ diff --git a/playground/infrastructure/cd_helper.py b/playground/infrastructure/cd_helper.py index d057a7e554fb7..77ef0748366c5 100644 --- a/playground/infrastructure/cd_helper.py +++ b/playground/infrastructure/cd_helper.py @@ -78,27 +78,21 @@ async def _get_outputs(self, examples: List[Example]): Args: examples: beam examples that should be run """ - await get_statuses( - examples) # run examples code and wait until all are executed - client = GRPCClient() - tasks = [client.get_run_output(example.pipeline_id) for example in examples] - outputs = await asyncio.gather(*tasks) - - tasks = [client.get_log(example.pipeline_id) for example in examples] - logs = await asyncio.gather(*tasks) - - if len(examples) > 0 and examples[0].sdk in [SDK_PYTHON, SDK_JAVA]: - tasks = [ - client.get_graph(example.pipeline_id, example.filepath) - for example in examples - ] - graphs = await asyncio.gather(*tasks) - - for graph, example in zip(graphs, examples): - example.graph = graph - - for output, example in zip(outputs, examples): - example.output = output - - for log, example in zip(logs, examples): - example.logs = log + async def _populate_fields(example: Example): + try: + example.compile_output = await client.get_compile_output(example.pipeline_id) + example.output = await client.get_run_output(example.pipeline_id) + example.logs = await client.get_log(example.pipeline_id) + if example.sdk in [SDK_JAVA, SDK_PYTHON]: + example.graph = await client.get_graph(example.pipeline_id, example.filepath) + except Exception as e: + logging.error(example.link) + logging.error(example.compile_output) + raise RuntimeError(f"error in {example.name}") from e + + async with GRPCClient() as client: + await get_statuses(client, + examples) # run examples code and wait until all are executed + tasks = [_populate_fields(example) for example in examples] + await asyncio.gather(*tasks) + diff --git a/playground/infrastructure/ci_helper.py b/playground/infrastructure/ci_helper.py index 63863ffb0ab1e..a290e2e9a72d2 100644 --- a/playground/infrastructure/ci_helper.py +++ b/playground/infrastructure/ci_helper.py @@ -55,10 +55,11 @@ async def verify_examples(self, examples: List[Example], origin: Origin): """ single_file_examples = list(filter( lambda example: example.tag.multifile is False, examples)) - await get_statuses(single_file_examples) - await self._verify_examples(single_file_examples, origin) + async with GRPCClient() as client: + await get_statuses(client, single_file_examples) + await self._verify_examples(client, single_file_examples, origin) - async def _verify_examples(self, examples: List[Example], origin: Origin): + async def _verify_examples(self, client: any, examples: List[Example], origin: Origin): """ Verify statuses of beam examples and the number of found default examples. @@ -74,7 +75,6 @@ async def _verify_examples(self, examples: List[Example], origin: Origin): examples: beam examples that should be verified """ count_of_verified = 0 - client = GRPCClient() verify_status_failed = False default_examples = [] diff --git a/playground/infrastructure/grpc_client.py b/playground/infrastructure/grpc_client.py index fb1316f40218b..c3ae7c8ca3e6e 100644 --- a/playground/infrastructure/grpc_client.py +++ b/playground/infrastructure/grpc_client.py @@ -17,9 +17,11 @@ Module contains the client to communicate with GRPC test Playground server """ import logging +import os import uuid import grpc +import sonora.aio from api.v1 import api_pb2_grpc, api_pb2 from config import Config @@ -29,11 +31,25 @@ class GRPCClient: """GRPCClient is gRPC client for sending a request to the backend.""" def __init__(self, timeout=10, wait_for_ready=True): - self._channel = grpc.aio.insecure_channel(Config.SERVER_ADDRESS) + use_webgrpc = os.getenv("BEAM_USE_WEBGRPC", False) + if use_webgrpc: + self._channel = sonora.aio.insecure_web_channel(Config.SERVER_ADDRESS) + else: + self._channel = grpc.aio.insecure_channel(Config.SERVER_ADDRESS) + self._stub = api_pb2_grpc.PlaygroundServiceStub(self._channel) - self._timeout = timeout - self._wait_for_ready = wait_for_ready + self._kwargs = dict(timeout=timeout) + if wait_for_ready and not use_webgrpc: + self._kwargs["wait_for_ready"] = True + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self._channel.__aexit__(exc_type, exc_val, exc_tb) + + async def run_code( self, code: str, sdk: api_pb2.Sdk, pipeline_options: str) -> str: """ @@ -54,7 +70,7 @@ async def run_code( f'Incorrect sdk: must be from this pool: {", ".join(sdks)}') request = api_pb2.RunCodeRequest( code=code, sdk=sdk, pipeline_options=pipeline_options) - response = await self._stub.RunCode(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready) + response = await self._stub.RunCode(request, **self._kwargs) return response.pipeline_uuid async def check_status(self, pipeline_uuid: str) -> api_pb2.Status: @@ -69,7 +85,7 @@ async def check_status(self, pipeline_uuid: str) -> api_pb2.Status: """ self._verify_pipeline_uuid(pipeline_uuid) request = api_pb2.CheckStatusRequest(pipeline_uuid=pipeline_uuid) - response = await self._stub.CheckStatus(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready) + response = await self._stub.CheckStatus(request, **self._kwargs) return response.status async def get_run_error(self, pipeline_uuid: str) -> str: @@ -84,7 +100,7 @@ async def get_run_error(self, pipeline_uuid: str) -> str: """ self._verify_pipeline_uuid(pipeline_uuid) request = api_pb2.GetRunErrorRequest(pipeline_uuid=pipeline_uuid) - response = await self._stub.GetRunError(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready) + response = await self._stub.GetRunError(request, **self._kwargs) return response.output async def get_run_output(self, pipeline_uuid: str) -> str: @@ -99,7 +115,7 @@ async def get_run_output(self, pipeline_uuid: str) -> str: """ self._verify_pipeline_uuid(pipeline_uuid) request = api_pb2.GetRunOutputRequest(pipeline_uuid=pipeline_uuid) - response = await self._stub.GetRunOutput(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready) + response = await self._stub.GetRunOutput(request, **self._kwargs) return response.output async def get_log(self, pipeline_uuid: str) -> str: @@ -114,7 +130,8 @@ async def get_log(self, pipeline_uuid: str) -> str: """ self._verify_pipeline_uuid(pipeline_uuid) request = api_pb2.GetLogsRequest(pipeline_uuid=pipeline_uuid) - response = await self._stub.GetLogs(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready) + response = await self._stub.GetLogs(request, **self._kwargs) + return response.output async def get_compile_output(self, pipeline_uuid: str) -> str: @@ -129,7 +146,8 @@ async def get_compile_output(self, pipeline_uuid: str) -> str: """ self._verify_pipeline_uuid(pipeline_uuid) request = api_pb2.GetCompileOutputRequest(pipeline_uuid=pipeline_uuid) - response = await self._stub.GetCompileOutput(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready) + response = await self._stub.GetCompileOutput(request, **self._kwargs) + return response.output async def get_graph(self, pipeline_uuid: str, example_filepath: str) -> str: @@ -146,7 +164,7 @@ async def get_graph(self, pipeline_uuid: str, example_filepath: str) -> str: self._verify_pipeline_uuid(pipeline_uuid) request = api_pb2.GetGraphRequest(pipeline_uuid=pipeline_uuid) try: - response = await self._stub.GetGraph(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready) + response = await self._stub.GetGraph(request, **self._kwargs) if response.graph == "": logging.warning("Graph for %s wasn't generated", example_filepath) return response.graph diff --git a/playground/infrastructure/helper.py b/playground/infrastructure/helper.py index c493eb51baf8b..53c9a98072e90 100644 --- a/playground/infrastructure/helper.py +++ b/playground/infrastructure/helper.py @@ -70,6 +70,7 @@ class Example: type: PrecompiledObjectType = PRECOMPILED_OBJECT_TYPE_UNSPECIFIED pipeline_id: str = "" output: str = "" + compile_output: str = "" graph: str = "" @@ -146,7 +147,7 @@ def find_examples(root_dir: str, subdirs: List[str], supported_categories: List[ return examples -async def get_statuses(examples: List[Example], concurrency: int = 10): +async def get_statuses(client: GRPCClient, examples: List[Example], concurrency: int = 10): """ Receive status and update example.status and example.pipeline_id for each example @@ -156,8 +157,6 @@ async def get_statuses(examples: List[Example], concurrency: int = 10): pipeline_id values. """ tasks = [] - client = GRPCClient() - try: concurrency = int(os.environ["BEAM_CONCURRENCY"]) logging.info("override default concurrency: %d", concurrency) diff --git a/playground/infrastructure/requirements.txt b/playground/infrastructure/requirements.txt index fef04766a89f3..e57056d1163c9 100644 --- a/playground/infrastructure/requirements.txt +++ b/playground/infrastructure/requirements.txt @@ -24,4 +24,5 @@ pytest-asyncio==0.18.2 pytest-mock==3.6.1 PyYAML==6.0 tqdm~=4.62.3 -google-cloud-datastore==2.7.1 \ No newline at end of file +google-cloud-datastore==2.7.1 +sonora==0.2.2 diff --git a/playground/infrastructure/test_ci_helper.py b/playground/infrastructure/test_ci_helper.py index 192b39ffaa06a..2c1103137d75a 100644 --- a/playground/infrastructure/test_ci_helper.py +++ b/playground/infrastructure/test_ci_helper.py @@ -34,14 +34,12 @@ async def test_verify_examples(mock_get_statuses, mock_verify_examples): helper = CIHelper() await helper.verify_examples([], Origin.PG_EXAMPLES) - mock_get_statuses.assert_called_once_with([]) - mock_verify_examples.assert_called_once_with([], Origin.PG_EXAMPLES) + mock_get_statuses.assert_called_once_with(mock.ANY, []) + mock_verify_examples.assert_called_once_with(mock.ANY, [], Origin.PG_EXAMPLES) @pytest.mark.asyncio -@mock.patch("grpc_client.GRPCClient.get_run_error") -@mock.patch("grpc_client.GRPCClient.get_compile_output") -async def test__verify_examples(mock_get_compile_output, mock_get_run_output): +async def test__verify_examples(): helper = CIHelper() object_meta = { "name": "name", @@ -167,11 +165,11 @@ async def test__verify_examples(mock_get_compile_output, mock_get_run_output): tag=Tag(**object_meta), link="link"), ] - + client = mock.AsyncMock() with pytest.raises(VerifyException): - await helper._verify_examples(examples_with_errors, Origin.PG_EXAMPLES) + await helper._verify_examples(client, examples_with_errors, Origin.PG_EXAMPLES) with pytest.raises(VerifyException): - await helper._verify_examples(examples_without_def_ex, Origin.PG_EXAMPLES) + await helper._verify_examples(client, examples_without_def_ex, Origin.PG_EXAMPLES) with pytest.raises(VerifyException): - await helper._verify_examples(examples_with_several_def_ex, Origin.PG_EXAMPLES) - await helper._verify_examples(examples_without_errors, Origin.PG_EXAMPLES) + await helper._verify_examples(client, examples_with_several_def_ex, Origin.PG_EXAMPLES) + await helper._verify_examples(client, examples_without_errors, Origin.PG_EXAMPLES) diff --git a/playground/infrastructure/test_helper.py b/playground/infrastructure/test_helper.py index 804fd6681d120..08b8016275562 100644 --- a/playground/infrastructure/test_helper.py +++ b/playground/infrastructure/test_helper.py @@ -93,9 +93,8 @@ def test_find_examples(mock_os_walk, mock_check_file, mock_check_no_nested, is_v @pytest.mark.asyncio -@mock.patch("helper.GRPCClient") @mock.patch("helper._update_example_status") -async def test_get_statuses(mock_update_example_status, mock_grpc_client): +async def test_get_statuses(mock_update_example_status): example = Example( name="file", complexity="MEDIUM", @@ -107,11 +106,8 @@ async def test_get_statuses(mock_update_example_status, mock_grpc_client): status=STATUS_UNSPECIFIED, tag={"name": "Name"}, link="link") - client = None - - mock_grpc_client.return_value = client - - await get_statuses([example]) + client = mock.sentinel + await get_statuses(client, [example]) mock_update_example_status.assert_called_once_with(example, client)