Skip to content

Commit

Permalink
Replace address with head_ip
Browse files Browse the repository at this point in the history
  • Loading branch information
carolineechen committed Nov 13, 2024
1 parent d0322c0 commit 9ad94cd
Show file tree
Hide file tree
Showing 17 changed files with 62 additions and 63 deletions.
2 changes: 1 addition & 1 deletion docs/tutorials/api-clusters.rst
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ and access objects and functions via ``curl``.
.. code:: ipython3
tls_cluster.address
tls_cluster.head_ip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def restart_container(self):
}

response = requests.post(
f"http://{cluster.address}:{port}/generate", headers=headers, json=data
f"http://{cluster.head_ip}:{port}/generate", headers=headers, json=data
)
print(response.json())

Expand All @@ -283,7 +283,7 @@ def restart_container(self):
# Alternatively, we can also call the model via HTTP
# Note: We can also use a streaming route by replacing `generate` with `generate_stream`:
print(
f"curl http://{cluster.address}:{port}/generate -X POST -d '"
f"curl http://{cluster.head_ip}:{port}/generate -X POST -d '"
f'{{"inputs":"{prompt_message}","parameters":{{"max_new_tokens":20}}}}'
"' -H 'Content-Type: application/json'"
)
4 changes: 2 additions & 2 deletions examples/llama2-with-tgi-ec2/tgi_llama_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def restart_container(self):
}

response = requests.post(
f"http://{cluster.address}:{port}/generate", headers=headers, json=data
f"http://{cluster.head_ip}:{port}/generate", headers=headers, json=data
)
print(response.json())

Expand All @@ -254,7 +254,7 @@ def restart_container(self):
# Alternatively, we can also call the model via HTTP
# Note: We can also use a streaming route by replacing `generate` with `generate_stream`:
print(
f"curl http://{cluster.address}:{port}/generate -X POST -d '"
f"curl http://{cluster.head_ip}:{port}/generate -X POST -d '"
f'{{"inputs":"{prompt_message}","parameters":{{"max_new_tokens":20}}}}'
"' -H 'Content-Type: application/json'"
)
4 changes: 2 additions & 2 deletions examples/llama3-8b-tgi-ec2/llama3_tgi_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def deploy(self):
}

