Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
eantyshev committed Oct 17, 2022
1 parent c320407 commit f33bcfa
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 20 deletions.
34 changes: 14 additions & 20 deletions playground/infrastructure/cd_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,21 @@ async def _get_outputs(self, examples: List[Example]):
Args:
examples: beam examples that should be run
"""
async def _populate_fields(example: Example):
try:
example.compile_output = await client.get_compile_output(example.pipeline_id)
example.output = await client.get_run_output(example.pipeline_id)
example.logs = await client.get_log(example.pipeline_id)
if example.sdk in [SDK_JAVA, SDK_PYTHON]:
example.graph = await client.get_graph(example.pipeline_id, example.filepath)
except Exception as e:
logging.error(example.link)
logging.error(example.compile_output)
raise RuntimeError(f"error in {example.name}") from e

async with GRPCClient() as client:
await get_statuses(client,
examples) # run examples code and wait until all are executed
tasks = [client.get_run_output(example.pipeline_id) for example in examples]
outputs = await asyncio.gather(*tasks)

tasks = [client.get_log(example.pipeline_id) for example in examples]
logs = await asyncio.gather(*tasks)

if len(examples) > 0 and examples[0].sdk in [SDK_PYTHON, SDK_JAVA]:
tasks = [
client.get_graph(example.pipeline_id, example.filepath)
for example in examples
]
graphs = await asyncio.gather(*tasks)

for graph, example in zip(graphs, examples):
example.graph = graph

for output, example in zip(outputs, examples):
example.output = output
tasks = [_populate_fields(example) for example in examples]
await asyncio.gather(*tasks)

for log, example in zip(logs, examples):
example.logs = log
1 change: 1 addition & 0 deletions playground/infrastructure/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Example:
type: PrecompiledObjectType = PRECOMPILED_OBJECT_TYPE_UNSPECIFIED
pipeline_id: str = ""
output: str = ""
compile_output: str = ""
graph: str = ""


Expand Down

0 comments on commit f33bcfa

Please sign in to comment.