diff --git a/pyproject.toml b/pyproject.toml index 0cbad3e..198802d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "solara", "vaex", "anywidget", + "humanfriendly", ] [project.optional-dependencies] diff --git a/src/aiidalab_sssp/components/article.py b/src/aiidalab_sssp/components/banner.py similarity index 100% rename from src/aiidalab_sssp/components/article.py rename to src/aiidalab_sssp/components/banner.py diff --git a/src/aiidalab_sssp/components/computational_resource.py b/src/aiidalab_sssp/components/computational_resource.py new file mode 100644 index 0000000..acdbd40 --- /dev/null +++ b/src/aiidalab_sssp/components/computational_resource.py @@ -0,0 +1,2225 @@ +import copy +import enum +import re +import subprocess +import requests +import threading +from collections import namedtuple +from pathlib import Path +from uuid import UUID +from enum import Enum + +import ipywidgets as ipw +import traitlets as tl +from aiida.plugins import DataFactory + +import jinja2 +import pexpect +import shortuuid +from aiida import common, orm, plugins +from aiida.orm.utils.builders.computer import ComputerBuilder +from aiida.transports.plugins import ssh as aiida_ssh_plugin +from humanfriendly import InvalidSize, parse_size +from IPython.display import clear_output, display +from jinja2 import meta as jinja2_meta + +"""Some utility functions used acrross the repository.""" + +CifData = DataFactory("core.cif") # pylint: disable=invalid-name +StructureData = DataFactory("core.structure") # pylint: disable=invalid-name +TrajectoryData = DataFactory("core.array.trajectory") # pylint: disable=invalid-name + +class _StatusWidgetMixin(tl.HasTraits): + """Show temporary messages for example for status updates. + This is a mixin class that is meant to be part of an inheritance + tree of an actual widget with a 'value' traitlet that is used + to convey a status message. See the non-private classes below + for examples. + """ + + message = tl.Unicode(default_value="", allow_none=True) + new_line = "\n" + + def __init__(self, clear_after=3, *args, **kwargs): + self._clear_timer = None + self._clear_after = clear_after + self._message_stack = [] + super().__init__(*args, **kwargs) + + def _clear_value(self): + """Set widget .value to be an empty string.""" + if self._message_stack: + self._message_stack.pop(0) + self.value = self.new_line.join(self._message_stack) + else: + self.value = "" + + def show_temporary_message(self, value, clear_after=None): + """Show a temporary message and clear it after the given interval.""" + clear_after = clear_after or self._clear_after + if value: + self._message_stack.append(value) + self.value = self.new_line.join(self._message_stack) + + # Start new timer that will clear the value after the specified interval. + self._clear_timer = threading.Timer(self._clear_after, self._clear_value) + self._clear_timer.start() + self.message = None + +class StatusHTML(_StatusWidgetMixin, ipw.HTML): + """Show temporary HTML messages for example for status updates.""" + + new_line = "
" + + # This method should be part of _StatusWidgetMixin, but that does not work + # for an unknown reason. + @tl.observe("message") + def _observe_message(self, change): + self.show_temporary_message(change["new"]) + + +# Define the message levels as Enum +class MessageLevel(Enum): + INFO = "info" + WARNING = "warning" + ERROR = "danger" + SUCCESS = "success" + + +def wrap_message(message, level=MessageLevel.INFO): + """Wrap message into HTML code with the given level.""" + # mapping level to fa icon + # https://fontawesome.com/v4.7.0/icons/ + mapping = { + MessageLevel.INFO: "info-circle", + MessageLevel.WARNING: "exclamation-triangle", + MessageLevel.ERROR: "exclamation-circle", + MessageLevel.SUCCESS: "check-circle", + } + + # The message is wrapped into a div with the class "alert" and the icon of the given level + return f""" + + """ + +STYLE = {"description_width": "180px"} +LAYOUT = {"width": "400px"} + +TEMPLATE_PATTERN = re.compile(r"\{\{.*\}\}") + +class ComputationalResourcesWidget(ipw.VBox): + """Code selection widget. + Attributes: + + value(code UUID): Trait that points to the selected UUID of the code instance. + It can be set either to an AiiDA code UUID or to a code label. + It is linked to the `value` trait of the `self.code_select_dropdown` widget. + + codes(Dict): Trait that contains a dictionary (label => Code UUID) for all + codes found in the AiiDA database for the selected plugin. It is linked + to the 'options' trait of the `self.code_select_dropdown` widget. + + allow_hidden_codes(Bool): Trait that defines whether to show hidden codes or not. + + allow_disabled_computers(Bool): Trait that defines whether to show codes on disabled + computers. + """ + + value = tl.Unicode(allow_none=True) + codes = tl.Dict(allow_none=True) + allow_hidden_codes = tl.Bool(False) + allow_disabled_computers = tl.Bool(False) + + # code output layout width + _output_width = "460px" + + def __init__( + self, + description="Select code:", + enable_quick_setup=True, + enable_detailed_setup=True, + clear_after=None, + default_calc_job_plugin=None, + **kwargs, + ): + """Dropdown for Codes for one input plugin. + + description (str): Description to display before the dropdown. + """ + clear_after = clear_after or 15 + self.default_calc_job_plugin = default_calc_job_plugin + self.output = ipw.HTML() + self.setup_message = StatusHTML(clear_after=clear_after) + self.code_select_dropdown = ipw.Dropdown( + description=description, + disabled=True, + value=None, + style={"description_width": "initial"}, + ) + + # Create bi-directional link between the code_select_dropdown and the codes trait. + tl.directional_link( + (self, "codes"), + (self.code_select_dropdown, "options"), + transform=lambda x: [(key, x[key]) for key in x], + ) + tl.directional_link( + (self.code_select_dropdown, "options"), + (self, "codes"), + transform=lambda x: {c[0]: c[1] for c in x}, + ) + tl.link((self.code_select_dropdown, "value"), (self, "value")) + + self.observe( + self.refresh, names=["allow_disabled_computers", "allow_hidden_codes"] + ) + + self.btn_setup_new_code = ipw.ToggleButton(description="Setup new code") + self.btn_setup_new_code.observe(self._setup_new_code, "value") + + self._setup_new_code_output = ipw.Output(layout={"width": self._output_width}) + + children = [ + ipw.HBox([self.code_select_dropdown, self.btn_setup_new_code]), + self._setup_new_code_output, + self.output, + ] + super().__init__(children=children, **kwargs) + + # Computer/code setup + self.resource_setup = _ResourceSetupBaseWidget( + default_calc_job_plugin=self.default_calc_job_plugin, + enable_quick_setup=enable_quick_setup, + enable_detailed_setup=enable_detailed_setup, + ) + self.resource_setup.observe(self.refresh, "success") + tl.dlink( + (self.resource_setup, "message"), + (self.setup_message, "message"), + ) + + self.refresh() + + def _get_codes(self): + """Query the list of available codes.""" + + user = orm.User.collection.get_default() + filters = ( + {"attributes.input_plugin": self.default_calc_job_plugin} + if self.default_calc_job_plugin + else {} + ) + + return [ + (self._full_code_label(c[0]), c[0].uuid) + for c in orm.QueryBuilder() + .append( + orm.Code, + filters=filters, + ) + .all() + if c[0].computer.is_user_configured(user) + and (self.allow_hidden_codes or not c[0].is_hidden) + and (self.allow_disabled_computers or c[0].computer.is_user_enabled(user)) + ] + + @staticmethod + def _full_code_label(code): + return f"{code.label}@{code.computer.label}" + + def refresh(self, _=None): + """Refresh available codes. + + The job of this function is to look in AiiDA database, find available codes and + put them in the code_select_dropdown widget.""" + self.output.value = "" + + with self.hold_trait_notifications(): + self.code_select_dropdown.options = self._get_codes() + if not self.code_select_dropdown.options: + self.output.value = f"No codes found for default calcjob plugin {self.default_calc_job_plugin!r}." + self.code_select_dropdown.disabled = True + else: + self.code_select_dropdown.disabled = False + self.code_select_dropdown.value = None + + @tl.validate("value") + def _validate_value(self, change): + """Check if the code is valid in DB""" + code_uuid = change["value"] + self.output.value = "" + + # If code None, set value to None. + if code_uuid is None: + return None + + try: + _ = UUID(code_uuid, version=4) + except ValueError: + self.output.value = f"""'{code_uuid}' + is not a valid UUID.""" + else: + return code_uuid + + def _setup_new_code(self, _=None): + with self._setup_new_code_output: + clear_output() + if self.btn_setup_new_code.value: + self._setup_new_code_output.layout = { + "width": self._output_width, + "border": "1px solid gray", + } + + children = [ + self.resource_setup, + self.setup_message, + ] + display(*children) + else: + self._setup_new_code_output.layout = { + "width": self._output_width, + "border": "none", + } + + +class SshConnectionState(enum.Enum): + waiting_for_input = -1 + enter_password = 0 + success = 1 + keys_already_present = 2 + do_you_want_to_continue = 3 + no_keys = 4 + unknown_hostname = 5 + connection_refused = 6 + end_of_file = 7 + + +class SshComputerSetup(ipw.VBox): + """Setup a passwordless access to a computer.""" + + ssh_config = tl.Dict() + ssh_connection_state = tl.UseEnum( + SshConnectionState, allow_none=True, default_value=None + ) + SSH_POSSIBLE_RESPONSES = [ + # order matters! the index will return by pexpect and compare + # with SshConnectionState + "[Pp]assword:", # 0 + "Now try logging into", # 1 + "All keys were skipped because they already exist on the remote system", # 2 + "Are you sure you want to continue connecting (yes/no)?", # 3 + "ERROR: No identities found", # 4 + "Could not resolve hostname", # 5 + "Connection refused", # 6 + pexpect.EOF, # 7 + ] + message = tl.Unicode() + password_message = tl.Unicode("The passwordless enabling log.") + + def __init__(self, ssh_folder: Path | None = None, **kwargs): + """Setup a passwordless access to a computer.""" + # ssh folder init + if ssh_folder is None: + ssh_folder = Path.home() / ".ssh" + + if not ssh_folder.exists(): + ssh_folder.mkdir() + ssh_folder.chmod(0o700) + + self._ssh_folder = ssh_folder + + self._ssh_connection_message = None + self._password_message = ipw.HTML() + tl.dlink( + (self, "password_message"), + (self._password_message, "value"), + transform=lambda x: f"SSH log: {x}", + ) + self._ssh_password = ipw.Password( + description="password:", + disabled=False, + layout=LAYOUT, + style=STYLE, + ) + # Don't show the continue button until it ask for password the + # second time, which happened when the proxy jump is set. The + # first time it ask for password is for the jump host. + self._continue_with_password_button = ipw.Button( + description="Continue", + layout={"width": "100px", "display": "none"}, + ) + self._continue_with_password_button.on_click(self._send_password) + + self.password_box = ipw.VBox( + [ + ipw.HBox([self._ssh_password, self._continue_with_password_button]), + self._password_message, + ] + ) + + # Username. + self.username = ipw.Text(description="Username:", layout=LAYOUT, style=STYLE) + + # Port. + self.port = ipw.IntText( + description="Port:", + value=22, + layout=LAYOUT, + style=STYLE, + ) + + # Hostname. + self.hostname = ipw.Text( + description="Computer hostname:", + layout=LAYOUT, + style=STYLE, + ) + + # ProxyJump. + self.proxy_jump = ipw.Text( + description="ProxyJump:", + layout=LAYOUT, + style=STYLE, + ) + # ProxyJump. + self.proxy_command = ipw.Text( + description="ProxyCommand:", + layout=LAYOUT, + style=STYLE, + ) + + self._inp_private_key = ipw.FileUpload( + accept="", + layout=LAYOUT, + description="Private key", + multiple=False, + ) + self._verification_mode = ipw.Dropdown( + options=[ + ("Password", "password"), + ("Use custom private key", "private_key"), + ("Download public key", "public_key"), + ("Multiple factor authentication", "mfa"), + ], + layout=LAYOUT, + style=STYLE, + value="password", + description="Verification mode:", + disabled=False, + ) + self._verification_mode.observe( + self._on_verification_mode_change, names="value" + ) + self._verification_mode_output = ipw.Output() + + self._continue_button = ipw.ToggleButton( + description="Continue", layout={"width": "100px"}, value=False + ) + + # Setup ssh button and output. + btn_setup_ssh = ipw.Button(description="Setup ssh") + btn_setup_ssh.on_click(self._on_setup_ssh_button_pressed) + + children = [ + self.hostname, + self.port, + self.username, + self.proxy_jump, + self.proxy_command, + self._verification_mode, + self._verification_mode_output, + btn_setup_ssh, + ] + super().__init__(children, **kwargs) + + def _ssh_keygen(self): + """Generate ssh key pair.""" + self.message = wrap_message( + "Generating SSH key pair.", + MessageLevel.SUCCESS, + ) + fpath = self._ssh_folder / "id_rsa" + keygen_cmd = [ + "ssh-keygen", + "-f", + fpath, + "-t", + "rsa", + "-b", + "4096", + "-m", + "PEM", + "-N", + "", + ] + if not fpath.exists(): + subprocess.run(keygen_cmd, capture_output=True) + + def _can_login(self): + """Check if it is possible to login into the remote host.""" + # With BatchMode on, no password prompt or other interaction is attempted, + # so a connect that requires a password will fail. + ret = subprocess.call( + [ + "ssh", + self.hostname.value, + "-o", + "BatchMode=yes", + "-o", + "ConnectTimeout=5", + "true", + ] + ) + return ret == 0 + + def _is_in_config(self): + """Check if the SSH config file contains host information.""" + config_path = self._ssh_folder / "config" + if not config_path.exists(): + return False + sshcfg = aiida_ssh_plugin.parse_sshconfig(self.hostname.value) + # NOTE: parse_sshconfig returns a dict with a hostname + # even if it is not in the config file. + # We require at least the user to be specified. + if "user" not in sshcfg: + return False + return True + + def _write_ssh_config(self, private_key_abs_fname=None): + """Put host information into the config file.""" + config_path = self._ssh_folder / "config" + + self.message = wrap_message( + f"Adding {self.hostname.value} section to {config_path}", + MessageLevel.SUCCESS, + ) + with open(config_path, "a") as file: + file.write(f"Host {self.hostname.value}\n") + file.write(f" User {self.username.value}\n") + file.write(f" Port {self.port.value}\n") + if self.proxy_jump.value != "": + file.write( + f" ProxyJump {self.proxy_jump.value.format(username=self.username.value)}\n" + ) + if self.proxy_command.value != "": + file.write( + f" ProxyCommand {self.proxy_command.value.format(username=self.username.value)}\n" + ) + if private_key_abs_fname: + file.write(f" IdentityFile {private_key_abs_fname}\n") + file.write(" ServerAliveInterval 5\n") + + def key_pair_prepare(self): + """Prepare key pair for the ssh connection.""" + # Always start by generating a key pair if they are not present. + self._ssh_keygen() + + # If hostname & username are not provided - do not do anything. + if self.hostname.value == "": # check hostname + message = "Please specify the computer name (for SSH)" + + raise ValueError(message) + + if self.username.value == "": # check username + message = "Please specify your SSH username." + + raise ValueError(message) + + def thread_ssh_copy_id(self): + """Copy public key on the remote computer, on a separate thread.""" + ssh_connection_thread = threading.Thread(target=self._ssh_copy_id) + ssh_connection_thread.start() + + def _on_setup_ssh_button_pressed(self, _=None): + """Setup ssh connection.""" + if self._verification_mode.value == "password": + try: + self.key_pair_prepare() + + except ValueError as exc: + self.message = wrap_message(str(exc), MessageLevel.ERROR) + return + + self.thread_ssh_copy_id() + + # For not password ssh auth (such as using private_key or 2FA), key pair is not needed (2FA) + # or the key pair is ready. + # There are other mechanism to set up the ssh connection. + # But we still need to write the ssh config to the ssh config file for such as + # proxy jump. + + private_key_fname = None + if self._verification_mode.value == "private_key": + # Write private key in ~/.ssh/ and use the name of upload file, + # if exist, generate random string and append to filename then override current name. + + # unwrap private key file and setting temporary private_key content + private_key_fname, private_key_content = self._private_key + if private_key_fname is None: # check private key file + message = "Please upload your private key file." + self.message = wrap_message(message, MessageLevel.ERROR) + return + + filename = Path(private_key_fname).name + + # if the private key filename is exist, generate random string and append to filename subfix + # then override current name. + if filename in [str(p.name) for p in Path(self._ssh_folder).iterdir()]: + private_key_fpath = self._ssh_folder / f"{filename}-{shortuuid.uuid()}" + + self._add_private_key(private_key_fpath, private_key_content) + + # TODO(danielhollas): I am not sure this is correct. What if the user wants + # to overwrite the private key? Or any other config? The configuration would never be written. + # And the user is not notified that we did not write anything. + # https://github.com/aiidalab/aiidalab-widgets-base/issues/516 + if not self._is_in_config(): + self._write_ssh_config(private_key_abs_fname=private_key_fname) + + def _ssh_copy_id(self): + """Run the ssh-copy-id command and follow it until it is completed.""" + timeout = 10 + self.password_message = f"Sending public key to {self.hostname.value}... " + self._ssh_connection_process = pexpect.spawn( + f"ssh-copy-id {self.hostname.value}" + ) + while True: + try: + idx = self._ssh_connection_process.expect( + self.SSH_POSSIBLE_RESPONSES, + timeout=timeout, + ) + self.ssh_connection_state = SshConnectionState(idx) + except pexpect.TIMEOUT: + self.password_message = f"Exceeded {timeout} s timeout. Please check you username and password and try again." + break + + # Terminating the process when nothing else can be done. + if self.ssh_connection_state in ( + SshConnectionState.success, + SshConnectionState.keys_already_present, + SshConnectionState.no_keys, + SshConnectionState.unknown_hostname, + SshConnectionState.connection_refused, + SshConnectionState.end_of_file, + ): + break + + self._ssh_connection_message = None + self._ssh_connection_process = None + + def _send_password(self, _=None): + self._continue_with_password_button.disabled = True + self._ssh_connection_process.sendline(self._ssh_password.value) + + @tl.observe("ssh_connection_state") + def _observe_ssh_connnection_state(self, _=None): + """Observe the ssh connection state and act according to the changes.""" + if self.ssh_connection_state is SshConnectionState.waiting_for_input: + return + if self.ssh_connection_state is SshConnectionState.success: + self.password_message = ( + "The passwordless connection has been set up successfully." + ) + return + if self.ssh_connection_state is SshConnectionState.keys_already_present: + self.password_message = "The passwordless connection has already been setup. Nothing to be done." + return + if self.ssh_connection_state is SshConnectionState.no_keys: + self.password_message = ( + " Failed\nLooks like the key pair is not present in ~/.ssh folder." + ) + return + if self.ssh_connection_state is SshConnectionState.unknown_hostname: + self.password_message = "Failed\nUnknown hostname." + return + if self.ssh_connection_state is SshConnectionState.connection_refused: + self.password_message = "Failed\nConnection refused." + return + if self.ssh_connection_state is SshConnectionState.end_of_file: + self.password_message = ( + "Didn't manage to connect. Please check your username/password." + ) + return + if self.ssh_connection_state is SshConnectionState.enter_password: + self._handle_ssh_password() + elif self.ssh_connection_state is SshConnectionState.do_you_want_to_continue: + self._ssh_connection_process.sendline("yes") + + def _handle_ssh_password(self): + """Send a password to a remote computer.""" + message = ( + self._ssh_connection_process.before.splitlines()[-1] + + self._ssh_connection_process.after + ) + if self._ssh_connection_message == message: + self._ssh_connection_process.sendline(self._ssh_password.value) + else: + self.password_message = ( + f"Please enter {self.username.value}@{self.hostname.value}'s password:" + if message == b"Password:" + else f"Please enter {message.decode('utf-8')}" + ) + self._ssh_password.disabled = False + self._continue_with_password_button.disabled = False + self._ssh_connection_message = message + + # If user did not provide a password, we wait for the input. + # Otherwise, we send the password. + if self._ssh_password.value == "": + self.ssh_connection_state = SshConnectionState.waiting_for_input + else: + self.password_message = 'Sending password (timeout 10s)' + self._send_password() + + def _on_verification_mode_change(self, change): + """which verification mode is chosen.""" + with self._verification_mode_output: + clear_output() + if self._verification_mode.value == "private_key": + display(self._inp_private_key) + elif self._verification_mode.value == "public_key": + public_key = self._ssh_folder / "id_rsa.pub" + if public_key.exists(): + display( + ipw.HTML( + f"""
{public_key.read_text()}
""", + layout={"width": "100%"}, + ) + ) + + @property + def _private_key(self) -> tuple[str | None, bytes | None]: + """Unwrap private key file and setting filename and file content.""" + if self._inp_private_key.value: + (fname, _value), *_ = self._inp_private_key.value.items() + content = copy.copy(_value["content"]) + self._inp_private_key.value.clear() + self._inp_private_key._counter = 0 # pylint: disable=protected-access + return fname, content + return None, None + + @staticmethod + def _add_private_key(private_key_fpath: Path, private_key_content: bytes): + """Write private key to the private key file in the ssh folder.""" + private_key_fpath.write_bytes(private_key_content) + private_key_fpath.chmod(0o600) + + def _reset(self): + self.hostname.value = "" + self.port.value = 22 + self.username.value = "" + self.proxy_jump.value = "" + self.proxy_command.value = "" + + @tl.observe("ssh_config") + def _observe_ssh_config(self, change): + """Pre-filling the input fields.""" + self._reset() + + new_ssh_config = change["new"] + if "hostname" in new_ssh_config: + self.hostname.value = new_ssh_config["hostname"] + if "username" in new_ssh_config: + self.username.value = new_ssh_config["username"] + if "port" in new_ssh_config: + self.port.value = int(new_ssh_config["port"]) + if "proxy_jump" in new_ssh_config: + self.proxy_jump.value = new_ssh_config["proxy_jump"] + if "proxy_command" in new_ssh_config: + self.proxy_command.value = new_ssh_config["proxy_command"] + + +class AiidaComputerSetup(ipw.VBox): + """Inform AiiDA about a computer.""" + + computer_setup = tl.Dict(allow_none=True) + computer_configure = tl.Dict(allow_none=True) + message = tl.Unicode() + + def __init__(self, **kwargs): + self._on_setup_computer_success = [] + + # List of widgets to be displayed. + self.label = ipw.Text( + value="", + placeholder="Will only be used within AiiDA", + description="Computer name:", + layout=LAYOUT, + style=STYLE, + ) + + # Hostname. + self.hostname = ipw.Text(description="Hostname:", layout=LAYOUT, style=STYLE) + + # Computer description. + self.description = ipw.Text( + value="", + placeholder="No description (yet)", + description="Computer description:", + layout=LAYOUT, + style=STYLE, + ) + + # Directory where to run the simulations. + self.work_dir = ipw.Text( + value="", + placeholder="/home/{username}/aiida_run", + description="AiiDA working directory:", + layout=LAYOUT, + style=STYLE, + ) + + # Mpirun command. + self.mpirun_command = ipw.Text( + value="mpirun -n {tot_num_mpiprocs}", + description="Mpirun command:", + layout=LAYOUT, + style=STYLE, + ) + + # Number of CPUs per node. + self.mpiprocs_per_machine = ipw.IntText( + value=1, + step=1, + description="#CPU(s) per node:", + layout=LAYOUT, + style=STYLE, + ) + + # Memory per node. + self.default_memory_per_machine_widget = ipw.Text( + value="", + placeholder="not specified", + description="Memory per node:", + layout=LAYOUT, + style=STYLE, + ) + memory_wrong_syntax = ipw.HTML( + value=""" wrong syntax""", + ) + memory_wrong_syntax.layout.display = "none" + self.default_memory_per_machine = None + + def observe_memory_per_machine(change): + """Check if the string defining memory is valid.""" + memory_wrong_syntax.layout.display = "none" + if not self.default_memory_per_machine_widget.value: + self.default_memory_per_machine = None + return + try: + self.default_memory_per_machine = ( + int(parse_size(change["new"], binary=True) / 1024) or None + ) + memory_wrong_syntax.layout.display = "none" + except InvalidSize: + memory_wrong_syntax.layout.display = "block" + self.default_memory_per_machine = None + + self.default_memory_per_machine_widget.observe( + observe_memory_per_machine, names="value" + ) + + # Transport type. + self.transport = ipw.Dropdown( + value="core.local", + options=plugins.entry_point.get_entry_point_names("aiida.transports"), + description="Transport type:", + style=STYLE, + ) + + # Safe interval. + self.safe_interval = ipw.FloatText( + value=30.0, + description="Min. connection interval (sec):", + layout=LAYOUT, + style=STYLE, + ) + + # Scheduler. + self.scheduler = ipw.Dropdown( + value="core.slurm", + options=plugins.entry_point.get_entry_point_names("aiida.schedulers"), + description="Scheduler:", + style=STYLE, + ) + + self.shebang = ipw.Text( + value="#!/usr/bin/env bash", + description="Shebang:", + layout=LAYOUT, + style=STYLE, + ) + + # Use double quotes to escape. + self.use_double_quotes = ipw.Checkbox( + value=False, + description="Use double quotes to escape environment variable of job script.", + ) + + # Use login shell. + self.use_login_shell = ipw.Checkbox(value=True, description="Use login shell") + + # Prepend text. + self.prepend_text = ipw.Textarea( + placeholder="Text to prepend to each command execution", + description="Prepend text:", + layout=LAYOUT, + ) + + # Append text. + self.append_text = ipw.Textarea( + placeholder="Text to append to each command execution", + description="Append text:", + layout=LAYOUT, + ) + + # Buttons and outputs. + self.setup_button = ipw.Button(description="Setup computer") + self.setup_button.on_click(self.on_setup_computer) + test_button = ipw.Button(description="Test computer") + test_button.on_click(self.test) + self._test_out = ipw.HTML(layout=LAYOUT) + + # Organize the widgets + children = [ + self.label, + self.hostname, + self.description, + self.work_dir, + self.mpirun_command, + self.mpiprocs_per_machine, + ipw.HBox([self.default_memory_per_machine_widget, memory_wrong_syntax]), + self.transport, + self.safe_interval, + self.scheduler, + self.shebang, + self.use_login_shell, + self.use_double_quotes, + self.prepend_text, + self.append_text, + self.setup_button, + test_button, + self._test_out, + ] + + super().__init__(children, **kwargs) + + def _configure_computer(self, computer: orm.Computer, transport: str): + # Use default AiiDA user + user = orm.User.collection.get_default() + if transport == "core.ssh": + self._configure_computer_ssh(computer, user) + elif transport == "core.local": + self._configure_computer_local(computer, user) + else: + msg = f"invalid transport type '{transport}'" + raise common.ValidationError(msg) + + def _configure_computer_ssh(self, computer: orm.Computer, user: orm.User): + """Configure the computer with SSH transport + + There are three sources of authparams information ordered by priority increase: + - the default values + - the SSH config file + - the computer_configure dictionary + + At the moment, there is no overlap between the SSH config file and the computer_configure dictionary. + + The proxyjump and proxycommend can be read from both the SSH config file and the computer_configure dictionary, + since the SSH config file is generated from the computer_configure dictionary in `SshComputerSetup._write_ssh_config`. + """ + sshcfg = aiida_ssh_plugin.parse_sshconfig(self.hostname.value) + authparams = { + "port": int(sshcfg.get("port", 22)), + "look_for_keys": True, + "key_filename": str( + Path(sshcfg.get("identityfile", ["~/.ssh/id_rsa"])[0]).expanduser() + ), + "timeout": 60, + "allow_agent": True, + "proxy_jump": "", + "proxy_command": "", + "compress": True, + "gss_auth": False, + "gss_kex": False, + "gss_deleg_creds": False, + "gss_host": self.hostname.value, + "load_system_host_keys": True, + "key_policy": "WarningPolicy", + "use_login_shell": self.use_login_shell.value, + "safe_interval": self.safe_interval.value, + } + if "username" in self.computer_configure: + authparams["username"] = self.computer_configure["username"] + else: + try: + # This require the Ssh connection setup is done before the computer setup + authparams["username"] = sshcfg["user"] + except KeyError as exc: + message = "SSH username is not provided" + raise RuntimeError(message) from exc + + if "proxy_jump" in self.computer_configure: + authparams["proxy_jump"] = self.computer_configure["proxy_jump"] + + if "proxy_command" in self.computer_configure: + authparams["proxy_command"] = self.computer_configure["proxy_command"] + + if "key_filename" in self.computer_configure: + authparams["key_filename"] = str( + Path(self.computer_configure["key_filename"]).expanduser() + ) + + if "key_policy" in self.computer_configure: + authparams["key_policy"] = self.computer_configure["key_policy"] + + computer.configure(user=user, **authparams) + return True + + def _configure_computer_local(self, computer: orm.Computer, user: orm.User): + """Configure the computer with local transport""" + authparams = { + "use_login_shell": self.use_login_shell.value, + "safe_interval": self.safe_interval.value, + } + computer.configure(user=user, **authparams) + return True + + def _run_callbacks_if_computer_exists(self, label): + """Run things on an existing computer""" + if self._computer_exists(label): + for function in self._on_setup_computer_success: + function() + return True + return False + + def _computer_exists(self, label): + try: + orm.load_computer(label=label) + except common.NotExistent: + return False + return True + + def _validate_computer_settings(self): + if self.label.value == "": # check computer label + self.message = wrap_message( + "Please specify the computer name (for AiiDA)", + MessageLevel.WARNING, + ) + return False + + if self.work_dir.value == "": + self.message = wrap_message( + "Please specify working directory", + MessageLevel.WARNING, + ) + return False + + if self.hostname.value == "": + self.message = wrap_message( + "Please specify hostname", + MessageLevel.WARNING, + ) + return False + + return True + + def on_setup_computer(self, _=None): + """Create a new computer.""" + if not self._validate_computer_settings(): + return False + + # If the computer already exists, we just run the registered functions and return + if self._run_callbacks_if_computer_exists(self.label.value): + self.message = wrap_message( + f"A computer {self.label.value} already exists. Skipping creation.", + MessageLevel.INFO, + ) + return True + + items_to_configure = [ + "label", + "hostname", + "description", + "work_dir", + "mpirun_command", + "mpiprocs_per_machine", + "transport", + "use_double_quotes", + "scheduler", + "prepend_text", + "append_text", + "shebang", + ] + + kwargs = {key: getattr(self, key).value for key in items_to_configure} + + computer_builder = ComputerBuilder( + default_memory_per_machine=self.default_memory_per_machine, **kwargs + ) + try: + computer = computer_builder.new() + self._configure_computer(computer, self.transport.value) + computer.store() + except ( + ComputerBuilder.ComputerValidationError, + common.exceptions.ValidationError, + RuntimeError, + ) as err: + self.message = wrap_message( + f"Computer setup failed! {type(err).__name__}: {err}", + MessageLevel.ERROR, + ) + return False + + # Callbacks will not run if the computer is not stored + if self._run_callbacks_if_computer_exists(self.label.value): + self.message = wrap_message( + f"Computer<{computer.pk}> {computer.label} created", + MessageLevel.SUCCESS, + ) + return True + + self.message = wrap_message( + f"Failed to create computer {computer.label}", + MessageLevel.ERROR, + ) + return False + + def on_setup_computer_success(self, function): + self._on_setup_computer_success.append(function) + + def test(self, _=None): + if self.label.value == "": + self._test_out.value = "Please specify the computer name (for AiiDA)." + return False + elif not self._computer_exists(self.label.value): + self._test_out.value = ( + f"A computer called {self.label.value} does not exist." + ) + return False + + self._test_out.value = '' + process_result = subprocess.run( + ["verdi", "computer", "test", "--print-traceback", self.label.value], + capture_output=True, + ) + + if process_result.returncode == 0: + self._test_out.value = process_result.stdout.decode("utf-8").replace( + "\n", "
" + ) + return True + else: + self._test_out.value = process_result.stderr.decode("utf-8").replace( + "\n", "
" + ) + return False + + def _reset(self): + self.label.value = "" + self.hostname.value = "" + self.description.value = "" + self.work_dir.value = "" + self.mpirun_command.value = "mpirun -n {tot_num_mpiprocs}" + self.default_memory_per_machine_widget.value = "" + self.mpiprocs_per_machine.value = 1 + self.transport.value = "core.ssh" + self.safe_interval.value = 30.0 + self.scheduler.value = "core.slurm" + self.shebang.value = "#!/usr/bin/env bash" + self.use_login_shell.value = True + self.use_double_quotes.value = False + self.prepend_text.value = "" + self.append_text.value = "" + + @tl.observe("computer_setup") + def _observe_computer_setup(self, _=None): + for key, value in self.computer_setup.items(): + if key == "default_memory_per_machine": + self.default_memory_per_machine_widget.value = f"{value} KB" + elif hasattr(self, key): + getattr(self, key).value = value + + @tl.observe("computer_configure") + def _observe_computer_configure(self, _=None): + for key, value in self.computer_configure.items(): + if hasattr(self, key): + getattr(self, key).value = value + + @tl.observe("computer_setup", "computer_configure") + def _observe_computer_setup_and_configure(self, _=None): + if not self.some_values_provided: + self._reset() + + @property + def some_values_provided(self): + return any((self.computer_setup, self.computer_configure)) + + @property + def all_values_provided(self): + return all((self.computer_setup, self.computer_configure)) + + +class AiidaCodeSetup(ipw.VBox): + """Class that allows to setup AiiDA code""" + + code_setup = tl.Dict(allow_none=True) + message = tl.Unicode() + + def __init__(self, **kwargs): + self._on_setup_code_success = [] + + # Code label. + self.label = ipw.Text( + description="AiiDA code label:", + layout=LAYOUT, + style=STYLE, + ) + + # Computer on which the code is installed. The value of this widget is + # the UUID of the selected computer. + self.computer = ComputerDropdownWidget() + + # Code plugin. + self.default_calc_job_plugin = ipw.Dropdown( + options=sorted( + (ep.name, ep.name) + for ep in plugins.entry_point.get_entry_points("aiida.calculations") + ), + description="Code plugin:", + layout=LAYOUT, + style=STYLE, + ) + + # Code description. + self.description = ipw.Text( + placeholder="No description (yet)", + description="Code description:", + layout=LAYOUT, + style=STYLE, + ) + + self.filepath_executable = ipw.Text( + placeholder="/path/to/executable", + description="Absolute path to executable:", + layout=LAYOUT, + style=STYLE, + ) + + # Use double quotes to escape. + self.use_double_quotes = ipw.Checkbox( + value=False, + description="Use double quotes to escape environment variable of job script.", + ) + + self.prepend_text = ipw.Textarea( + placeholder="Text to prepend to each command execution", + description="Prepend text:", + layout=LAYOUT, + ) + + self.append_text = ipw.Textarea( + placeholder="Text to append to each command execution", + description="Append text:", + layout=LAYOUT, + ) + + btn_setup_code = ipw.Button(description="Setup code") + btn_setup_code.on_click(self.on_setup_code) + self.setup_code_out = ipw.Output() + + children = [ + self.label, + self.computer, + self.default_calc_job_plugin, + self.description, + self.filepath_executable, + self.use_double_quotes, + self.prepend_text, + self.append_text, + btn_setup_code, + self.setup_code_out, + ] + super().__init__(children, **kwargs) + + @tl.validate("default_calc_job_plugin") + def _validate_default_calc_job_plugin(self, proposal): + plugin = proposal["value"] + return plugin if plugin in self.default_calc_job_plugin.options else None + + def on_setup_code(self, _=None): + """Setup an AiiDA code.""" + with self.setup_code_out: + clear_output() + + if not self.computer.value: + self.message = wrap_message( + "Please select an existing computer.", + MessageLevel.WARNING, + ) + return False + + items_to_configure = [ + "label", + "description", + "default_calc_job_plugin", + "filepath_executable", + "use_double_quotes", + "prepend_text", + "append_text", + ] + + kwargs = {key: getattr(self, key).value for key in items_to_configure} + + # set computer from its widget value the UUID of the computer. + computer = orm.load_computer(self.computer.value) + + # Checking if the code with this name already exists + qb = orm.QueryBuilder() + qb.append(orm.Computer, filters={"uuid": computer.uuid}, tag="computer") + qb.append( + orm.InstalledCode, + with_computer="computer", + filters={"label": kwargs["label"]}, + ) + if qb.count() > 0: + self.message = wrap_message( + f"Code {kwargs['label']}@{computer.label} already exists, skipping creation.", + MessageLevel.INFO, + ) + # TODO: (unkcpz) as callback function this return value not actually used + # meanwhile if the code already exists we still want to regard it as success (TBD). + # and the `_on_setup_code_success` callback functions will run. + # The return will break the flow and handle back the control to the caller. + # The proper way is to not return but raise an exception and handle it in the caller function + # for the afterward process. + return False + + try: + code = orm.InstalledCode(computer=computer, **kwargs) + except (common.exceptions.InputValidationError, KeyError) as exception: + self.message = wrap_message( + f"Invalid input for code creation: {exception}", + MessageLevel.ERROR, + ) + return False + + try: + code.store() + code.is_hidden = False + except common.exceptions.ValidationError as exception: + self.message = wrap_message( + f"Unable to store the code: {exception}", + MessageLevel.ERROR, + ) + return False + + for function in self._on_setup_code_success: + function() + + self.message = wrap_message( + f"Code<{code.pk}> {code.full_label} created", + MessageLevel.SUCCESS, + ) + + return True + + def on_setup_code_success(self, function): + self._on_setup_code_success.append(function) + + def _reset(self): + self.label.value = "" + self.computer.value = "" + self.description.value = "" + self.filepath_executable.value = "" + self.use_double_quotes.value = False + self.prepend_text.value = "" + self.append_text.value = "" + + def refresh(self): + self._observe_code_setup() + + @tl.observe("code_setup") + def _observe_code_setup(self, _=None): + # Setup. + self.computer.refresh() + if not self.code_setup: + self._reset() + for key, value in self.code_setup.items(): + if hasattr(self, key): + if key == "default_calc_job_plugin": + try: + self.default_calc_job_plugin.value = value + except tl.TraitError: + # If is a template then don't raise the error message. + if not re.match(r".*{{.+}}.*", value): + self.message = wrap_message( + f"Input plugin {value} is not installed.", + MessageLevel.WARNING, + ) + elif key == "computer": + # check if the computer is set by load the label. + # if the computer not set put the value to None as placeholder for + # ComputerDropdownWidget it will refresh after the computer set up. + # if the computer is set pass the UUID to ComputerDropdownWdiget. + try: + computer = orm.load_computer(value) + except common.NotExistent: + getattr(self, key).value = None + else: + getattr(self, key).value = computer.uuid + else: + getattr(self, key).value = value + + +class ComputerDropdownWidget(ipw.VBox): + """Widget to select a configured computer. + + Attributes: + value(computer UUID): Trait that points to the selected Computer instance. It can be set to an AiiDA Computer UUID. It is linked to the 'value' trait of `self._dropdown` widget. + computers(Dict): Trait that contains a dictionary (label => Computer UUID) for all computers found in the AiiDA database. It is linked to the 'options' trait of `self._dropdown` widget. + allow_select_disabled(Bool): Trait that defines whether to show disabled computers. + """ + + value = tl.Unicode(allow_none=True) + computers = tl.Dict(allow_none=True) + allow_select_disabled = tl.Bool(False) + + def __init__(self, description="Select computer:", **kwargs): + """Dropdown for configured AiiDA Computers. + + description (str): Text to display before dropdown. + """ + + self.output = ipw.HTML() + self._dropdown = ipw.Dropdown( + value=None, + description=description, + style=STYLE, + layout=LAYOUT, + disabled=True, + ) + tl.directional_link( + (self, "computers"), + (self._dropdown, "options"), + transform=lambda x: [(key, x[key]) for key in x], + ) + tl.directional_link( + (self._dropdown, "options"), + (self, "computers"), + transform=lambda x: {c[0]: c[1] for c in x}, + ) + tl.link((self._dropdown, "value"), (self, "value")) + + self.observe(self.refresh, names="allow_select_disabled") + + children = [ + ipw.HBox( + [ + self._dropdown, + ] + ), + self.output, + ] + self.refresh() + super().__init__(children=children, **kwargs) + + def _get_computers(self) -> list: + """Get the list of available computers.""" + + # Getting the current user. + user = orm.User.collection.get_default() + + return [ + (c[0].label, c[0].uuid) + for c in orm.QueryBuilder().append(orm.Computer).all() + if c[0].is_user_configured(user) + and (self.allow_select_disabled or c[0].is_user_enabled(user)) + ] + + def refresh(self, _=None): + """Refresh the list of configured computers.""" + self.output.value = "" + with self.hold_trait_notifications(): + self._dropdown.options = self._get_computers() + if not self.computers: + self.output.value = "No computers found." + self._dropdown.disabled = True + else: + self._dropdown.disabled = False + + self._dropdown.value = None + + @tl.validate("value") + def _validate_value(self, change): + """Select computer by computer UUID.""" + computer_uuid = change["value"] + self.output.value = "" + if not computer_uuid: + return None + + try: + _ = UUID(computer_uuid, version=4) + except ValueError: + self.output.value = f"""'{computer_uuid}' + is not a valid UUID.""" + else: + return computer_uuid + + def select_by_label(self, label): + """Select computer by computer label.""" + self.output.value = "" + if not label: + return None + + try: + computer_uuid = self.computers[label] + except KeyError: + self.output.value = f"""'{label}' + can not find this computer.""" + else: + self.value = computer_uuid + + +TemplateVariableLine = namedtuple("TemplateVariableLine", ["key", "str", "vars"]) + +TemplateVariable = namedtuple("TemplateVariable", ["widget", "lines"]) + + +class TemplateVariablesWidget(ipw.VBox): + # The input template is a dictionary of keyname and template string. + templates = tl.Dict(allow_none=True) + + # The output template is a dictionary of keyname and filled string. + filled_templates = tl.Dict(allow_none=True) + + # the output metadata is a dictionary of keyname and metadata of this template. + metadata = tl.Dict(allow_none=True) + + def __init__(self): + # A placeholder for the template variables widget. + self.template_variables = ipw.VBox() + + # A dictionary of mapping variables. + # the key is the variable name, and the value is a tuple of (template value and widget). + self._template_variables = {} + self._help_text = ipw.HTML( + """
Please fill the template variables below.
""" + ) + self._help_text.layout.display = "none" + + # unfilled_variables is a list of variables that are not filled. + self.unfilled_variables = [] + + super().__init__( + children=[ + self._help_text, + self.template_variables, + ] + ) + + def reset(self): + """Reset the widget.""" + self.templates = {} + self.filled_templates = {} + self._template_variables = {} + self._help_text.layout.display = "none" + self.template_variables.children = [] + + @tl.observe("templates") + def _templates_changed(self, _=None): + """Render the template variables widget.""" + # reset traits and then render the widget. + self._template_variables = {} + self.filled_templates = {} + self._help_text.layout.display = "none" + + self._render() + + self.metadata = self.templates.get("metadata", {}) + + # Update the output filled template. + # After `self._render` all the widgets are created. + # We can use the widget value to fill the template even not all the widgets are filled. + self.fill() + + def _render(self): + """Render the template variables widget.""" + metadata = self.templates.get("metadata", {}) + tooltip = metadata.get("tooltip", None) + + if tooltip: + self._help_text.value = f"""
{tooltip}
""" + + for line_key, line_str in self.templates.items(): + env = jinja2.Environment() + parsed_content = env.parse(line_str) + + # vars is a set of variables in the template + line_vars = jinja2_meta.find_undeclared_variables(parsed_content) + + # Create a widget for each variable. + # The var is the name in a template string + var_metadata = metadata.get("template_variables", {}) + for var in line_vars: + # one var can be used in multiple templates, so we need to keep track of the mapping with the set of variables. + var_meta = var_metadata.get(var, {}) + if var in self._template_variables: + # use the same widget for the same variable. + temp_var = self._template_variables[var] + w = temp_var.widget + lines = temp_var.lines + lines.append(TemplateVariableLine(line_key, line_str, line_vars)) + template_var = TemplateVariable(w, lines) + + self._template_variables[var] = template_var + else: + # create a new widget for the variable. + description = var_meta.get("key_display", f"{var}") + widget_type = var_meta.get("type", "text") + if widget_type == "text": + w = ipw.Text( + description=f"{description}:", + value=var_meta.get("default", ""), + # delay notifying the observers until the user stops typing + continuous_update=False, + layout=LAYOUT, + style=STYLE, + ) + elif widget_type == "list": + w = ipw.Dropdown( + description=f"{description}:", + options=var_meta.get("options", ()), + value=var_meta.get("default", None), + layout=LAYOUT, + style=STYLE, + ) + else: + raise ValueError(f"Invalid widget type {widget_type}") + + # Every time the value of the widget changes, we update the filled template. + # This migth be too much to sync the final filled template every time. + w.observe(self._on_template_variable_filled, names="value") + + template_var = TemplateVariable( + w, [TemplateVariableLine(line_key, line_str, line_vars)] + ) + self._template_variables[var] = template_var + + # Render by change the VBox children of placeholder. + self.template_variables.children = [ + # widget is shared so we only need to get the first one. + template_var.widget + for template_var in self._template_variables.values() + ] + + # Show the help text if there are template variables. + if self.template_variables.children: + self._help_text.layout.display = "block" + + def fill(self): + """Use template and current widgets value to fill the template + and update the filled_template. + """ + self.unfilled_variables = [] + + # Remove the metadata from the templates because for the final + # filled template we don't pass it to setup traits. + filled_templates = copy.deepcopy(self.templates) + if "metadata" in filled_templates: + del filled_templates["metadata"] + + for template_var in self._template_variables.values(): + for line in template_var.lines: + # update the filled template. + # if the widget is not filled, use the original template var string e.g. {{ var }} + inp_dict = {} + for _var in line.vars: + if self._template_variables[_var].widget.value == "": + variable_key = self._template_variables[_var].widget.description + self.unfilled_variables.append(variable_key.strip(":")) + inp_dict[_var] = "{{ " + _var + " }}" + else: + inp_dict[_var] = self._template_variables[_var].widget.value + + # re-render the template + env = jinja2.Environment() + filled_str = env.from_string(line.str).render(**inp_dict) + + # Update the filled template. + filled_templates[line.key] = filled_str + + # assign back to trigger the trait change. + self.filled_templates = filled_templates + + def _on_template_variable_filled(self, _): + """Callback when a template variable is filled.""" + self.fill() + + +class _ResourceSetupBaseWidget(ipw.VBox): + """The widget that allows to setup a computer and code. + This is the building block of the `ComputationalResourcesDatabaseWidget` which + will be directly used by the user. + """ + + success = tl.Bool(False) + message = tl.Unicode() + + ssh_auth = None # store the ssh auth type. Can be "password" or "2FA" + + def __init__( + self, + default_calc_job_plugin=None, + enable_quick_setup=True, + enable_detailed_setup=True, + ): + if not any((enable_detailed_setup, enable_quick_setup)): + raise ValueError( # noqa + "At least one of `enable_quick_setup` and `enable_detailed_setup` should be True." + ) + + self.enable_quick_setup = enable_quick_setup + self.enable_detailed_setup = enable_detailed_setup + + self.quick_setup_button = ipw.Button( + description="Quick setup", + tooltip="Setup a computer and code in one click.", + icon="rocket", + button_style="success", + ) + self.quick_setup_button.on_click(self._on_quick_setup) + + reset_button = ipw.Button( + description="Reset", + tooltip="Reset the resource.", + icon="refresh", + button_style="primary", + ) + reset_button.on_click(self._on_reset) + + # Resource database for setup computer/code. + + self.comp_resources_database = ComputationalResourcesDatabaseWidget( + default_calc_job_plugin=default_calc_job_plugin, + show_reset_button=False, + ) + + # All templates + self.template_computer_setup = TemplateVariablesWidget() + self.template_computer_configure = TemplateVariablesWidget() + self.template_code = TemplateVariablesWidget() + + # SSH + self.ssh_computer_setup = SshComputerSetup() + tl.dlink( + (self.ssh_computer_setup, "message"), + (self, "message"), + ) + + # Computer setup. + self.aiida_computer_setup = AiidaComputerSetup() + tl.dlink( + (self.aiida_computer_setup, "message"), + (self, "message"), + ) + + # Code setup. + self.aiida_code_setup = AiidaCodeSetup() + tl.dlink( + (self.aiida_code_setup, "message"), + (self, "message"), + ) + + # Data flow. + + # Computer template -> SSH connection setup. + tl.dlink( + (self.comp_resources_database, "computer_configure"), + (self.template_computer_configure, "templates"), + ) + tl.dlink( + (self.template_computer_configure, "filled_templates"), + (self.ssh_computer_setup, "ssh_config"), + ) + + # Computer template -> Computer setup. + tl.dlink( + (self.comp_resources_database, "computer_setup"), + (self.template_computer_setup, "templates"), + ) + tl.dlink( + (self.template_computer_setup, "filled_templates"), + (self.aiida_computer_setup, "computer_setup"), + ) + + # Computer template -> Computer configure. + tl.dlink( + (self.template_computer_configure, "filled_templates"), + (self.aiida_computer_setup, "computer_configure"), + ) + self.template_computer_configure.observe( + self._on_template_computer_configure_metadata_change, "metadata" + ) + self.aiida_computer_setup.on_setup_computer_success( + self._on_setup_computer_success + ) + + # Code template -> Code setup. + tl.dlink( + (self.comp_resources_database, "code_setup"), + (self.template_code, "templates"), + ) + tl.dlink( + (self.template_code, "filled_templates"), + (self.aiida_code_setup, "code_setup"), + ) + self.aiida_code_setup.on_setup_code_success(self._on_setup_code_success) + + # The widget for the detailed setup. + description_toggle_detail_setup = ipw.HTML( + """
Tick checkbox to setup resource step by step.
""" + ) + self.toggle_detail_setup = ipw.Checkbox( + description="", + value=False, + indent=False, + # this narrow the checkbox to the minimum width. + layout=ipw.Layout(width="30px"), + ) + self.toggle_detail_setup.observe(self._on_toggle_detail_setup, names="value") + self.detailed_setup_switch_widgets = ipw.HBox( + children=[ + self.toggle_detail_setup, + description_toggle_detail_setup, + ], + layout=ipw.Layout(justify_content="flex-end"), + ) + + detailed_setup_description_text = """
+ Go through the steps to setup SSH connection to remote machine, computer, and code into database.
+ The SSH connection step can be skipped and setup afterwards.
+
+
""" + description = ipw.HTML(detailed_setup_description_text) + + detailed_setup = ipw.Tab( + children=[ + self.ssh_computer_setup, + self.aiida_computer_setup, + self.aiida_code_setup, + ], + ) + detailed_setup.set_title(0, "SSH connection") + detailed_setup.set_title(1, "Computer") + detailed_setup.set_title(2, "Code") + + self.detailed_setup_widget = ipw.VBox( + children=[ + description, + detailed_setup, + ] + ) + + super().__init__( + children=[ + ipw.HTML( + """
Please select the computer/code from a database to pre-fill the fields below.
+ """ + ), + self.comp_resources_database, + self.template_computer_setup, + self.template_code, + self.template_computer_configure, + self.ssh_computer_setup.password_box, + self.detailed_setup_switch_widgets, + ipw.HBox( + children=[ + self.quick_setup_button, + reset_button, + ], + layout=ipw.Layout(justify_content="flex-end"), + ), + self.detailed_setup_widget, + ], + ) + + # update the layout + self._update_layout() + + def _update_layout(self): + """Update the layout to hide or show the bundled quick_setup/detailed_setup.""" + # check if the password is asked for ssh connection. + if self.ssh_auth != "password": + self.ssh_computer_setup.password_box.layout.display = "none" + else: + self.ssh_computer_setup.password_box.layout.display = "block" + + # If both quick and detailed setup are enabled + # - show the switch widget + # - show the quick setup widget (database + template variables + poppud password box + quick setup button) + # - hide the detailed setup widget as default + if self.enable_detailed_setup and self.enable_quick_setup: + self.detailed_setup_widget.layout.display = "none" + return + + # If only quick setup is enabled + # - hide the switch widget + # - hide the detailed setup widget + if self.enable_quick_setup: + self.detailed_setup_switch_widgets.layout.display = "none" + self.detailed_setup_widget.layout.display = "none" + return + + # If only detailed setup is enabled + # - hide the switch widget + # - hide the quick setup widget + # - hide the database widget + # which means only the detailed setup widget is shown. + if self.enable_detailed_setup: + self.children = [ + self.detailed_setup_widget, + ] + + def _on_toggle_detail_setup(self, change): + """When the checkbox is toggled, show/hide the detailed setup.""" + if change["new"]: + self.detailed_setup_widget.layout.display = "block" + self.quick_setup_button.disabled = True + # fill the template variables with the default values or the filled values. + # If the template variables are not all filled raise a warning. + else: + self.detailed_setup_widget.layout.display = "none" + self.quick_setup_button.disabled = False + + def _on_template_computer_configure_metadata_change(self, change): + """callback when the metadata of computer_configure template is changed.""" + if change["new"] is None: + return + + # decide whether to show the ssh password box widget. + # Since for 2FA ssh credential, the password are not needed but set from + # independent mechanism. + self.ssh_auth = change["new"].get("ssh_auth", None) + if self.ssh_auth is None: + self.ssh_auth = "password" + + self._update_layout() + + def _on_reset(self, _=None): + """Reset the database and the widget.""" + with self.hold_trait_notifications(): + self.reset() + self.comp_resources_database.reset() + + def _on_quick_setup(self, _=None): + """Go through all the setup steps automatically.""" + # Raise error if the computer is not selected. + if not self.aiida_computer_setup.some_values_provided: + self.message = wrap_message( + "Please select a computer from the database.", + MessageLevel.ERROR, + ) + return + + # Check if all the template variables are filled. + # If not raise a warning and return (skip the setup). + if ( + unfilled_variables := self.template_computer_setup.unfilled_variables + + self.template_computer_configure.unfilled_variables + + self.template_code.unfilled_variables + ): + var_warn_message = ", ".join([f"{v}" for v in unfilled_variables]) + self.message = wrap_message( + f"Please fill the template variables: {var_warn_message}", + MessageLevel.WARNING, + ) + return + + # Setup the computer and code. + if self.aiida_computer_setup.on_setup_computer(): + self.aiida_code_setup.on_setup_code() + + # Prepare the ssh key pair and copy to remote computer. + # This only happend when the ssh_auth is password. + if self.ssh_auth == "password": + try: + self.ssh_computer_setup.key_pair_prepare() + except ValueError as exc: + self.message = wrap_message( + f"Key pair generation failed: {exc}", + level=MessageLevel.ERROR, + ) + + self.ssh_computer_setup.thread_ssh_copy_id() + + # For not password ssh auth, key pair is not needed. + # There are other mechanism to set up the ssh connection. + # But we still need to write the ssh config to the ssh config file for such as + # proxy jump. + if not self.ssh_computer_setup._is_in_config(): + self.ssh_computer_setup._write_ssh_config() + + def _on_setup_computer_success(self): + """Callback that is called when the computer is successfully set up.""" + # update the computer dropdown list of code setup + self.aiida_code_setup.refresh() + + # select the computer in the dropdown list for code setup + label = self.aiida_computer_setup.label.value + self.aiida_code_setup.computer.select_by_label(label) + + def _on_setup_code_success(self): + """Callback that is called when the code is successfully set up.""" + self.success = True + + def reset(self): + """Reset the widget.""" + # reset the database + self.comp_resources_database.reset() + + # reset template variables + self.template_computer_setup.reset() + self.template_computer_configure.reset() + self.template_code.reset() + + # reset sub widgets + self.aiida_code_setup._reset() + self.aiida_computer_setup._reset() + + self.ssh_computer_setup._reset() + + # essential, since if not, the same computer_configure won't trigger the `_observe_ssh_config` callback. + self.ssh_computer_setup.ssh_config = {} + + # reset traits + self.computer_setup = {} + self.computer_configure = {} + self.code_setup = {} + + self.message = "" + self.success = False + self.ssh_auth = None + + # The layout also reset + self._update_layout() + + # untick the detailed switch checkbox + self.toggle_detail_setup.value = False + + +class ComputationalResourcesDatabaseWidget(ipw.VBox): + """Extract the setup of a known computer from the AiiDA code registry.""" + + _default_database_source = ( + "https://aiidateam.github.io/aiida-resource-registry/database.json" + ) + + database_source = tl.Unicode(allow_none=True) + + computer_setup = tl.Dict() + computer_configure = tl.Dict() + code_setup = tl.Dict() + + STYLE = {"description_width": "180px"} + LAYOUT = {"width": "400px"} + + def __init__( + self, + default_calc_job_plugin=None, + database_source=None, + show_reset_button=True, + **kwargs, + ): + if database_source is None: + database_source = self._default_database_source + + self.default_calc_job_plugin = default_calc_job_plugin + + # Select domain. + self.domain_selector = ipw.Dropdown( + options=(), + description="Domain:", + disabled=False, + layout=self.LAYOUT, + style=self.STYLE, + ) + self.domain_selector.observe(self._domain_changed, names=["value", "options"]) + + # Select computer. + self.computer_selector = ipw.Dropdown( + options=(), + description="Computer:", + disabled=False, + layout=self.LAYOUT, + style=self.STYLE, + ) + self.computer_selector.observe( + self._computer_changed, names=["value", "options"] + ) + + # Select code. + self.code_selector = ipw.Dropdown( + options=(), + description="Code:", + disabled=False, + layout=self.LAYOUT, + style=self.STYLE, + ) + self.code_selector.observe(self._code_changed, names=["value", "options"]) + + reset_button = ipw.Button(description="Reset") + reset_button.on_click(self.reset) + + if show_reset_button is False: + reset_button.layout.display = "none" + + super().__init__( + children=[ + self.domain_selector, + self.computer_selector, + self.code_selector, + reset_button, + ], + **kwargs, + ) + self.database_source = database_source + self.reset() + + def reset(self, _=None): + """Reset widget and traits""" + with self.hold_trait_notifications(): + self.domain_selector.value = None + self.computer_selector.value = None + self.code_selector.value = None + + @tl.observe("database_source") + def _database_source_changed(self, _=None): + self.database = self._database_generator( + self.database_source, self.default_calc_job_plugin + ) + + # Update domain selector. + self.domain_selector.options = self.database.keys() + self.reset() + + @staticmethod + def _database_generator(database_source, default_calc_job_plugin): + """From database source JSON and default calc job plugin, generate resource database""" + try: + database = requests.get(database_source).json() + except Exception: + database = {} + + if default_calc_job_plugin is None: + return database + + # filter database by default calc job plugin + for domain, domain_value in database.copy().items(): + for computer, computer_value in domain_value.copy().items(): + if computer == "default": + # skip default computer + continue + + for code, code_value in list(computer_value["codes"].items()): + # check if code plugin matches default plugin + # if not, remove code + plugin = re.sub( + TEMPLATE_PATTERN, r".*", code_value["default_calc_job_plugin"] + ) + if re.match(plugin, default_calc_job_plugin) is None: + del computer_value["codes"][code] + + if len(computer_value["codes"]) == 0: + # remove computer since no codes defined in this computer source + del domain_value[computer] + if domain_value.get("default") == computer: + # also remove default computer from domain + del domain_value["default"] + + if len(domain_value) == 0: + # remove domain since no computers with required codes defined in this domain source + del database[domain] + continue + + if domain_value["default"] not in domain_value: + # make sure default computer is still points to existing computer + domain_value["default"] = sorted(domain_value.keys() - {"default"})[0] + + return database + + def _domain_changed(self, change=None): + """callback when new domain selected""" + with self.hold_trait_notifications(): # To prevent multiple calls to callbacks + if change["new"] is None: + self.computer_selector.options = () + self.computer_selector.value = None + self.code_selector.options = () + self.code_selector.value = None + return + else: + selected_domain = self.domain_selector.value + + with self.hold_trait_notifications(): + self.computer_selector.options = tuple( + key + for key in self.database.get(selected_domain, {}).keys() + if key != "default" + ) + self.computer_selector.value = self.database.get( + selected_domain, {} + ).get("default") + + def _computer_changed(self, change=None): + """callback when new computer selected""" + with self.hold_trait_notifications(): + if change["new"] is None: + self.code_selector.options = () + self.code_selector.value = None + self.computer_setup, self.computer_configure = {}, {} + return + else: + self.code_selector.value = None + selected_computer = self.computer_selector.value + + selected_domain = self.domain_selector.value + + computer_dict = self.database.get(selected_domain, {}).get( + selected_computer, {} + ) + + self.code_selector.options = list(computer_dict.get("codes", {}).keys()) + self.code_selector.value = None + + computer_setup = computer_dict.get("computer", {}).get("computer-setup", {}) + computer_configure = computer_dict.get("computer", {}).get( + "computer-configure", {} + ) + + if "hostname" in computer_setup: + computer_configure["hostname"] = computer_setup.get("hostname") + + self.computer_setup = computer_setup + self.computer_configure = computer_configure + + def _code_changed(self, change=None): + """Update code settings.""" + if change["new"] is None: + self.code_setup = {} + return + else: + selected_code = self.code_selector.value + + selected_domain = self.domain_selector.value + selected_computer = self.computer_selector.value + + self.code_setup = ( + self.database.get(selected_domain, {}) + .get(selected_computer, {}) + .get("codes", {}) + .get(selected_code, {}) + ) diff --git a/src/aiidalab_sssp/pages/__init__.py b/src/aiidalab_sssp/pages/__init__.py index dfdd6e9..7aef79e 100644 --- a/src/aiidalab_sssp/pages/__init__.py +++ b/src/aiidalab_sssp/pages/__init__.py @@ -1,8 +1,9 @@ import solara from solara.alias import rv -from aiidalab_sssp.pages import explore, howto +from aiidalab_sssp.pages import explore from aiidalab_sssp.data import articles, names +from aiidalab_sssp.components import banner @solara.component @@ -50,10 +51,10 @@ def Page(): with solara.VBox() as main: solara.Title("Standard Solid-State Pseudopotential (SSSP) » Home") with solara.ColumnsResponsive(12): - howto.Overview() + banner.Overview() with solara.ColumnsResponsive([6, 6]): explore.Overview() - verification.Overview() + verify.Overview() return main diff --git a/src/aiidalab_sssp/pages/howto/__init__.py b/src/aiidalab_sssp/pages/howto/__init__.py deleted file mode 100644 index 6dfd264..0000000 --- a/src/aiidalab_sssp/pages/howto/__init__.py +++ /dev/null @@ -1,27 +0,0 @@ -from typing import Optional - -import solara - -from ... import data -from ...components.article import Overview - - -# XXX: this page I may don't need. -@solara.component -def Page(name: Optional[str] = None, page: int = 0, page_size=100): - if name is None: - with solara.Column() as main: - solara.Title("Home » Howto") - Overview() - return main - if name not in data.articles: - return solara.Error(f"No such article: {name!r}") - article = data.articles[name] - with solara.ColumnsResponsive(12) as main: - solara.Title("Home » Article » " + article.title) - with solara.Link("/article"): - solara.Text("« Back to overview") - with solara.Card(): - pre = f"# {article.title}\n\n" - solara.Markdown(pre + article.markdown) - return main diff --git a/src/aiidalab_sssp/pages/verification/__init__.py b/src/aiidalab_sssp/pages/verification/__init__.py deleted file mode 100644 index 1dc3476..0000000 --- a/src/aiidalab_sssp/pages/verification/__init__.py +++ /dev/null @@ -1,79 +0,0 @@ -"""This Page takes an extra argument, meaning that it can cache urls like /tabular/titanic -and pass the last part of the url as argument to the Page component, so we can render content -dynamically. -""" -from typing import Optional - -import solara -import textwrap -from solara.components.file_drop import FileInfo - - - -@solara.component -def Page(name: Optional[str] = None, page: int = 0, page_size=100): - with solara.ColumnsResponsive(12) as main: - solara.HTML(unsafe_innerHTML="""