response = requests.post(
f"http://{cluster.address}:{port}/generate",
f"http://{cluster.head_ip}:{port}/generate",
headers=headers,
json=data,
verify=False,
Expand All @@ -240,7 +240,7 @@ def deploy(self):
# Alternatively, we can also call the model via HTTP
# Note: We can also use a streaming route by replacing `generate` with `generate_stream`:
print(
f"curl http://{cluster.address}:{port}/generate -X POST -d '"
f"curl http://{cluster.head_ip}:{port}/generate -X POST -d '"
f'{{"inputs":"{prompt_message}","parameters":{{"max_new_tokens":20}}}}'
"' -H 'Content-Type: application/json'"
)
4 changes: 2 additions & 2 deletions examples/mistral-with-tgi-ec2/tgi_mistral_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def restart_container(self):
# on the Messages API, and using the OpenAI python client

# Initialize the OpenAI client, with the URL set to the cluster's address:
base_url = f"http://{cluster.address}:{port}/v1"
base_url = f"http://{cluster.head_ip}:{port}/v1"
client = OpenAI(base_url=base_url, api_key="-")

# Call the model with the prompt messages:
Expand All @@ -224,7 +224,7 @@ def restart_container(self):

# Alternatively, we can also call the model via HTTP:
print(
f"curl http://{cluster.address}:{port}/v1/chat/completions -X POST -d '"
f"curl http://{cluster.head_ip}:{port}/v1/chat/completions -X POST -d '"
'{"model": "tgi", "stream": false, "messages": [{"role": "system", "content": "You are a helpful assistant."},'
'{"role": "user", "content": "What is deep learning?"}]}'
"' -H 'Content-Type: application/json'"
Expand Down
6 changes: 3 additions & 3 deletions runhouse/resources/folders/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def mkdir(self):

def _to_cluster(self, dest_cluster, path=None):
"""Copy the folder from a file or cluster source onto a destination cluster."""
if not dest_cluster.address:
if not dest_cluster.ips:
raise ValueError("Cluster must be started before copying data to it.")

dest_path = path or f"~/{Path(self.path).name}"
Expand Down Expand Up @@ -382,7 +382,7 @@ def _cluster_to_cluster(self, dest_cluster, dest_path):
f"rsync -Pavz --filter='dir-merge,- .gitignore' -e \"ssh {creds_cmd}"
f"-o StrictHostKeyChecking=no -o IdentitiesOnly=yes -o ExitOnForwardFailure=yes "
f"-o ServerAliveInterval=5 -o ServerAliveCountMax=3 -o ConnectTimeout=30s -o ForwardAgent=yes "
f'-o ControlMaster=auto -o ControlPersist=300s" {src_path}/ {dest_cluster.address}:{dest_path}'
f'-o ControlMaster=auto -o ControlPersist=300s" {src_path}/ {dest_cluster.head_ip}:{dest_path}'
)
status_codes = self.system.run([command])
if status_codes[0][0] != 0:
Expand All @@ -400,7 +400,7 @@ def _cluster_to_local(self, cluster, dest_path):
This function rsyncs down the data and return a folder with system=='file'.
"""
if not cluster.address:
if not cluster.ips:
raise ValueError("Cluster must be started before copying data from it.")
Path(dest_path).expanduser().mkdir(parents=True, exist_ok=True)
cluster.rsync(
Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/folders/gcs_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def _gcs_copy_to_local(self, dest_path: str):
blob.download_to_filename(str(dest_file_path))

def _cluster_to_local(self, cluster, dest_path):
if not cluster.address:
if not cluster.ips:
raise ValueError("Cluster must be started before copying data from it.")
Path(dest_path).expanduser().mkdir(parents=True, exist_ok=True)

Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/folders/s3_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def _s3_copy_to_local(self, dest_path: str):
self.client.download_file(bucket_name, obj_key, str(dest_file_path))

def _cluster_to_local(self, cluster, dest_path):
if not cluster.address:
if not cluster.ips:
raise ValueError("Cluster must be started before copying data from it.")

Path(dest_path).expanduser().mkdir(parents=True, exist_ok=True)
Expand Down
49 changes: 24 additions & 25 deletions runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,12 @@ def __init__(
self._setup_creds(creds)

@property
def address(self):
def head_ip(self):
return self.ips[0] if isinstance(self.ips, List) else None

@address.setter
def address(self, addr):
self.ips = self.ips or [None]
self.ips[0] = addr
@property
def address(self):
return self.head_ip

@property
def client(self):
Expand All @@ -156,7 +155,7 @@ def check_connect_server():
self._update_from_sky_status(dryrun=False)
if not self._ping(retry=False):
raise ConnectionError(
f"Could not reach {self.name} {self.ips}. Is cluster up?"
f"Could not reach {self.name} {self.head_ip}. Is cluster up?"
)
if not check_connect_server():
raise ConnectionError(
Expand Down Expand Up @@ -480,7 +479,7 @@ def endpoint(self, external: bool = False):
(including the local connected port rather than the sever port). If cluster is not up, returns
`None``. (Default: ``False``)
"""
if not self.address or self.on_this_cluster():
if not self.head_ip or self.on_this_cluster():
return None

client_port = self.client_port or self.server_port
Expand Down Expand Up @@ -529,7 +528,7 @@ def server_address(self):
if self.server_host in [LOCALHOST, "localhost"]:
return LOCALHOST

return self.address
return self.head_ip

@property
def is_shared(self) -> bool:
Expand Down Expand Up @@ -563,7 +562,7 @@ def _command_runner(
"CommandRunner can only be instantiated for individual nodes"
)

node = node or self.address
node = node or self.head_ip

if (
hasattr(self, "launched_properties")
Expand Down Expand Up @@ -708,8 +707,8 @@ def _sync_runhouse_to_cluster(
if self.on_this_cluster():
return

if not self.address:
raise ValueError(f"No address set for cluster <{self.name}>. Is it up?")
if not self.ips:
raise ValueError(f"No IPs set for cluster <{self.name}>. Is it up?")

env = env or self.default_env

Expand Down Expand Up @@ -955,8 +954,8 @@ def connect_tunnel(self, force_reconnect=False):
)

def connect_server_client(self, force_reconnect=False):
if not self.address:
raise ValueError(f"No address set for cluster <{self.name}>. Is it up?")
if not self.ips:
raise ValueError(f"No IPs set for cluster <{self.name}>. Is it up?")

if self.server_connection_type == ServerConnectionType.SSH:
# For a password cluster, the 'ssh_tunnel' command assumes a Control Master is already set up with
Expand Down Expand Up @@ -1057,7 +1056,7 @@ def ssh_tunnel(
)

return ssh_tunnel(
address=self.address,
address=self.head_ip,
ssh_creds=self.creds_values,
docker_user=self.docker_user,
local_port=local_port,
Expand Down Expand Up @@ -1092,31 +1091,31 @@ def _use_custom_certs(self):

def _start_ray_workers(self, ray_port, env):
for host in self.ips:
if host == self.address:
if host == self.head_ip:
# This is the master node, skip
continue
logger.info(
f"Starting Ray on worker {host} with head node at {self.address}:{ray_port}."
f"Starting Ray on worker {host} with head node at {self.head_ip}:{ray_port}."
)
self.run(
commands=[
f"ray start --address={self.address}:{ray_port} --disable-usage-stats",
f"ray start --address={self.head_ip}:{ray_port} --disable-usage-stats",
],
node=host,
env=env,
)

def _run_cli_commands_on_cluster_helper(self, commands: List[str]):
if self.on_this_cluster():
return self.run(commands=commands, env=self._default_env, node=self.address)
return self.run(commands=commands, env=self._default_env, node=self.head_ip)
else:
if self._default_env:
commands = [self._default_env._full_command(cmd) for cmd in commands]
return self._run_commands_with_runner(
commands=commands,
cmd_prefix="",
env_vars=self._default_env.env_vars if self._default_env else {},
node=self.address,
node=self.head_ip,
require_outputs=False,
)

Expand Down Expand Up @@ -1190,7 +1189,7 @@ def _start_or_restart_helper(
# Rebuild on restart to ensure the correct subject name is included in the cert SAN
# Cert subject name needs to match the target (IP address or domain)
self.cert_config.generate_certs(
address=self.address, domain=self.domain
address=self.head_ip, domain=self.domain
)
self._copy_certs_to_cluster()

Expand Down Expand Up @@ -1503,14 +1502,14 @@ def rsync(
from runhouse.resources.hardware.sky_command_runner import SshMode

# If no address provided explicitly use the head node address
node = node or self.address
node = node or self.head_ip
# FYI, could be useful: https://github.com/gchamon/sysrsync
if contents:
source = source + "/" if not source.endswith("/") else source
dest = dest + "/" if not dest.endswith("/") else dest

# If we're already on this cluster (and node, if multinode), this is just a local rsync
if self.on_this_cluster() and node == self.address:
if self.on_this_cluster() and node == self.head_ip:
if Path(source).expanduser().resolve() == Path(dest).expanduser().resolve():
return

Expand Down Expand Up @@ -1590,15 +1589,15 @@ def ssh(self):
"""
creds = self.creds_values
_run_ssh_command(
address=self.address,
address=self.head_ip,
ssh_user=creds["ssh_user"],
ssh_port=self.ssh_port,
ssh_private_key=creds["ssh_private_key"],
docker_user=self.docker_user,
)

def _ping(self, timeout=5, retry=False):
if not self.address:
if not self.ips:
return False

def run_ssh_call():
Expand Down Expand Up @@ -1755,7 +1754,7 @@ def _run_commands_with_runner(
commands = [commands]

# If no address provided explicitly use the head node address
node = node or self.address
node = node or self.head_ip

return_codes = []

Expand Down
20 changes: 10 additions & 10 deletions runhouse/resources/hardware/on_demand_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ def client(self):
try:
return super().client
except ValueError as e:
if not self.address:
if not self.ips:
# Try loading in from local Sky DB
self._update_from_sky_status(dryrun=True)
if not self.address:
if not self.ips:
raise ValueError(
f"Could not determine address for ondemand cluster <{self.name}>. "
f"Could not determine ips for ondemand cluster <{self.name}>. "
"Up the cluster with `cluster.up_if_not`."
)
return super().client
Expand Down Expand Up @@ -197,7 +197,7 @@ def config(self, condensed=True):
return config

def endpoint(self, external: bool = False):
if not self.address or self.on_this_cluster():
if not self.ips or self.on_this_cluster():
return None

try:
Expand All @@ -215,7 +215,7 @@ def _copy_sky_yaml_from_cluster(self, abs_yaml_path: str):
# Save SSH info to the ~/.ssh/config
ray_yaml = yaml.safe_load(open(abs_yaml_path, "r"))
backend_utils.SSHConfigHelper.add_cluster(
self.name, [self.address], ray_yaml["auth"]
self.name, [self.head_ip], ray_yaml["auth"]
)

@staticmethod
Expand Down Expand Up @@ -361,7 +361,7 @@ def _start_ray_workers(self, ray_port, env):
"handle"
].stable_internal_external_ips
for internal, external in stable_internal_external_ips:
if external == self.address:
if external == self.head_ip:
internal_head_ip = internal
else:
# NOTE: Using external worker address here because we're running from local
Expand All @@ -385,9 +385,9 @@ def _start_ray_workers(self, ray_port, env):
def _populate_connection_from_status_dict(self, cluster_dict: Dict[str, Any]):
if cluster_dict and cluster_dict["status"].name in ["UP", "INIT"]:
handle = cluster_dict["handle"]
self.address = handle.head_ip
head_ip = handle.head_ip
self.stable_internal_external_ips = handle.stable_internal_external_ips
if self.stable_internal_external_ips is None or self.address is None:
if self.stable_internal_external_ips is None or head_ip is None:
raise ValueError(
"Sky's cluster status does not have the necessary information to connect to the cluster. Please check if the cluster is up via `sky status`. Consider bringing down the cluster with `sky down` if you are still having issues."
)
Expand Down Expand Up @@ -577,7 +577,7 @@ def up(self):
logger.info(
f"Cluster has been launched with the custom domain '{self.domain}'. "
"Please add an A record to your DNS provider to point this domain to the cluster's "
f"public IP address ({self.address}) to ensure successful requests."
f"public IP address ({self.head_ip}) to ensure successful requests."
)

self.restart_server()
Expand Down Expand Up @@ -625,7 +625,7 @@ def teardown(self):

# Stream logs
sky.down(self.name)
self.address = None
self.ips = None

def teardown_and_delete(self):
"""Teardown cluster and delete it from configs.
Expand Down
4 changes: 2 additions & 2 deletions runhouse/resources/hardware/sky_command_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
def get_docker_user(cluster: "Cluster", ssh_creds: Dict) -> str:
"""Find docker container username."""
runner = SkySSHRunner(
node=(cluster.address, cluster.ssh_port),
node=(cluster.head_ip, cluster.ssh_port),
ssh_user=ssh_creds.get("ssh_user", None),
ssh_private_key=ssh_creds.get("ssh_private_key", None),
ssh_control_name=ssh_creds.get(
"ssh_control_name", f"{cluster.address}:{cluster.ssh_port}"
"ssh_control_name", f"{cluster.head_ip}:{cluster.ssh_port}"
),
)
container_name = DEFAULT_DOCKER_CONTAINER_NAME
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures/static_cluster_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def static_cpu_pwd_cluster():
"password": "cluster-pass",
}
args = dict(
name="static-cpu-password", host=[sky_cluster.address], ssh_creds=ssh_creds
name="static-cpu-password", host=[sky_cluster.head_ip], ssh_creds=ssh_creds
)
c = rh.cluster(**args).save()
c.restart_server(resync_rh=True)
Expand Down
Loading

0 comments on commit 9ad94cd

Please sign in to comment.