Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix streaming with HTTP/HTTPS/Nginx #261

Merged
merged 1 commit into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions runhouse/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def _start_server(
den_auth=False,
ssl_keyfile=None,
ssl_certfile=None,
force_reinstall=False,
restart_proxy=False,
use_nginx=False,
certs_address=None,
use_local_telemetry=False,
Expand All @@ -125,7 +125,7 @@ def _start_server(
den_auth=den_auth,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
force_reinstall=force_reinstall,
restart_proxy=restart_proxy,
use_nginx=use_nginx,
certs_address=certs_address,
use_local_telemetry=use_local_telemetry,
Expand Down Expand Up @@ -248,7 +248,7 @@ def restart(
ssl_certfile: Optional[str] = typer.Option(
None, help="Path to custom SSL cert file to use for enabling HTTPS"
),
force_reinstall: bool = typer.Option(
restart_proxy: bool = typer.Option(
False, help="Whether to reinstall Nginx and other server configs on the cluster"
),
use_nginx: bool = typer.Option(
Expand Down Expand Up @@ -282,7 +282,7 @@ def restart(
den_auth=use_den_auth,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
force_reinstall=force_reinstall,
restart_proxy=restart_proxy,
use_nginx=use_nginx,
certs_address=certs_address,
use_local_telemetry=use_local_telemetry,
Expand Down
8 changes: 4 additions & 4 deletions runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ def _start_server_cmds(
den_auth,
ssl_keyfile,
ssl_certfile,
force_reinstall,
restart_proxy,
use_nginx,
certs_address,
use_local_telemetry,
Expand All @@ -674,10 +674,10 @@ def _start_server_cmds(
logger.info("Starting server with Den auth.")
flags.append(den_auth_flag)

force_reinstall_flag = " --force-reinstall" if force_reinstall else ""
if force_reinstall_flag:
restart_proxy_flag = " --restart-proxy" if restart_proxy else ""
if restart_proxy_flag:
logger.info("Reinstalling Nginx and server configs.")
flags.append(force_reinstall_flag)
flags.append(restart_proxy_flag)

use_nginx_flag = " --use-nginx" if use_nginx else ""
if use_nginx_flag:
Expand Down
11 changes: 10 additions & 1 deletion runhouse/servers/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,16 @@ def call_module_method( # TODO rename call_module_method to call
# maybe a stream of results), so we need to separate these out.
non_generator_result = None
res_iter = res.iter_lines(chunk_size=None)
for responses_json in res_iter:
# We need to manually iterate through res_iter so we can try/except to bypass a ChunkedEncodingError bug
while True:
try:
responses_json = next(res_iter)
except requests.exceptions.ChunkedEncodingError:
# Some silly bug in urllib3, see https://github.com/psf/requests/issues/4248
continue
except StopIteration:
break

resp = json.loads(responses_json)
output_type = resp["output_type"]
result = handle_response(resp, output_type, error_str)
Expand Down
27 changes: 18 additions & 9 deletions runhouse/servers/http/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ def _get_results_and_logs_generator(
raise ray.exceptions.GetTimeoutError
if not ret_val.output_type == OutputType.RESULT_STREAM:
waiting_for_results = False
ret_val = ret_val.data if serialization == "json" else ret_val
ret_resp = json.dumps(jsonable_encoder(ret_val))
yield ret_resp + "\n"
except ray.exceptions.GetTimeoutError:
Expand All @@ -500,9 +501,13 @@ def _get_results_and_logs_generator(
# ret_lines.append(f"Process {i}:")
ret_lines += file_lines
if ret_lines:
lines_resp = Response(
data=ret_lines,
output_type=OutputType.STDOUT,
lines_resp = (
Response(
data=ret_lines,
output_type=OutputType.STDOUT,
)
if not serialization == "json"
else ret_lines
)
logger.debug(f"Yielding logs for key {key}")
yield json.dumps(jsonable_encoder(lines_resp)) + "\n"
Expand Down Expand Up @@ -622,11 +627,13 @@ def get_call(request: Request, module, method=None, serialization="json"):
return Response(output_type=OutputType.NOT_FOUND, data=message.key)

if message.run_async:
return Response(
data=pickle_b64(message.key)
return (
Response(
data=pickle_b64(message.key),
output_type=OutputType.RESULT,
)
if not serialization == "json"
else message.key,
output_type=OutputType.RESULT,
else message.key
)

return StreamingResponse(
Expand All @@ -644,8 +651,10 @@ def get_call(request: Request, module, method=None, serialization="json"):
logger.exception(e)
HTTPServer.register_activity()
return Response(
error=pickle_b64(e),
traceback=pickle_b64(traceback.format_exc()),
error=pickle_b64(e) if not serialization == "json" else str(e),
traceback=pickle_b64(traceback.format_exc())
if not serialization == "json"
else str(traceback.format_exc()),
output_type=OutputType.EXCEPTION,
)

Expand Down
8 changes: 4 additions & 4 deletions runhouse/servers/http/http_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import re
import sys
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional

from pydantic import BaseModel
from ray import cloudpickle as pickle
Expand All @@ -27,9 +27,9 @@ class Args(BaseModel):


class Response(BaseModel):
data: Union[None, str, List[str], Dict]
error: Optional[str]
traceback: Optional[str]
data: Any = None
error: Optional[str] = None
traceback: Optional[str] = None
output_type: str


Expand Down
8 changes: 2 additions & 6 deletions runhouse/servers/nginx/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,8 @@ def _http_template(self):
server_name {server_name};

location / {{
proxy_buffering off;
proxy_pass {proxy_pass};
proxy_buffer_size 128k;
proxy_buffers 4 256k;
proxy_busy_buffers_size 256k;
}}
}}
"""
Expand All @@ -134,10 +132,8 @@ def _https_template(self):
ssl_certificate_key {ssl_key_path};

location / {{
proxy_buffering off;
proxy_pass {proxy_pass};
proxy_buffer_size 128k;
proxy_buffers 4 256k;
proxy_busy_buffers_size 256k;
}}
}}
"""
Expand Down
8 changes: 2 additions & 6 deletions tests/test_servers/test_nginx.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,8 @@ def test_nginx_http_config_generation(
server_name {config.address};

location / {{
proxy_buffering off;
proxy_pass http://127.0.0.1:{config.rh_server_port}/;
proxy_buffer_size 128k;
proxy_buffers 4 256k;
proxy_busy_buffers_size 256k;
}}
}}
"""
Expand Down Expand Up @@ -401,10 +399,8 @@ def test_nginx_https_config_generation(
ssl_certificate_key {config.ssl_key_path};

location / {{
proxy_buffering off;
proxy_pass http://127.0.0.1:{config.rh_server_port}/;
proxy_buffer_size 128k;
proxy_buffers 4 256k;
proxy_busy_buffers_size 256k;
}}
}}
"""
Expand Down
Loading