More detailed verification instructions

-

-Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. -""") - with solara.ColumnsResponsive([6, 6]): - with solara.Card(title="UPF file upload"): - UpfFileDrop() - with solara.Card(title="Verification setup"): - DetailSetup() - - return main - -@solara.component -def DetailSetup(): - with solara.Column() as main: - # buttons for tuning protocol, dry-run (inspect the PP) or real-run of verification - # ComputationalResourcesWidget from AWB (can I??) - solara.HTML(unsafe_innerHTML="""PLACEHOLDER""") - UpfFileDrop() - return main - - -@solara.component -def UpfFileDrop(): - content, set_content = solara.use_state(b"") - filename, set_filename = solara.use_state("") - # size, set_size = solara.use_state(0) - - # TODO: should read as a stream. Should stop and raise when the file is too large. - - def on_file(f: FileInfo): - set_filename(f["name"]) - set_size(f["size"]) - set_content(f["file_obj"].read()) - - solara.FileDrop( - label="Drag and drop a UPF file here to run verification.", - on_file=on_file, - lazy=True, # XXX: check the detail, UPF file can be big ~1MB so lazy is the better choice? - ) - - # TODO: check it is a valide UPF file, by checking extension is not enough, should be a txt not a binary - # Should be able to be parsed. - # Raise on error if it is not valid - - # TODO: warning if it is not a PBE or relativistic PP. - - if content: - solara.Info(f"UPF {filename} is uploaded.") - solara.Preformatted("\n".join(textwrap.wrap(repr(content)))) - -html_content = """ -Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. -""" - -@solara.component -def Overview(): - with solara.Card(title="Verification") as main: - with solara.Column(): - solara.HTML(unsafe_innerHTML=html_content) - with solara.Card(title="UPF file upload"): - UpfFileDrop() - - return main diff --git a/src/aiidalab_sssp/pages/verify/__init__.py b/src/aiidalab_sssp/pages/verify/__init__.py new file mode 100644 index 0000000..3352a80 --- /dev/null +++ b/src/aiidalab_sssp/pages/verify/__init__.py @@ -0,0 +1,136 @@ +"""This Page takes an extra argument, meaning that it can cache urls like /tabular/titanic +and pass the last part of the url as argument to the Page component, so we can render content +dynamically. +""" +from IPython.display import display +from typing import Optional + +import solara +import textwrap +from solara.components.file_drop import FileInfo +from aiidalab_sssp.components.computational_resource import ComputationalResourcesWidget +import pandas as pd +import plotly + +df = plotly.data.iris() + +@solara.component +def Page(): + # XXX: used for showing all verifications + # Must have: + # 1. tickbox to show running and finished. + # 2. filter and search by name + # 3. redirect result to explore + # 4. Back to home to submit new. + solara.DataFrame( + df, + items_per_page=20, + ) + +@solara.component +def Overview(): + # XXX: consider not have a independent page but just a overview widget tool in the index page + protocol = solara.reactive("quick") + is_run_transferability = solara.reactive(True) + is_run_convergence = solara.reactive(True) + + mode = 'mc' + if mode == 'aiidalab': + import aiida + aiida.load_profile() + + crw = ComputationalResourcesWidget(enable_detailed_setup=False) + crw_value = solara.use_reactive(None) + + def _update_code_label(change): + crw_value.set(change['new']) + + crw.observe(_update_code_label) + + with solara.ColumnsResponsive(12) as main: + with solara.Card(title="Verification setup"): + solara.Markdown("Running verification, if .. if.. qucik for , standard for, trannnn") + + # Step 1 + UpfFileDrop() + + # Step 2 + with solara.VBox(): + solara.Markdown("### Protocol selection") + with solara.ToggleButtonsSingle(value=protocol): + solara.Button("Quick", icon_name="mdi-run-fast", value="quick", text=True) + solara.Button("Standard", icon_name="mdi-standard-definition", value="standard", text=True) + + solara.Markdown("### Verification selection") + with solara.Row(): + solara.Checkbox(label="Transferability", value=is_run_transferability) + solara.Checkbox(label="Convergence test", value=is_run_convergence) + + # XXX: Improve me, very hacky. + # The problem is when crw_value updated, this part of component will be rerendered. + if mode == 'aiidalab': + if crw_value.value is not None: + crw.value = crw_value.value + display(crw) + else: + display(crw) + + solara.Markdown(f"**Code**: {crw_value.value}") + else: + pw_code = 'pw-7.2@eiger-hq' + ph_code = 'ph-7.2@eiger-hq' + + # Step 3 + # Dry run will parse the UPF and display the basic info + # Lanuch will submit the verification + with solara.Row(): + solara.Button(label="Dry run", color="blue", icon_name="mdi-hair-dryer", outlined=True, text=True) + solara.Button(label="Launch", color="green", icon_name="mdi-play", outlined=True, text=True) + + + return main + + + +@solara.component +def UpfFileDrop(): + content, set_content = solara.use_state(b"") + filename, set_filename = solara.use_state("") + # size, set_size = solara.use_state(0) + + # TODO: should read as a stream. Should stop and raise when the file is too large. + + def on_file(f: FileInfo): + set_filename(f["name"]) + set_size(f["size"]) + set_content(f["file_obj"].read()) + + solara.FileDrop( + label="Drag and drop a UPF file here to run verification.", + on_file=on_file, + lazy=True, # XXX: check the detail, UPF file can be big ~1MB so lazy is the better choice? + ) + + # TODO: check it is a valide UPF file, by checking extension is not enough, should be a txt not a binary + # Should be able to be parsed. + # Raise on error if it is not valid + + # TODO: warning if it is not a PBE or relativistic PP. + + if content: + solara.Info(f"UPF {filename} is uploaded.") + solara.Preformatted("\n".join(textwrap.wrap(repr(content)))) + +html_content = """ +Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. +""" + +# @solara.component +# def Overview(): +# with solara.Card(title="Verification") as main: +# with solara.Column(): +# solara.HTML(unsafe_innerHTML=html_content) +# with solara.Card(title="UPF file upload"): +# UpfFileDrop() +# +# return main