Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor code to be more testable #22

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions binderbot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions


class OperationError(Exception):
    """Raised when a binderbot operation (kernel start/stop, code execution, ...) fails."""
252 changes: 19 additions & 233 deletions binderbot/binderbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,20 @@

from enum import Enum, auto
import aiohttp
import socket
import uuid
import random
from yarl import URL
import asyncio
import async_timeout
import structlog
import time
import json
import textwrap
import re

import nbformat
from nbconvert.preprocessors import ClearOutputPreprocessor

logger = structlog.get_logger()

# https://stackoverflow.com/questions/14693701/how-can-i-remove-the-ansi-escape-sequences-from-a-string-in-python
def _ansi_escape(text):
return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', text)

from binderbot import OperationError
from .kernel import Kernel
from .notebookclient import NotebookClient

class OperationError(Exception):
    """Raised when a binderbot operation (kernel start/stop, code execution, ...) fails."""
logger = structlog.get_logger()


class BinderUser:
class States(Enum):
    """Stages recorded in ``BinderUser.state`` as the binder and kernel come up."""

    CLEAR = 1
    # LOGGED_IN = 2
    BINDER_STARTED = 3
    KERNEL_STARTED = 4

async def __aenter__(self):
    """Create ``self.session`` (an aiohttp client session with the BinderBot User-Agent) and return ``self``."""
    default_headers = {'User-Agent': 'BinderBot-cli v0.1'}
    self.session = aiohttp.ClientSession(headers=default_headers)
    return self
Expand All @@ -54,7 +35,6 @@ def __init__(self, binder_url, repo, ref):
self.binder_url = URL(binder_url)
self.repo = repo
self.ref = ref
self.state = BinderUser.States.CLEAR
self.log = logger.bind()

async def start_binder(self, timeout=3000, spawn_refresh_time=20):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At L45 below, `build/gh` needs to be changed to `v2/gh`.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At L54 below, I'm not sure the `if line.startswith('data:'):` check still works. The HTML response I get when testing with https://mybinder.org looks like:

<meta id="badge-base-url" data-url="https://mybinder.org/">
<meta id="build-token" data-token="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpxVCJ9.eyJleHAiOjE2Njg4MzA3MTksImF1ZCI6ImdoL2dlby1zbWFydC9kZWVwaWNlZHJhaW5waXBlL3RlbXAtaXB5bmIiLCJvcmlnaW4iOiJteWJpbmRlci5vcmcifQ.pALJ9hVdHdq7-YbgIYkMU2lcdeu7MphArjnw38_DbKc">

so maybe the if-statement should be finding data-url and data-token instead?

Expand Down Expand Up @@ -90,215 +70,21 @@ async def start_binder(self, timeout=3000, spawn_refresh_time=20):
self.log.msg(f'Binder: Waiting on event stream (phase: {phase})', action='binder-start', phase='event-stream')
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest checking that the notebook_url and token variables are set before proceeding to the next piece of logic. This will help a little bit with debugging #35.

Suggested change
self.log.msg(f'Binder: Waiting on event stream (phase: {phase})', action='binder-start', phase='event-stream')
self.log.msg(f'Binder: Waiting on event stream (phase: {phase})', action='binder-start', phase='event-stream')
try:
self.notebook_url
self.token
except AttributeError as e:
raise OperationError from e



# todo: double check phase is really always "ready" at this point
self.state = BinderUser.States.BINDER_STARTED

async def shutdown_binder(self):
    """Placeholder for shutting down the binder; currently a no-op."""
    # TODO: figure out how to shut down the binder using the API
    # can we use the jupyterhub API:
    # https://jupyterhub.readthedocs.io/en/stable/reference/rest.html#enabling-users-to-spawn-multiple-named-servers-via-the-api
    return None

async def start_kernel(self):
    """Start a kernel on the running notebook server.

    POSTs to ``api/kernels`` using the notebook token, stores the id from the
    JSON response in ``self.kernel_id`` and sets ``self.state`` to
    ``KERNEL_STARTED``.

    Raises:
        OperationError: if the request raises, or the response status is
            not 201.
    """
    assert self.state == BinderUser.States.BINDER_STARTED

    self.log.msg('Kernel: Starting', action='kernel-start', phase='start')
    start_time = time.monotonic()

    try:
        headers = {'Authorization': f'token {self.token}'}
        resp = await self.session.post(self.notebook_url / 'api/kernels', headers=headers)
    except Exception as e:
        self.log.msg('Kernel: Start failed {}'.format(str(e)), action='kernel-start', phase='failed', duration=time.monotonic() - start_time)
        # Chain the cause so the underlying error stays visible in the traceback.
        raise OperationError() from e

    if resp.status != 201:
        # Record status and duration here as well, so both failure paths are equally debuggable.
        self.log.msg('Kernel: Start failed', action='kernel-start', phase='failed',
                     status=resp.status, duration=time.monotonic() - start_time)
        raise OperationError()
    self.kernel_id = (await resp.json())['id']
    self.log.msg('Kernel: Started', action='kernel-start', phase='complete')
    self.state = BinderUser.States.KERNEL_STARTED


