Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Playground] Examples CD #23664

Merged
merged 44 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
ee9d539
WIP workflow
eantyshev Sep 7, 2022
8e26e28
WIP
eantyshev Sep 7, 2022
e61f024
aio
eantyshev Sep 8, 2022
9ebbdf5
subdirs
eantyshev Sep 9, 2022
a13880e
fix
eantyshev Sep 9, 2022
7f36b45
WF fix
eantyshev Sep 12, 2022
97df1a2
WF tmp
eantyshev Sep 12, 2022
2d12789
TO REMOVE
eantyshev Sep 12, 2022
04bb36f
Update playground_deploy_examples.yml
MakarkinSAkvelon Sep 12, 2022
d892a0f
Update playground_deploy_examples.yml
MakarkinSAkvelon Sep 12, 2022
bbb9361
GA workflow
eantyshev Sep 13, 2022
70550d1
TO REMOVE
eantyshev Sep 13, 2022
36c4d7c
WIP
eantyshev Sep 20, 2022
9dfb869
wf
eantyshev Sep 21, 2022
7fe6013
to remove
eantyshev Sep 21, 2022
efd4a95
Revert "to remove"
eantyshev Sep 21, 2022
f699df1
revert
eantyshev Oct 4, 2022
251f3b9
concurrency2
eantyshev Oct 4, 2022
873b746
no concurrency
eantyshev Oct 4, 2022
dbea67f
lock
eantyshev Oct 4, 2022
b28486c
seq
eantyshev Oct 4, 2022
1963f4c
beam_concurr
eantyshev Oct 4, 2022
6b43087
debug
eantyshev Oct 4, 2022
9acc1cb
debug
eantyshev Oct 4, 2022
0b60c30
dirt
eantyshev Oct 5, 2022
28eda8c
wait_for_ready
eantyshev Oct 5, 2022
5f19514
Revert "debug"
eantyshev Oct 5, 2022
c58e6f3
BEAM_CONC: 4
eantyshev Oct 5, 2022
6743927
origin required
eantyshev Oct 5, 2022
c8366ff
webgrpc
eantyshev Oct 5, 2022
71d23ca
examples_cd wf
eantyshev Oct 6, 2022
c1c9bec
add
eantyshev Oct 6, 2022
977e099
to remove
eantyshev Oct 6, 2022
378baf3
keep
eantyshev Oct 6, 2022
977b78b
fix
eantyshev Oct 6, 2022
c320407
secrets
eantyshev Oct 6, 2022
f33bcfa
refactor
eantyshev Oct 6, 2022
349d43c
rm
eantyshev Oct 17, 2022
065e696
-dump
eantyshev Oct 17, 2022
e362aeb
Revert "rm"
eantyshev Oct 17, 2022
fae2fc1
renamse
eantyshev Oct 17, 2022
6fcdc75
nit
eantyshev Oct 17, 2022
f4ae98c
fix
eantyshev Oct 17, 2022
237f26f
stable
eantyshev Oct 17, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions .github/workflows/playground_examples_cd.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Playground Examples CD

on:
workflow_dispatch:
# Concurrency group for all deployment ops
concurrency: playground_production
jobs:
deploy_examples:
strategy:
matrix:
sdk: ["python", "go", "java"]
# run sequentially
max-parallel: 1
uses: ./.github/workflows/playground_examples_cd_reusable.yml
with:
sdk: ${{ matrix.sdk }}
origin: PG_EXAMPLES
subdirs: "./learning/katas ./examples ./sdks"
secrets:
project_id: ${{ secrets.GCP_PLAYGROUND_PROJECT_ID }}
sa_key_content: ${{ secrets.GCP_PLAYGROUND_SA_KEY }}
76 changes: 76 additions & 0 deletions .github/workflows/playground_examples_cd_reusable.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Playground Examples CD for a given SDK and origin

on:
workflow_call:
inputs:
sdk:
type: string
required: true
subdirs:
type: string
required: true
origin:
type: string
required: true
secrets:
project_id:
required: true
sa_key_content:
required: true