async def stop_kernel(self):
    """Stop the kernel identified by ``self.kernel_id``.

    Sends DELETE to ``api/kernels/<kernel_id>`` using the notebook token and,
    on success, sets ``self.state`` back to ``BINDER_STARTED``.

    Raises:
        OperationError: if the request raises, or the response status is
            not 204.
    """
    assert self.state == BinderUser.States.KERNEL_STARTED

    self.log.msg('Kernel: Stopping', action='kernel-stop', phase='start')
    start_time = time.monotonic()
    try:
        headers = {'Authorization': f'token {self.token}'}
        resp = await self.session.delete(self.notebook_url / 'api/kernels' / self.kernel_id, headers=headers)
    except Exception as e:
        self.log.msg('Kernel:Failed Stopped {}'.format(str(e)), action='kernel-stop', phase='failed',
                     duration=time.monotonic() - start_time)
        # Chain the cause so the underlying error stays visible in the traceback.
        raise OperationError() from e

    if resp.status != 204:
        self.log.msg('Kernel:Failed Stopped {}'.format(str(resp)), action='kernel-stop', phase='failed',
                     duration=time.monotonic() - start_time)
        raise OperationError()

    # start_time was previously measured but never reported; include it in the logs.
    self.log.msg('Kernel: Stopped', action='kernel-stop', phase='complete',
                 duration=time.monotonic() - start_time)
    self.state = BinderUser.States.BINDER_STARTED

# https://github.com/jupyter/jupyter/wiki/Jupyter-Notebook-Server-API#notebook-and-file-contents-api
async def get_contents(self, path):
    """Fetch *path* through the notebook server's contents API.

    Args:
        path: Path appended to ``api/contents`` on the notebook server.

    Returns:
        The ``content`` field of the JSON response.

    Raises:
        aiohttp.ClientResponseError: if the server answers with an HTTP
            error status.
    """
    headers = {'Authorization': f'token {self.token}'}
    resp = await self.session.get(self.notebook_url / 'api/contents' / path, headers=headers)
    # Fail on HTTP errors explicitly (as put_contents does) rather than with a
    # confusing KeyError / JSON decoding error from the error body.
    resp.raise_for_status()
    resp_json = await resp.json()
    return resp_json['content']


async def put_contents(self, path, nb_data):
    """PUT *nb_data* as a notebook to *path* via the contents API; raise on an HTTP error status."""
    auth = {'Authorization': f'token {self.token}'}
    payload = {'content': nb_data, "type": "notebook"}
    target = self.notebook_url / 'api/contents' / path
    resp = await self.session.put(target, json=payload, headers=auth)
    resp.raise_for_status()

def request_execute_code(self, msg_id, code):
    """Build the ``execute_request`` message dict (shell channel) for *code*.

    *code* is passed through ``textwrap.dedent`` so callers can hand in
    indented multi-line literals.
    """
    header = {
        "msg_id": msg_id,
        "username": "jovyan",
        "msg_type": "execute_request",
        "version": "5.2",
    }
    content = {
        "code": textwrap.dedent(code),
        "silent": False,
        "store_history": True,
        "user_expressions": {},
        "allow_stdin": True,
        "stop_on_error": True,
    }
    return {
        "header": header,
        "metadata": {},
        "content": content,
        "buffers": [],
        "parent_header": {},
        "channel": "shell",
    }


async def run_code(self, code):
    """Run code and return stdout, stderr.

    Opens a websocket to the kernel's ``channels`` endpoint, sends an
    execute request for *code* and collects ``stream`` output from replies
    whose parent message id matches the request, until an ``execute_reply``
    with status ``ok`` arrives (or the websocket iteration ends).

    Returns:
        A ``(stdout, stderr)`` tuple of the accumulated stream text.

    Raises:
        OperationError: on a non-text websocket message, a non-``ok``
            execute reply, an ``error`` message on iopub, or any other
            exception while connecting or executing.
    """
    assert self.state == BinderUser.States.KERNEL_STARTED

    channel_url = self.notebook_url / 'api/kernels' / self.kernel_id / 'channels'
    self.log.msg('WS: Connecting', action='kernel-connect', phase='start')
    is_connected = False
    try:
        async with self.session.ws_connect(channel_url) as ws:
            is_connected = True
            self.log.msg('WS: Connected', action='kernel-connect', phase='complete')
            self.log.msg('Code Execute: Started', action='code-execute', phase='start')
            exec_start_time = time.monotonic()
            msg_id = str(uuid.uuid4())
            await ws.send_json(self.request_execute_code(msg_id, code))

            stdout = ''
            stderr = ''

            async for msg_text in ws:
                if msg_text.type != aiohttp.WSMsgType.TEXT:
                    self.log.msg(
                        'WS: Unexpected message type',
                        action='code-execute', phase='failure',
                        message_type=msg_text.type, message=str(msg_text),
                        duration=time.monotonic() - exec_start_time
                    )
                    raise OperationError()

                msg = msg_text.json()

                if 'parent_header' in msg and msg['parent_header'].get('msg_id') == msg_id:
                    # These are responses to our request
                    self.log.msg('Code Execute: Receive response', action='code-execute', phase='receive-stream',
                                 channel=msg['channel'], msg_type=msg['msg_type'])
                    if msg['channel'] == 'shell':
                        if msg['msg_type'] == 'execute_reply':
                            status = msg['content']['status']
                            if status == 'ok':
                                self.log.msg('Code Execute: Status OK', action='code-execute', phase='success')
                                break
                            else:
                                # The f prefix was missing, so the literal "{status}" was logged.
                                self.log.msg(f'Code Execute: Status {status}', action='code-execute', phase='error')
                                raise OperationError()
                    if msg['channel'] == 'iopub':
                        msg_type = msg.get('msg_type')
                        # don't really know what this is doing
                        #if msg_type == 'execute_result':
                        #    response = msg['content']['data']['text/plain']
                        if msg_type == 'error':
                            traceback = _ansi_escape('\n'.join(msg['content']['traceback']))
                            self.log.msg('Code Execute: Error', action='code-execute',
                                         phase='error',
                                         traceback=traceback)
                            raise OperationError()
                        elif msg_type == 'stream':
                            response = msg['content']['text']
                            name = msg['content']['name']
                            if name == 'stdout':
                                stdout += response
                            elif name == 'stderr':
                                stderr += response
                            #print(response)
            self.log.msg(
                'Code Execute: complete',
                action='code-execute', phase='complete',
                duration=time.monotonic() - exec_start_time)

            return stdout, stderr

    except OperationError:
        # Already logged where it was raised; let it propagate unchanged.
        raise
    except Exception as e:
        if is_connected:
            self.log.msg('Code Execute: Failed {}'.format(str(e)), action='code-execute', phase='failure')
        else:
            self.log.msg('WS: Failed {}'.format(str(e)), action='kernel-connect', phase='failure')
        # Chain the cause so the underlying error stays visible in the traceback.
        raise OperationError() from e


async def list_notebooks(self):
    """Run a listing snippet via ``run_code`` and return the ``*.ipynb`` file names it prints as JSON."""
    listing_code = """
import os, fnmatch, json
notebooks = [f for f in os.listdir() if fnmatch.fnmatch(f, '*.ipynb')]
print(json.dumps(notebooks))
"""
    captured_out, _captured_err = await self.run_code(listing_code)
    return json.loads(captured_out)

# Build a Python snippet that updates os.environ with *env_vars*, runs
# *notebook_filename* through nbconvert's ExecutePreprocessor (with the given
# *timeout*), writes the executed notebook back to the same file, and send the
# snippet to the kernel via run_code. Returns whatever run_code returns.
#
# NOTE(review): `env_vars={}` is a mutable default argument; it is only read
# here (str(env_vars)), but `env_vars=None` would be the safer signature.
# NOTE(review): notebook_filename and env_vars are interpolated into the
# generated code without escaping, so a filename containing a double quote
# would break the snippet -- confirm whether callers can pass such names.
# NOTE(review): the line "Shut down running binder instance." inside the code
# string below looks like it belongs to a different definition (diff-view
# interleaving) and would not be valid Python if sent to the kernel -- confirm
# against the real file before changing it.
async def execute_notebook(self, notebook_filename, timeout=600,
env_vars={}):
env_var_str = str(env_vars)
# https://nbconvert.readthedocs.io/en/latest/execute_api.html
code = f"""
import os
import nbformat
os.environ.update({env_var_str})
from nbconvert.preprocessors import ExecutePreprocessor
ep = ExecutePreprocessor(timeout={timeout})
print("Processing {notebook_filename}")
with open("{notebook_filename}") as f:
nb = nbformat.read(f, as_version=4)
ep.preprocess(nb, dict())
print("OK")
print("Saving {notebook_filename}")
with open("{notebook_filename}", 'w', encoding='utf-8') as f:
nbformat.write(nb, f)
print("OK")
Shut down running binder instance.
"""
return await self.run_code(code)