jobs:
cd:
name: CD ${{ inputs.sdk }} ${{ inputs.origin }}
runs-on: ubuntu-latest
env:
ORIGIN: ${{ inputs.origin }}
SDK: ${{ inputs.sdk }}
STEP: CD
SUBDIRS: ${{ inputs.subdirs }}

GOOGLE_APPLICATION_CREDENTIALS: /tmp/gcp_access.json
GOOGLE_CLOUD_PROJECT: ${{ secrets.project_id }}
SA_KEY_CONTENT: ${{ secrets.sa_key_content }}
steps:
- name: Check out the repo
uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.8'
- name: install deps
run: pip install -r requirements.txt
working-directory: playground/infrastructure
- name: Decode GCP credentials
run: |
echo "$SA_KEY_CONTENT" | base64 -d > $GOOGLE_APPLICATION_CREDENTIALS
- name: Run ci_cd.py
run: |
python3 ci_cd.py \
--step $STEP \
--sdk SDK_${SDK^^} \
--origin $ORIGIN \
--subdirs $SUBDIRS
working-directory: playground/infrastructure
env:
BEAM_ROOT_DIR: "../../"
SDK_CONFIG: "../../playground/sdks.yaml"
BEAM_EXAMPLE_CATEGORIES: "../categories.yaml"
SERVER_ADDRESS: https://backend-${{ env.SDK }}-beta-dot-apache-beam-testing.appspot.com
BEAM_USE_WEBGRPC: yes
BEAM_CONCURRENCY: 4
9 changes: 5 additions & 4 deletions .github/workflows/playground_examples_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ name: Playground Examples CI
on:
push:
paths:
- .github/workflows/playground*
- .github/workflows/playground_examples_ci_reusable.yml
- .github/workflows/playground_examples_ci.yml
- playground/backend/**
- playground/infrastructure/**
- learning/katas/**
Expand All @@ -35,15 +36,15 @@ jobs:
sdk: ["python", "go", "java"]
# run sequentially
max-parallel: 1
uses: ./.github/workflows/playground_examples_reusable.yml
uses: ./.github/workflows/playground_examples_ci_reusable.yml
with:
sdk: ${{ matrix.sdk }}
step: CI
origin: PG_EXAMPLES
subdirs: ./learning/katas ./examples ./sdks
# unfortunately, there's no input type for list
allowlist: |
.github/workflows/playground_examples_reusable.yml \
.github/workflows/playground_examples_ci_reusable.yml \
.github/workflows/playground_examples_ci.yml \
playground/backend \
playground/infrastructure
playground/infrastructure \
8 changes: 4 additions & 4 deletions .github/workflows/tour_of_beam_examples_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ name: Tour Of Beam Examples CI
on:
push:
paths:
- ./.github/workflows/playground_examples_reusable.yml
- ./.github/workflows/playground_examples_ci_reusable.yml
- ./.github/workflows/tour_of_beam_examples_ci.yml
- playground/backend/**
- playground/infrastructure/**
Expand All @@ -34,14 +34,14 @@ jobs:
sdk: ["python", "go", "java"]
# run sequentially
max-parallel: 1
uses: ./.github/workflows/playground_examples_reusable.yml
uses: ./.github/workflows/playground_examples_ci_reusable.yml
with:
sdk: ${{ matrix.sdk }}
step: CI
origin: TB_EXAMPLES
subdirs: "./learning/tour-of-beam/learning-content"
allowlist: |
.github/workflows/playground_examples_reusable.yml \
.github/workflows/playground_examples_ci_reusable.yml \
.github/workflows/tour_of_beam_examples_ci.yml \
playground/backend \
playground/infrastructure
playground/infrastructure \
42 changes: 18 additions & 24 deletions playground/infrastructure/cd_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,21 @@ async def _get_outputs(self, examples: List[Example]):
Args:
examples: beam examples that should be run
"""
await get_statuses(
examples) # run examples code and wait until all are executed
client = GRPCClient()
tasks = [client.get_run_output(example.pipeline_id) for example in examples]
outputs = await asyncio.gather(*tasks)

tasks = [client.get_log(example.pipeline_id) for example in examples]
logs = await asyncio.gather(*tasks)

if len(examples) > 0 and examples[0].sdk in [SDK_PYTHON, SDK_JAVA]:
tasks = [
client.get_graph(example.pipeline_id, example.filepath)
for example in examples
]
graphs = await asyncio.gather(*tasks)

for graph, example in zip(graphs, examples):
example.graph = graph

for output, example in zip(outputs, examples):
example.output = output

for log, example in zip(logs, examples):
example.logs = log
async def _populate_fields(example: Example):
try:
example.compile_output = await client.get_compile_output(example.pipeline_id)
example.output = await client.get_run_output(example.pipeline_id)
example.logs = await client.get_log(example.pipeline_id)
if example.sdk in [SDK_JAVA, SDK_PYTHON]:
example.graph = await client.get_graph(example.pipeline_id, example.filepath)
except Exception as e:
logging.error(example.link)
logging.error(example.compile_output)
raise RuntimeError(f"error in {example.name}") from e

async with GRPCClient() as client:
await get_statuses(client,
examples) # run examples code and wait until all are executed
tasks = [_populate_fields(example) for example in examples]
await asyncio.gather(*tasks)

8 changes: 4 additions & 4 deletions playground/infrastructure/ci_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ async def verify_examples(self, examples: List[Example], origin: Origin):
"""
single_file_examples = list(filter(
lambda example: example.tag.multifile is False, examples))
await get_statuses(single_file_examples)
await self._verify_examples(single_file_examples, origin)
async with GRPCClient() as client:
await get_statuses(client, single_file_examples)
await self._verify_examples(client, single_file_examples, origin)

async def _verify_examples(self, examples: List[Example], origin: Origin):
async def _verify_examples(self, client: any, examples: List[Example], origin: Origin):
"""
Verify statuses of beam examples and the number of found default examples.
Expand All @@ -74,7 +75,6 @@ async def _verify_examples(self, examples: List[Example], origin: Origin):
examples: beam examples that should be verified
"""
count_of_verified = 0
client = GRPCClient()
verify_status_failed = False
default_examples = []

Expand Down
38 changes: 28 additions & 10 deletions playground/infrastructure/grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
Module contains the client to communicate with GRPC test Playground server
"""
import logging
import os
import uuid

import grpc
import sonora.aio

from api.v1 import api_pb2_grpc, api_pb2
from config import Config
Expand All @@ -29,11 +31,25 @@ class GRPCClient:
"""GRPCClient is gRPC client for sending a request to the backend."""

def __init__(self, timeout=10, wait_for_ready=True):
self._channel = grpc.aio.insecure_channel(Config.SERVER_ADDRESS)
use_webgrpc = os.getenv("BEAM_USE_WEBGRPC", False)
if use_webgrpc:
self._channel = sonora.aio.insecure_web_channel(Config.SERVER_ADDRESS)
else:
self._channel = grpc.aio.insecure_channel(Config.SERVER_ADDRESS)

self._stub = api_pb2_grpc.PlaygroundServiceStub(self._channel)
self._timeout = timeout
self._wait_for_ready = wait_for_ready

self._kwargs = dict(timeout=timeout)
if wait_for_ready and not use_webgrpc:
self._kwargs["wait_for_ready"] = True

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._channel.__aexit__(exc_type, exc_val, exc_tb)


async def run_code(
self, code: str, sdk: api_pb2.Sdk, pipeline_options: str) -> str:
"""
Expand All @@ -54,7 +70,7 @@ async def run_code(
f'Incorrect sdk: must be from this pool: {", ".join(sdks)}')
request = api_pb2.RunCodeRequest(
code=code, sdk=sdk, pipeline_options=pipeline_options)
response = await self._stub.RunCode(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready)
response = await self._stub.RunCode(request, **self._kwargs)
return response.pipeline_uuid

async def check_status(self, pipeline_uuid: str) -> api_pb2.Status:
Expand All @@ -69,7 +85,7 @@ async def check_status(self, pipeline_uuid: str) -> api_pb2.Status:
"""
self._verify_pipeline_uuid(pipeline_uuid)
request = api_pb2.CheckStatusRequest(pipeline_uuid=pipeline_uuid)
response = await self._stub.CheckStatus(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready)
response = await self._stub.CheckStatus(request, **self._kwargs)
return response.status

async def get_run_error(self, pipeline_uuid: str) -> str:
Expand All @@ -84,7 +100,7 @@ async def get_run_error(self, pipeline_uuid: str) -> str:
"""
self._verify_pipeline_uuid(pipeline_uuid)
request = api_pb2.GetRunErrorRequest(pipeline_uuid=pipeline_uuid)
response = await self._stub.GetRunError(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready)
response = await self._stub.GetRunError(request, **self._kwargs)
return response.output

async def get_run_output(self, pipeline_uuid: str) -> str:
Expand All @@ -99,7 +115,7 @@ async def get_run_output(self, pipeline_uuid: str) -> str:
"""
self._verify_pipeline_uuid(pipeline_uuid)
request = api_pb2.GetRunOutputRequest(pipeline_uuid=pipeline_uuid)
response = await self._stub.GetRunOutput(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready)
response = await self._stub.GetRunOutput(request, **self._kwargs)
return response.output

async def get_log(self, pipeline_uuid: str) -> str:
Expand All @@ -114,7 +130,8 @@ async def get_log(self, pipeline_uuid: str) -> str:
"""
self._verify_pipeline_uuid(pipeline_uuid)
request = api_pb2.GetLogsRequest(pipeline_uuid=pipeline_uuid)
response = await self._stub.GetLogs(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready)
response = await self._stub.GetLogs(request, **self._kwargs)

return response.output

async def get_compile_output(self, pipeline_uuid: str) -> str:
Expand All @@ -129,7 +146,8 @@ async def get_compile_output(self, pipeline_uuid: str) -> str:
"""
self._verify_pipeline_uuid(pipeline_uuid)
request = api_pb2.GetCompileOutputRequest(pipeline_uuid=pipeline_uuid)
response = await self._stub.GetCompileOutput(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready)
response = await self._stub.GetCompileOutput(request, **self._kwargs)

return response.output

async def get_graph(self, pipeline_uuid: str, example_filepath: str) -> str:
Expand All @@ -146,7 +164,7 @@ async def get_graph(self, pipeline_uuid: str, example_filepath: str) -> str:
self._verify_pipeline_uuid(pipeline_uuid)
request = api_pb2.GetGraphRequest(pipeline_uuid=pipeline_uuid)
try:
response = await self._stub.GetGraph(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready)
response = await self._stub.GetGraph(request, **self._kwargs)
if response.graph == "":
logging.warning("Graph for %s wasn't generated", example_filepath)
return response.graph
Expand Down
5 changes: 2 additions & 3 deletions playground/infrastructure/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Example:
type: PrecompiledObjectType = PRECOMPILED_OBJECT_TYPE_UNSPECIFIED
pipeline_id: str = ""
output: str = ""
compile_output: str = ""
graph: str = ""


Expand Down Expand Up @@ -146,7 +147,7 @@ def find_examples(root_dir: str, subdirs: List[str], supported_categories: List[
return examples


async def get_statuses(examples: List[Example], concurrency: int = 10):
async def get_statuses(client: GRPCClient, examples: List[Example], concurrency: int = 10):
"""
Receive status and update example.status and example.pipeline_id for
each example
Expand All @@ -156,8 +157,6 @@ async def get_statuses(examples: List[Example], concurrency: int = 10):
pipeline_id values.
"""
tasks = []
client = GRPCClient()

try:
concurrency = int(os.environ["BEAM_CONCURRENCY"])
logging.info("override default concurrency: %d", concurrency)
Expand Down
3 changes: 2 additions & 1 deletion playground/infrastructure/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ pytest-asyncio==0.18.2
pytest-mock==3.6.1
PyYAML==6.0
tqdm~=4.62.3
google-cloud-datastore==2.7.1
google-cloud-datastore==2.7.1
sonora==0.2.2
Loading