async def upload_local_notebook(self, notebook_filename):
    """Load the local notebook with outputs stripped and upload it under the same path."""
    stripped_nb = open_nb_and_strip_output(notebook_filename)
    # probably want to use basename instead
    await self.put_contents(notebook_filename, stripped_nb)


def open_nb_and_strip_output(fname):
    """Read the notebook at *fname* and return it with outputs cleared.

    Args:
        fname: Path of a local notebook file.

    Returns:
        The notebook object read with ``nbformat.read(..., as_version=4)``
        after ``ClearOutputPreprocessor`` has been applied to it.
    """
    cop = ClearOutputPreprocessor()
    # Read as UTF-8 explicitly instead of relying on the platform default
    # encoding; the CLI also writes notebooks back with encoding='utf-8'.
    with open(fname, encoding='utf-8') as f:
        nb = nbformat.read(f, as_version=4)
    cop.preprocess(nb, dict())
    return nb
# Ideally, we will talk to the hub API to shut this down
# However, the token we get is just a notebook auth token, *not* a hub auth token
# So we can't make requests to the hub API.
# FIXME: Provide hub auth tokens from binderhub API
nbclient = NotebookClient(self.session, self.notebook_url, self.token, self.log)
# Don't try to stop the kernel when we are done executing
# We don't expect the notebook server to be around still
async with nbclient.start_kernel(cleanup=False) as kernel:
await kernel.run_code("""
import os
import signal
# FIXME: Wait a bit, and send SIGKILL otherwise
os.kill(1, signal.SIGTERM)
""")
57 changes: 28 additions & 29 deletions binderbot/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import nbformat

from .binderbot import BinderUser
from .notebookclient import NotebookClient

# https://github.com/pallets/click/issues/85#issuecomment-43378930
def coro(f):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default binder URL https://binder.pangeo.io at L22 below doesn't work anymore. Point to https://aws-uswest2-binder.pangeo.io instead? Or wait for a new Binder deployment from 2i2c-org/infrastructure#919 🙂

Expand Down Expand Up @@ -56,39 +57,37 @@ async def main(binder_url, repo, ref, output_dir, nb_timeout,
# inputs look good, start up binder
async with BinderUser(binder_url, repo, ref) as jovyan:
await jovyan.start_binder(timeout=binder_start_timeout)
await jovyan.start_kernel()
click.echo(f"✅ Binder and kernel started successfully.")
# could think about asyncifying this whole loop
# for now, we run one notebook at a time to avoid overloading the binder
errors = {}
for fname in filenames:
try:
click.echo(f"⌛️ Uploading {fname}...", nl=False)
await jovyan.upload_local_notebook(fname)
click.echo("✅")
click.echo(f"⌛️ Executing {fname}...", nl=False)
await jovyan.execute_notebook(fname, timeout=nb_timeout,
env_vars=extra_env_vars)
click.echo("✅")
click.echo(f"⌛️ Downloading and saving {fname}...", nl=False)
nb_data = await jovyan.get_contents(fname)
nb = nbformat.from_dict(nb_data)
output_fname = os.path.join(output_dir, fname) if output_dir else fname
with open(output_fname, 'w', encoding='utf-8') as f:
nbformat.write(nb, f)
click.echo("✅")
except Exception as e:
errors[fname] = e
click.echo(f'❌ error running {fname}: {e}')

await jovyan.stop_kernel()
nb = NotebookClient(jovyan.session, jovyan.notebook_url, jovyan.token, jovyan.log)
async with nb.start_kernel() as kernel:
click.echo(f"✅ Binder and kernel started successfully.")
# could think about asyncifying this whole loop
# for now, we run one notebook at a time to avoid overloading the binder
errors = {}
for fname in filenames:
try:
click.echo(f"⌛️ Uploading {fname}...", nl=False)
await nb.upload_local_notebook(fname)
click.echo("✅")
click.echo(f"⌛️ Executing {fname}...", nl=False)
await kernel.execute_notebook(fname, timeout=nb_timeout,
env_vars=extra_env_vars)
click.echo("✅")
click.echo(f"⌛️ Downloading and saving {fname}...", nl=False)
nb_data = await nb.get_contents(fname)
nb = nbformat.from_dict(nb_data)
output_fname = os.path.join(output_dir, fname) if output_dir else fname
with open(output_fname, 'w', encoding='utf-8') as f:
nbformat.write(nb, f)
click.echo("✅")
except Exception as e:
errors[fname] = e
click.echo(f'❌ error running {fname}: {e}')

if len(errors) > 0:
raise RuntimeError(str(errors))

# TODO: shut down binder
# await jovyan.shutdown_binder()
# can we do this with a context manager so that it shuts down in case of errors?
# TODO: can we do this with a context manager so that it shuts down in case of errors?
await jovyan.shutdown_binder()


if __name__ == "__main__":
Expand Down
Loading